[sql_database] disposes sql alchemy engines created per table (#527)
disposes SQLAlchemy engines created per table
rudolfix authored Jul 15, 2024
1 parent 1fb3028 commit cb96e69
Showing 4 changed files with 25 additions and 5 deletions.
sources/sql_database/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -178,7 +178,7 @@ def sql_table(
     else:
         reflection_level = reflection_level or "minimal"

-    engine = engine_from_credentials(credentials)
+    engine = engine_from_credentials(credentials, may_dispose_after_use=True)
     engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)
     metadata = metadata or MetaData(schema=schema)

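For context, a minimal sketch of how the new flag behaves from the caller's side; the import path for engine_from_credentials below is an assumption (it depends on how the verified source is vendored into a project), the rest follows the diff above: an engine built from a connection string is tagged for later disposal, while an Engine passed in by the user is returned unchanged and is therefore never disposed by table_rows.

from sqlalchemy import create_engine
from sql_database.helpers import engine_from_credentials  # import path assumed

# engine created from a connection string: tagged, table_rows will dispose it
owned = engine_from_credentials("sqlite:///example.db", may_dispose_after_use=True)
assert getattr(owned, "may_dispose_after_use", False)

# engine provided by the caller: returned as-is and never tagged, so it outlives the load
external = create_engine("sqlite:///example.db")
assert engine_from_credentials(external) is external
assert not getattr(external, "may_dispose_after_use", False)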
sources/sql_database/helpers.py (16 changes: 13 additions & 3 deletions)
@@ -218,17 +218,27 @@ def table_rows(
     loader = TableLoader(
         engine, backend, table, columns, incremental=incremental, chunk_size=chunk_size
     )
-    yield from loader.load_rows(backend_kwargs)
+    try:
+        yield from loader.load_rows(backend_kwargs)
+    finally:
+        # dispose the engine if created for this particular table
+        # NOTE: database wide engines are not disposed, not externally provided
+        if getattr(engine, "may_dispose_after_use", False):
+            engine.dispose()


 def engine_from_credentials(
-    credentials: Union[ConnectionStringCredentials, Engine, str], **backend_kwargs: Any
+    credentials: Union[ConnectionStringCredentials, Engine, str],
+    may_dispose_after_use: bool = False,
+    **backend_kwargs: Any,
 ) -> Engine:
     if isinstance(credentials, Engine):
         return credentials
     if isinstance(credentials, ConnectionStringCredentials):
         credentials = credentials.to_native_representation()
-    return create_engine(credentials, **backend_kwargs)
+    engine = create_engine(credentials, **backend_kwargs)
+    setattr(engine, "may_dispose_after_use", may_dispose_after_use)  # noqa
+    return engine


 def unwrap_json_connector_x(field: str) -> TDataItem:
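Taken together with the __init__.py change, the engine's lifetime is now scoped to iterating a single table whenever the source built the engine itself. A simplified sketch of the tag-and-dispose pattern (not the actual TableLoader code; the query helper below is illustrative only):

from typing import Any, Iterator

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine


def iter_table_rows(engine: Engine, query: str) -> Iterator[Any]:
    try:
        with engine.connect() as conn:
            yield from conn.execute(text(query))
    finally:
        # dispose only engines this code created and tagged itself;
        # externally provided engines keep their connection pools intact
        if getattr(engine, "may_dispose_after_use", False):
            engine.dispose()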
tests/sql_database/test_arrow_helpers.py (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ def test_row_tuples_to_arrow_unknown_types(all_unknown: bool) -> None:
     from sqlalchemy.dialects.postgresql import Range

     # Applies to NUMRANGE, DATERANGE, etc sql types. Sqlalchemy returns a Range dataclass
-    IntRange = Range[int]
+    IntRange = Range

     rows = [
         (
tests/sql_database/test_sql_database_source.py (10 changes: 10 additions & 0 deletions)
@@ -32,6 +32,15 @@
 from tests.sql_database.sql_source import SQLAlchemySourceDB


+@pytest.fixture(autouse=True)
+def dispose_engines():
+    yield
+    import gc
+
+    # will collect and dispose all hanging engines
+    gc.collect()
+
+
 def make_pipeline(destination_name: str) -> dlt.Pipeline:
     return dlt.pipeline(
         pipeline_name="sql_database",
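A note on the fixture above: with autouse=True it wraps every test in the module, the code after yield runs as teardown, and gc.collect() finalizes engines that are only reachable from abandoned generators, releasing their pooled connections between tests. A minimal illustration of that setup/teardown ordering (a toy recorder, unrelated to this repository):

import pytest

events = []


@pytest.fixture(autouse=True)
def record_teardown():
    events.append("setup")
    yield  # the test body runs here
    events.append("teardown")  # runs after every test, like gc.collect() above


def test_first():
    assert events == ["setup"]


def test_second():
    # the first test's teardown has already run by the time this test starts
    assert events == ["setup", "teardown", "setup"]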
@@ -452,6 +461,7 @@ def sql_table_source() -> List[DltResource]:
     except Exception as exc:
         if isinstance(exc.__context__, NotImplementedError):
             pytest.skip("Test skipped due to: " + str(exc.__context__))
+        raise
     # half of the records loaded -1 record. end values is non inclusive
     assert data_item_length(rows) == abs(end_id - start_id)
     # check first and last id to see if order was applied

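The added raise in the last hunk closes a gap where an exception whose context was not NotImplementedError was caught and then dropped, letting the test continue past the real failure; with raise, the test only skips on the expected unsupported-feature case and surfaces every other error. A small sketch of the pattern (do_load is a hypothetical stand-in for the loading step in the test):

import pytest


def run_case(do_load) -> None:
    try:
        do_load()
    except Exception as exc:
        # skip only when the underlying cause is an expected NotImplementedError...
        if isinstance(exc.__context__, NotImplementedError):
            pytest.skip("Test skipped due to: " + str(exc.__context__))
        # ...and re-raise everything else so real failures are not hidden
        raise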