diff --git a/sources/sql_database/__init__.py b/sources/sql_database/__init__.py index 8d28e3b0e..93b1a0869 100644 --- a/sources/sql_database/__init__.py +++ b/sources/sql_database/__init__.py @@ -178,7 +178,7 @@ def sql_table( else: reflection_level = reflection_level or "minimal" - engine = engine_from_credentials(credentials) + engine = engine_from_credentials(credentials, may_dispose_after_use=True) engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size) metadata = metadata or MetaData(schema=schema) diff --git a/sources/sql_database/helpers.py b/sources/sql_database/helpers.py index 11d0e1634..fa054cc90 100644 --- a/sources/sql_database/helpers.py +++ b/sources/sql_database/helpers.py @@ -218,17 +218,27 @@ def table_rows( loader = TableLoader( engine, backend, table, columns, incremental=incremental, chunk_size=chunk_size ) - yield from loader.load_rows(backend_kwargs) + try: + yield from loader.load_rows(backend_kwargs) + finally: + # dispose the engine if created for this particular table + # NOTE: database-wide and externally provided engines are not disposed + if getattr(engine, "may_dispose_after_use", False): + engine.dispose() def engine_from_credentials( - credentials: Union[ConnectionStringCredentials, Engine, str], **backend_kwargs: Any + credentials: Union[ConnectionStringCredentials, Engine, str], + may_dispose_after_use: bool = False, + **backend_kwargs: Any, ) -> Engine: if isinstance(credentials, Engine): return credentials if isinstance(credentials, ConnectionStringCredentials): credentials = credentials.to_native_representation() - return create_engine(credentials, **backend_kwargs) + engine = create_engine(credentials, **backend_kwargs) + setattr(engine, "may_dispose_after_use", may_dispose_after_use) # noqa + return engine def unwrap_json_connector_x(field: str) -> TDataItem: diff --git a/tests/sql_database/test_arrow_helpers.py b/tests/sql_database/test_arrow_helpers.py index 98326acca..230eb6a08 100644 ---
a/tests/sql_database/test_arrow_helpers.py +++ b/tests/sql_database/test_arrow_helpers.py @@ -14,7 +14,7 @@ def test_row_tuples_to_arrow_unknown_types(all_unknown: bool) -> None: from sqlalchemy.dialects.postgresql import Range # Applies to NUMRANGE, DATERANGE, etc sql types. Sqlalchemy returns a Range dataclass - IntRange = Range[int] + IntRange = Range rows = [ ( diff --git a/tests/sql_database/test_sql_database_source.py b/tests/sql_database/test_sql_database_source.py index 6be5ba281..9db1cf5d6 100644 --- a/tests/sql_database/test_sql_database_source.py +++ b/tests/sql_database/test_sql_database_source.py @@ -32,6 +32,15 @@ from tests.sql_database.sql_source import SQLAlchemySourceDB +@pytest.fixture(autouse=True) +def dispose_engines(): + yield + import gc + + # will collect and dispose all hanging engines + gc.collect() + + def make_pipeline(destination_name: str) -> dlt.Pipeline: return dlt.pipeline( pipeline_name="sql_database", @@ -452,6 +461,7 @@ def sql_table_source() -> List[DltResource]: except Exception as exc: if isinstance(exc.__context__, NotImplementedError): pytest.skip("Test skipped due to: " + str(exc.__context__)) + raise # half of the records loaded -1 record. end values is non inclusive assert data_item_length(rows) == abs(end_id - start_id) # check first and last id to see if order was applied