Skip to content

Commit

Permalink
switch back to properties
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Oct 14, 2024
1 parent b52a4a8 commit 9996735
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
36 changes: 23 additions & 13 deletions dlt/destinations/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,18 @@ def __init__(
self._destination = Destination.from_reference(destination)
self._provided_schema = schema
self._dataset_name = dataset_name
self.sql_client: SqlClientBase[Any] = None
self.schema: Schema = None
self._sql_client: SqlClientBase[Any] = None
self._schema: Schema = None

@property
def schema(self) -> Schema:
self._ensure_client_and_schema()
return self._schema

@property
def sql_client(self) -> SqlClientBase[Any]:
self._ensure_client_and_schema()
return self._sql_client

def _destination_client(self, schema: Schema) -> JobClientBase:
client_spec = self._destination.spec()
Expand All @@ -101,34 +111,34 @@ def _destination_client(self, schema: Schema) -> JobClientBase:
def _ensure_client_and_schema(self) -> None:
"""Lazy load schema and client"""
# full schema given, nothing to do
if not self.schema and isinstance(self._provided_schema, Schema):
self.schema = self._provided_schema
if not self._schema and isinstance(self._provided_schema, Schema):
self._schema = self._provided_schema

# schema name given, resolve it from destination by name
elif not self.schema and isinstance(self._provided_schema, str):
elif not self._schema and isinstance(self._provided_schema, str):
with self._destination_client(Schema(self._provided_schema)) as client:
if isinstance(client, WithStateSync):
stored_schema = client.get_stored_schema(self._provided_schema)
if stored_schema:
self.schema = Schema.from_stored_schema(json.loads(stored_schema.schema))
self._schema = Schema.from_stored_schema(json.loads(stored_schema.schema))

# no schema name given, load newest schema from destination
elif not self.schema:
elif not self._schema:
with self._destination_client(Schema(self._dataset_name)) as client:
if isinstance(client, WithStateSync):
stored_schema = client.get_stored_schema()
if stored_schema:
self.schema = Schema.from_stored_schema(json.loads(stored_schema.schema))
self._schema = Schema.from_stored_schema(json.loads(stored_schema.schema))

# default to empty schema with dataset name if nothing found
if not self.schema:
self.schema = Schema(self._dataset_name)
if not self._schema:
self._schema = Schema(self._dataset_name)

# here we create the client bound to the resolved schema
if not self.sql_client:
destination_client = self._destination_client(self.schema)
if not self._sql_client:
destination_client = self._destination_client(self._schema)
if isinstance(destination_client, WithSqlClient):
self.sql_client = destination_client.sql_client
self._sql_client = destination_client.sql_client
else:
raise Exception(
f"Destination {destination_client.config.destination_type} does not support"
Expand Down
4 changes: 2 additions & 2 deletions tests/load/test_read_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ def double_items():
# check dataset factory
dataset = dlt._dataset(destination=destination_for_dataset, dataset_name=pipeline.dataset_name)
# verify that sql client and schema are lazy loaded
assert not dataset.schema
assert not dataset.sql_client
assert not dataset._schema
assert not dataset._sql_client
table_relationship = dataset.items
table = table_relationship.fetchall()
assert len(table) == total_records
Expand Down

0 comments on commit 9996735

Please sign in to comment.