Skip to content

Commit

Permalink
Postgis add with lineage (#1632)
Browse files Browse the repository at this point in the history
* update rasterio version

* handle lineage in postgis dataset add

* update whats_new

* fix some tests

* handle lineage in postgis dataset add

* update whats_new

* attempt fixes

* fix mypy

---------

Co-authored-by: Ariana Barzinpour <[email protected]>
Co-authored-by: Paul Haesler <[email protected]>
  • Loading branch information
3 people authored Sep 19, 2024
1 parent 2510b0d commit eeb5e6d
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 55 deletions.
40 changes: 20 additions & 20 deletions datacube/drivers/postgis/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1355,14 +1355,15 @@ def get_all_relations(self, dsids: Iterable[uuid.UUID]) -> Iterable[LineageRelat
source_id=rel.source_dataset_ref,
derived_id=rel.derived_dataset_ref)

def write_relations(self, relations: Iterable[LineageRelation], allow_updates: bool):
def write_relations(self, relations: Iterable[LineageRelation], allow_updates: bool) -> int:
"""
Write a set of LineageRelation objects to the database.
:param relations: An Iterable of LineageRelation objects
:param allow_updates: if False, only allow adding new relations, not updating old ones.
:return: Count of database rows affected
"""
affected = 0
if allow_updates:
by_classifier: dict[str, Any] = {}
for rel in relations:
Expand All @@ -1375,32 +1376,31 @@ def write_relations(self, relations: Iterable[LineageRelation], allow_updates: b
by_classifier[rel.classifier].append(db_repr)
else:
by_classifier[rel.classifier] = [db_repr]
updates = 0
for classifier, values in by_classifier.items():
qry = insert(DatasetLineage).on_conflict_do_update(
index_elements=["derived_dataset_ref", "source_dataset_ref"],
set_={"classifier": classifier},
where=(DatasetLineage.classifier != classifier))
res = self._connection.execute(qry, values)
updates += res.rowcount
return updates
affected += res.rowcount
else:
values = [
{
"derived_dataset_ref": rel.derived_id,
"source_dataset_ref": rel.source_id,
"classifier": rel.classifier
}
for rel in relations
]
qry = insert(DatasetLineage)
try:
res = self._connection.execute(
qry, values
)
return res.rowcount
except IntegrityError:
return 0
for rel in relations:
values = [
{
"derived_dataset_ref": rel.derived_id,
"source_dataset_ref": rel.source_id,
"classifier": rel.classifier
}
]
qry = insert(DatasetLineage)
try:
res = self._connection.execute(
qry, values
)
affected += res.rowcount
except IntegrityError:
return 0
return affected

def load_lineage_relations(self,
roots: Iterable[uuid.UUID],
Expand Down
30 changes: 15 additions & 15 deletions datacube/index/postgis/_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,37 +143,37 @@ def add(self, dataset: Dataset,
:param dataset: dataset to add
:param with_lineage:
- ``True (default)`` attempt adding lineage datasets if missing
- ``False`` record lineage relations, but do not attempt
adding lineage datasets to the db
- ``True (default)`` record lineage relations in the db
Since we no longer accept embedded lineage, any lineage relations should
already exist in the db, so there's no longer a need for differentiating between
adding and recording. This parameter has been kept for compatibility reasons.s
:param archive_less_mature: if integer, search for less
mature versions of the dataset with the int value as a millisecond
delta in timestamp comparison
:rtype: Dataset
"""

if with_lineage:
raise ValueError("Lineage is not yet supported by the postgis driver")

_LOG.info('Indexing %s', dataset.id)

if self.has(dataset.id):
_LOG.warning('Dataset %s is already in the database', dataset.id)
return dataset
with self._db_connection(transaction=True) as transaction:
# 1a. insert (if not already exists)
is_new = transaction.insert_dataset(dataset.metadata_doc_without_lineage(), dataset.id, dataset.product.id)
if is_new:
# 1b. Prepare spatial index extents
transaction.update_spindex(dsids=[dataset.id])
transaction.update_search_index(dsids=[dataset.id])
# 1c. Store locations
if dataset.uri is not None:
self._ensure_new_locations(dataset, transaction=transaction)
transaction.insert_dataset(dataset.metadata_doc_without_lineage(), dataset.id, dataset.product.id)
# 1b. Prepare spatial index extents
transaction.update_spindex(dsids=[dataset.id])
transaction.update_search_index(dsids=[dataset.id])
# 1c. Store locations
if dataset.uri is not None:
self._ensure_new_locations(dataset, transaction=transaction)
if archive_less_mature is not None:
self.archive_less_mature(dataset, archive_less_mature)
if dataset.source_tree is not None:
self._index.lineage.add(dataset.source_tree)
if dataset.derived_tree is not None:
self._index.lineage.add(dataset.derived_tree)

return dataset

Expand Down
11 changes: 3 additions & 8 deletions datacube/model/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,12 @@ def from_eo3_doc(cls, doc: Mapping[str, Any],
"""
Generate a shallow (depth=1) LineageTree from an EO3 dataset document
:param dsid: The (derived) dataset id
:param sources: A dictionary of classifiers to list of source IDs
:param direction: Tree direction (default SOURCEwards, as per an EO3 dataset)
:param doc: The dataset metadata dictionary
:param home: Home database for source datasets (defaults to None).
:param home_derived: Home database for the derived dataset (defaults to None).
:return: A depth==1 LineageTree
:param doc_in:
:return:
"""
lineage = doc.get("lineage", {})
lineage = doc.get("lineage")
return cls.from_data(doc["id"], lineage, home=home, home_derived=home_derived)

@classmethod
Expand All @@ -195,7 +190,7 @@ def from_data(cls, dsid: UUID,
:param home_derived: Home database for the derived dataset (defaults to None).
:return: A depth==1 LineageTree
"""
if sources is None:
if not sources:
children = None
else:
children = {
Expand Down
3 changes: 2 additions & 1 deletion datacube/utils/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ def id(self):
@property
def sources(self):
if self._sources is None:
self._sources = {k: SimpleDocNav(v)
# sources aren't expected to be embedded documents anymore
self._sources = {k: SimpleDocNav(v) if isinstance(v, collections.abc.Mapping) else v
for k, v in get_doc_offset(self._sources_path, self._doc, {}).items()}
return self._sources

Expand Down
1 change: 1 addition & 0 deletions docs/about/whats_new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ v1.9.next
- Remove workaround for an odc-geo bug that is now fixed. (:pull:`1622`)
- Fix call to geopolygon search. (:pull:`1627`)
- Use antimeridian package to "fix" extent polygons. (:pull:`1628`)
- Record lineage when adding datasets with postgis index (:pull:`1632`)
- Update schema logic (:pull:`1634`)
- Drop valid-area check and anti-meridian fix 3857 extents (:pull:`1635`)

Expand Down
27 changes: 16 additions & 11 deletions integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,11 @@ def doc_to_ds(index, product_name, ds_doc, ds_path, src_tree=None, derived_tree=
resolver = Doc2Dataset(index, products=[product_name], verify_lineage=False)
ds, err = resolver(ds_doc, ds_path)
assert err is None and ds is not None
if index.supports_external_lineage:
index.datasets.add(ds, with_lineage=False)
if src_tree:
index.lineage.add(src_tree)
if derived_tree:
index.lineage.add(derived_tree)
if not (src_tree or derived_tree):
eo3_tree = LineageTree.from_eo3_doc(ds_doc)
index.lineage.add(eo3_tree)
else:
index.datasets.add(ds, with_lineage=index.supports_lineage)
if src_tree is not None:
ds.source_tree = src_tree
if derived_tree is not None:
ds.derived_tree = derived_tree
index.datasets.add(ds, with_lineage=index.supports_lineage)
return index.datasets.get(ds.id)


Expand Down Expand Up @@ -386,6 +380,17 @@ def ds_no_region(index, extended_eo3_metadata_type, ls8_eo3_product, final_datas
*doc_no_region)


@pytest.fixture
def ds_with_lineage(index, wo_eo3_product, eo3_wo_dataset_doc, ls8_eo3_dataset):
doc, path = eo3_wo_dataset_doc
# rewrite lineage to correct format
doc["lineage"] = {"source_datasets": {"ard": [ls8_eo3_dataset.id]}}
return doc_to_ds_no_add(
index,
wo_eo3_product.name,
doc, path)


@pytest.fixture
def ga_s2am_ard3_final(index, eo3_sentinel_metadata_type, ga_s2am_ard_3_product, ga_s2am_ard_3_final_doc):
return doc_to_ds_no_add(
Expand Down
10 changes: 10 additions & 0 deletions integration_tests/index/test_index_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,16 @@ def test_index_dataset_with_sources(index, default_metadata_type):
index.datasets.add(child, with_lineage=False)


@pytest.mark.parametrize('datacube_env_name', ('experimental',))
def test_index_dataset_with_lineage(index, ds_with_lineage, ls8_eo3_dataset):
assert ds_with_lineage.source_tree
index.datasets.add(ds_with_lineage)
sources = index.lineage.get_source_tree(ds_with_lineage.id).children
assert len(sources["ard"]) == 1
assert sources["ard"][0].dataset_id == ls8_eo3_dataset.id
assert index.datasets.get(ds_with_lineage.id)


@pytest.mark.parametrize('datacube_env_name', ('datacube', ))
def test_index_dataset_with_location(index: Index, default_metadata_type: MetadataType):
first_file = Path('/tmp/first/something.yaml').absolute()
Expand Down

0 comments on commit eeb5e6d

Please sign in to comment.