
Commit

Merge pull request #9 from s7clarke10/feature/array_size_for_other_rep_methods

Feature/array size for other rep methods
s7clarke10 authored Sep 23, 2022
2 parents a84b481 + e1448ea commit 713df3b
Showing 7 changed files with 38 additions and 5 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,14 @@
# Changelog

## 1.2.1
* Applying the cursor array_size to the incremental and log_based replication methods.
* Renaming config parameter from `full_table_sync_batch_size` to `cursor_array_size`.
* Increasing the default array size from 100 to 1000.

## 1.2.0
* New config option, `offset_value`, to provide an offset for incremental loads.
* Changing the sort order to sort by column_id so table columns match the database source.

## 1.1.9
* Pulling the database name from the env if v$database is unavailable.

@@ -8,7 +17,7 @@
* This variant uses orjson for serialization, which is 40-50x faster than other libraries.

## 1.1.7
-* Bumping cx_Oracle to 8.2
+* Bumping cx_Oracle to 8.3
* Removing unnecessary call to get the database name

## 1.1.6
17 changes: 17 additions & 0 deletions README.md
@@ -104,6 +104,13 @@ Running the tap requires a `config.json` file. Example with the minimal settings:
}
```

Recommended optional settings (a combined example follows this list):

* `"filter_schemas": "schema name"` - This speeds up discovery, as only the given schema is scanned.
* `"filter_tables": ["SCHEMA-TABLE1", "SCHEMA-TABLE2"]` - This limits discovery to just the listed tables, which also speeds it up.
* `"use_singer_decimal": true` - This helps avoid numeric rounding issues by emitting numbers as strings in the singer.decimal format.
* `"cursor_array_size": 10000` - This can speed up extracts over a WAN or high-latency network. The default is 1000.

Optional:

For older databases, or when connecting to a specific instance, you can use the legacy SID for the connection.
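For illustration, a hypothetical SID-based connection config; the exact key name (`sid` here) and all connection values are assumptions, so check the settings reference for your version of the tap:

```json
{
  "host": "oracle-db.example.com",
  "port": 1521,
  "user": "my_user",
  "password": "secret",
  "sid": "ORCL"
}
```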
@@ -176,6 +183,16 @@ Usage (offsetting by +1 day in seconds = 24*3600):
}
```

Optional:

A numeric setting that adjusts the internal buffer size. The common query-tuning scenario is a SELECT statement that returns a large number of rows over a slow network; increasing the array size can improve performance by reducing the number of round trips to the database. However, a larger value also increases the amount of memory required.

```json
{
"cursor_array_size": 10000,
}
```
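For context, a minimal cx_Oracle sketch (not taken from this repository) showing what the setting controls: `cursor.arraysize` determines how many rows each fetch round trip retrieves, which is what the tap now sets from `cursor_array_size`. The connection details are placeholders.

```python
import cx_Oracle

# Placeholder credentials and DSN; assumes a reachable Oracle instance.
connection = cx_Oracle.connect(
    user="my_user",
    password="secret",
    dsn="oracle-db.example.com:1521/ORCLPDB1",
)

cursor = connection.cursor()

# Fetch up to 10000 rows per network round trip instead of the
# cx_Oracle default of 100 -- fewer round trips on slow networks,
# at the cost of a larger client-side buffer.
cursor.arraysize = 10000

cursor.execute("SELECT * FROM my_schema.big_table")
for row in cursor:  # iteration fetches rows in arraysize-sized batches
    pass  # process the row here
```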

### To run tests:

Tests require Oracle on Amazon RDS >= 12.1, and a user called `ROOT`.
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@
    long_description = f.read()

setup(name='pipelinewise-tap-oracle',
-     version='1.1.8',
+     version='1.2.1',
      description='Singer.io tap for extracting data from Oracle - PipelineWise compatible',
      long_description=long_description,
      long_description_content_type='text/markdown',
6 changes: 4 additions & 2 deletions tap_oracle/__init__.py
@@ -650,8 +650,10 @@ def main_impl():
    log_miner.DYNAMIC_SCN_WINDOW_SIZE = bool(args.config.get('logminer_dynamic_scn_window'))
    if args.config.get('logminer_iter_with_reduction_factor'):
        log_miner.ITER_WITH_REDUCTION_FACTOR = int(args.config.get('logminer_iter_with_reduction_factor'))
-   if args.config.get('full_table_sync_batch_size'):
-       full_table.BATCH_SIZE = int(args.config.get('full_table_sync_batch_size'))
+   if args.config.get('cursor_array_size'):
+       full_table.BATCH_SIZE = int(args.config.get('cursor_array_size'))
+       incremental.BATCH_SIZE = int(args.config.get('cursor_array_size'))
+       log_miner.BATCH_SIZE = int(args.config.get('cursor_array_size'))
    full_table.USE_ORA_ROWSCN = bool(args.config.get('use_ora_rowscn', True))
    use_singer_decimal = bool(args.config.get('use_singer_decimal', False))
    incremental.OFFSET_VALUE = args.config.get('offset_value',0)
2 changes: 1 addition & 1 deletion tap_oracle/sync_strategies/full_table.py
@@ -16,7 +16,7 @@

UPDATE_BOOKMARK_PERIOD = 1000

-BATCH_SIZE = 100
+BATCH_SIZE = 1000

USE_ORA_ROWSCN = True

3 changes: 3 additions & 0 deletions tap_oracle/sync_strategies/incremental.py
@@ -18,11 +18,14 @@
# An offset value that can be configured to shift the incremental filter clause
OFFSET_VALUE = 0

+BATCH_SIZE = 1000

def sync_table(conn_config, stream, state, desired_columns):
    connection = orc_db.open_connection(conn_config)
    connection.outputtypehandler = common.OutputTypeHandler

    cur = connection.cursor()
+    cur.arraysize = BATCH_SIZE
    cur.execute("ALTER SESSION SET TIME_ZONE = '00:00'")
    cur.execute("""ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD"T"HH24:MI:SS."00+00:00"'""")
    cur.execute("""ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD"T"HH24:MI:SSXFF"+00:00"'""")
2 changes: 2 additions & 0 deletions tap_oracle/sync_strategies/log_miner.py
@@ -24,6 +24,7 @@
DYNAMIC_SCN_WINDOW_SIZE = False
ITER_WITH_REDUCTION_FACTOR = 10

+BATCH_SIZE = 1000

def get_connection_with_common_user_or_default(conn_config):
    cdb_conn_config = conn_config.copy()
@@ -118,6 +119,7 @@ def sync_tables(conn_config, streams, state, end_scn, scn_window_size = None):
""".format(stream.tap_stream_id))

cur = connection.cursor()
+cur.arraysize = BATCH_SIZE
cur.execute("ALTER SESSION SET TIME_ZONE = '00:00'")
cur.execute("""ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD"T"HH24:MI:SS."00+00:00"'""")
cur.execute("""ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD"T"HH24:MI:SSXFF"+00:00"'""")
Expand Down
