
Commit

Adding a test to assert RCF values are calculated correctly (#349)
* Add a test to assert RCF values are calculated correctly

* Remove flaky assertions

* Bump version to 1.1.16
raghumdani authored Aug 29, 2024
1 parent fe6684c commit 5a4507d
Showing 4 changed files with 158 additions and 9 deletions.
2 changes: 1 addition & 1 deletion deltacat/__init__.py
@@ -44,7 +44,7 @@

 deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

-__version__ = "1.1.15"
+__version__ = "1.1.16"


 __all__ = [
7 changes: 1 addition & 6 deletions deltacat/compute/compactor_v2/compaction_session.py
@@ -43,7 +43,6 @@
     _stage_new_partition,
     _run_hash_and_merge,
     _process_merge_results,
-    _upload_compaction_audit,
     _write_new_round_completion_file,
     _commit_compaction_result,
 )
@@ -201,11 +200,6 @@ def _execute_compaction(

     compaction_audit.save_round_completion_stats(mat_results)

-    _upload_compaction_audit(
-        params,
-        compaction_audit,
-        round_completion_info,
-    )
     compaction_result: ExecutionCompactionResult = _write_new_round_completion_file(
         params,
         compaction_audit,
@@ -215,5 +209,6 @@
         rcf_source_partition_locator,
         new_compacted_delta_locator,
         pyarrow_write_result,
+        round_completion_info,
     )
     return compaction_result
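
The net effect of this file's change is that _execute_compaction no longer uploads the compaction audit itself; it now forwards round_completion_info to _write_new_round_completion_file, which finalizes and uploads the audit before building the new RCF. Below is a minimal, self-contained sketch of that refactoring pattern using stand-in names and plain dicts (not deltacat's real classes), shown only to make the new call flow explicit:

    from typing import Optional

    def upload_audit(audit: dict) -> None:
        # Stand-in for the S3 upload the real helper performs.
        audit["uploaded"] = True

    def write_new_round_completion_file(
        audit: dict, prev_round_info: Optional[dict] = None
    ) -> dict:
        # Stats are finalized first, then the audit is uploaded, mirroring
        # the ordering this commit introduces.
        audit["input_inflation"] = 0.05
        upload_audit(audit)
        return {"audit": dict(audit), "previous_round": prev_round_info}

    def execute_compaction(prev_round_info: Optional[dict] = None) -> dict:
        audit: dict = {}
        # No separate upload call here anymore; the writer handles it.
        return write_new_round_completion_file(audit, prev_round_info)

    result = execute_compaction(prev_round_info={"high_watermark": 42})
    assert result["audit"]["uploaded"] and result["audit"]["input_inflation"] == 0.05
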
12 changes: 10 additions & 2 deletions deltacat/compute/compactor_v2/private/compaction_utils.py
@@ -365,6 +365,7 @@ def _run_hash_and_merge(
         if mutable_compaction_audit.telemetry_time_in_seconds
         else 0.0
     )
+
     mutable_compaction_audit.set_telemetry_time_in_seconds(
         telemetry_this_round + previous_telemetry
     )
@@ -598,10 +599,10 @@ def _process_merge_results(
     return merged_delta, mat_results, hb_id_to_entry_indices_range


-def _upload_compaction_audit(
+def _update_and_upload_compaction_audit(
     params: CompactPartitionParams,
     mutable_compaction_audit: CompactionSessionAuditInfo,
-    round_completion_info: RoundCompletionInfo,
+    round_completion_info: Optional[RoundCompletionInfo] = None,
 ) -> None:

     # After all incremental delta related calculations, we update
@@ -637,6 +638,7 @@ def _write_new_round_completion_file(
     rcf_source_partition_locator: rcf.PartitionLocator,
     new_compacted_delta_locator: DeltaLocator,
     pyarrow_write_result: PyArrowWriteResult,
+    prev_round_completion_info: Optional[RoundCompletionInfo] = None,
 ) -> ExecutionCompactionResult:
     input_inflation = None
     input_average_record_size_bytes = None
@@ -664,6 +666,12 @@
         f" and average record size={input_average_record_size_bytes}"
     )

+    _update_and_upload_compaction_audit(
+        params,
+        mutable_compaction_audit,
+        prev_round_completion_info,
+    )
+
     new_round_completion_info = RoundCompletionInfo.of(
         high_watermark=params.last_stream_position_to_compact,
         compacted_delta_locator=new_compacted_delta_locator,
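
In this file the renamed helper is invoked from _write_new_round_completion_file only after input_inflation and input_average_record_size_bytes have been computed, and its previous round completion info argument is optional, since a first rebase or backfill run has no prior RCF to pass. The helper's body is not shown in this diff, so the guard in the following sketch is an assumption about how such an optional parameter is typically handled, with stand-in types rather than deltacat's real classes:

    from typing import Optional

    class PrevRoundInfo:
        # Stand-in for RoundCompletionInfo; illustrative only.
        def __init__(self, high_watermark: int) -> None:
            self.high_watermark = high_watermark

    def update_and_upload_audit(audit: dict, prev: Optional[PrevRoundInfo] = None) -> dict:
        # Assumed guard: a first (rebase/backfill) run passes no previous round info.
        if prev is not None:
            audit["previous_high_watermark"] = prev.high_watermark
        # The real helper would upload the finalized audit to S3 at this point.
        return audit

    update_and_upload_audit({})                       # backfill: no previous RCF
    update_and_upload_audit({}, PrevRoundInfo(1234))  # incremental: previous RCF available
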
146 changes: 146 additions & 0 deletions deltacat/tests/compute/compactor_v2/test_compaction_session.py
@@ -87,6 +87,7 @@ class TestCompactionSession:
     INCREMENTAL_FILE_PATH = (
         "deltacat/tests/compute/compactor_v2/data/incremental_source_date_pk.csv"
     )
+    ERROR_RATE = 0.05

     def test_compact_partition_when_no_input_deltas_to_compact(
         self, local_deltacat_storage_kwargs
@@ -253,3 +254,148 @@ def test_compact_partition_when_rcf_was_written_by_past_commit(
         # as it should be running incremental
         assert compaction_audit.uniform_deltas_created == 1
         assert compaction_audit.input_records == 6
+
+    def test_compact_partition_when_incremental_then_rcf_stats_accurate(
+        self, s3_resource, local_deltacat_storage_kwargs
+    ):
+        """
+        A test case which asserts the RCF stats are correctly generated for
+        a rebase and incremental use-case.
+        """
+
+        # setup
+        staged_source = stage_partition_from_file_paths(
+            self.NAMESPACE, ["source"], **local_deltacat_storage_kwargs
+        )
+
+        source_delta = commit_delta_to_staged_partition(
+            staged_source, [self.BACKFILL_FILE_PATH], **local_deltacat_storage_kwargs
+        )
+
+        staged_dest = stage_partition_from_file_paths(
+            self.NAMESPACE, ["destination"], **local_deltacat_storage_kwargs
+        )
+        dest_partition = ds.commit_partition(
+            staged_dest, **local_deltacat_storage_kwargs
+        )
+
+        # action
+        rcf_url = compact_partition(
+            CompactPartitionParams.of(
+                {
+                    "compaction_artifact_s3_bucket": TEST_S3_RCF_BUCKET_NAME,
+                    "compacted_file_content_type": ContentType.PARQUET,
+                    "dd_max_parallelism_ratio": 1.0,
+                    "deltacat_storage": ds,
+                    "deltacat_storage_kwargs": local_deltacat_storage_kwargs,
+                    "destination_partition_locator": dest_partition.locator,
+                    "drop_duplicates": True,
+                    "hash_bucket_count": 2,
+                    "last_stream_position_to_compact": source_delta.stream_position,
+                    "list_deltas_kwargs": {
+                        **local_deltacat_storage_kwargs,
+                        **{"equivalent_table_types": []},
+                    },
+                    "primary_keys": ["pk"],
+                    "rebase_source_partition_locator": source_delta.partition_locator,
+                    "rebase_source_partition_high_watermark": source_delta.stream_position,
+                    "records_per_compacted_file": 4000,
+                    "s3_client_kwargs": {},
+                    "source_partition_locator": source_delta.partition_locator,
+                }
+            )
+        )
+
+        backfill_rcf = get_rcf(s3_resource, rcf_url)
+        _, compaction_audit_key = backfill_rcf.compaction_audit_url.strip(
+            "s3://"
+        ).split("/", 1)
+        compaction_audit = CompactionSessionAuditInfo(
+            **read_s3_contents(
+                s3_resource, TEST_S3_RCF_BUCKET_NAME, compaction_audit_key
+            )
+        )
+
+        assert abs(backfill_rcf.input_inflation - 0.05235042735042735) <= 1e-5
+        assert abs(backfill_rcf.input_average_record_size_bytes - 12.25) <= 1e-5
+
+        assert compaction_audit.input_records == 4
+        assert compaction_audit.records_deduped == 0
+        assert compaction_audit.records_deleted == 0
+        assert compaction_audit.untouched_file_count == 0
+        assert compaction_audit.untouched_record_count == 0
+        assert compaction_audit.untouched_size_bytes == 0
+        assert compaction_audit.untouched_file_ratio == 0
+        assert compaction_audit.uniform_deltas_created == 1
+        assert compaction_audit.hash_bucket_count == 2
+        assert compaction_audit.input_file_count == 1
+        assert compaction_audit.output_file_count == 2
+        assert abs(compaction_audit.output_size_bytes - 1832) / 1832 <= self.ERROR_RATE
+        assert abs(compaction_audit.input_size_bytes - 936) / 936 <= self.ERROR_RATE
+
+        # Now run an incremental compaction and verify if the previous RCF was read properly.
+        new_source_delta = commit_delta_to_partition(
+            source_delta.partition_locator,
+            [self.INCREMENTAL_FILE_PATH],
+            **local_deltacat_storage_kwargs,
+        )
+
+        new_destination_partition = ds.get_partition(
+            dest_partition.stream_locator, [], **local_deltacat_storage_kwargs
+        )
+
+        new_rcf_url = compact_partition(
+            CompactPartitionParams.of(
+                {
+                    "compaction_artifact_s3_bucket": TEST_S3_RCF_BUCKET_NAME,
+                    "compacted_file_content_type": ContentType.PARQUET,
+                    "dd_max_parallelism_ratio": 1.0,
+                    "deltacat_storage": ds,
+                    "deltacat_storage_kwargs": local_deltacat_storage_kwargs,
+                    "destination_partition_locator": new_destination_partition.locator,
+                    "drop_duplicates": True,
+                    "hash_bucket_count": 2,
+                    "last_stream_position_to_compact": new_source_delta.stream_position,
+                    "list_deltas_kwargs": {
+                        **local_deltacat_storage_kwargs,
+                        **{"equivalent_table_types": []},
+                    },
+                    "primary_keys": ["pk"],
+                    "rebase_source_partition_locator": None,
+                    "rebase_source_partition_high_watermark": None,
+                    "records_per_compacted_file": 4000,
+                    "s3_client_kwargs": {},
+                    "source_partition_locator": new_source_delta.partition_locator,
+                }
+            )
+        )
+
+        new_rcf = get_rcf(s3_resource, new_rcf_url)
+        _, compaction_audit_key = new_rcf.compaction_audit_url.strip("s3://").split(
+            "/", 1
+        )
+        compaction_audit = CompactionSessionAuditInfo(
+            **read_s3_contents(
+                s3_resource, TEST_S3_RCF_BUCKET_NAME, compaction_audit_key
+            )
+        )
+
+        # as it should be running incremental
+        assert abs(new_rcf.input_inflation - 0.027292576419213975) <= 1e-5
+        assert abs(new_rcf.input_average_record_size_bytes - 12.5) <= 1e-5
+
+        assert compaction_audit.input_records == 6
+        assert compaction_audit.records_deduped == 1
+        assert compaction_audit.records_deleted == 0
+        assert compaction_audit.untouched_file_count == 1
+        assert compaction_audit.untouched_record_count == 2
+        assert (
+            abs(compaction_audit.untouched_size_bytes - 916) / 916 <= self.ERROR_RATE
+        )  # 5% error
+        assert abs(compaction_audit.untouched_file_ratio - 50) <= 1e-5
+        assert compaction_audit.uniform_deltas_created == 1
+        assert compaction_audit.hash_bucket_count == 2
+        assert compaction_audit.input_file_count == 3
+        assert compaction_audit.output_file_count == 2
+        assert abs(compaction_audit.output_size_bytes - 1843) / 1843 <= self.ERROR_RATE
+        assert abs(compaction_audit.input_size_bytes - 2748) / 2748 <= self.ERROR_RATE
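
The size-in-bytes assertions in this test allow a 5% relative error via ERROR_RATE rather than exact equality, since on-disk Parquet sizes can vary slightly across environments (which is also consistent with the "Remove flaky assertions" commit bullet). A hypothetical helper, not part of the test itself, expressing the same check, plus an equivalent pytest.approx form:

    import pytest

    ERROR_RATE = 0.05  # same 5% tolerance the test uses for size-in-bytes checks

    def assert_close(actual: float, expected: float, rel: float = ERROR_RATE) -> None:
        # Mirrors the inline pattern: abs(actual - expected) / expected <= rel
        assert abs(actual - expected) / expected <= rel

    assert_close(actual=1843, expected=1832)            # ~0.6% off, within tolerance
    assert 1843 == pytest.approx(1832, rel=ERROR_RATE)  # equivalent pytest.approx form
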
