Skip to content

Commit

Permalink
Merge pull request #622 from Aiven-Open/sebinsunny-refactor-pg-baseba…
Browse files Browse the repository at this point in the history
…ckup-metric

refactor progress based basebackup metrics
  • Loading branch information
facetoe authored Jun 7, 2024
2 parents a1da446 + 649d80e commit d28945a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 55 deletions.
4 changes: 2 additions & 2 deletions pghoard/basebackup/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
db_conn.commit()

self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base)
progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=self.metrics)
start_time = time.monotonic()

if delta:
Expand Down Expand Up @@ -686,6 +684,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool =
"%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, chunks_count,
total_size_plain, total_size_enc, backup_time
)
progress_instance = PersistedProgress()
progress_instance.reset_all(metrics=self.metrics)

finally:
db_conn.rollback()
Expand Down
4 changes: 4 additions & 0 deletions pghoard/basebackup/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetric
persisted_progress.write(self.metrics)
self.last_flush_time = time.monotonic()
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags)
self.log.info(
"Updated snapshot progress for %s to %d files; elapsed time since last check: %.2f seconds.",
progress_step.value, progress_data["handled"], elapsed
)
else:
stalled_age = progress_info.age
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags)
Expand Down
45 changes: 11 additions & 34 deletions pghoard/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ def untrack_upload_event(self, file_key: str) -> None:
self._tracked_events.pop(file_key)

def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
metric_data = {}
persisted_progress = PersistedProgress.read(metrics=self.metrics)

with self._tracked_events_lock:
Expand All @@ -160,9 +159,11 @@ def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
FileType.Basebackup_delta,
FileType.Basebackup_delta_chunk,
):
progress_info = persisted_progress.get(file_key)
if total_bytes_uploaded > progress_info.current_progress:
progress_info.update(total_bytes_uploaded)
progress_info = persisted_progress.get("total_bytes_uploaded")
updated_total_bytes_uploaded = progress_info.current_progress + total_bytes_uploaded

if updated_total_bytes_uploaded > progress_info.current_progress:
progress_info.update(updated_total_bytes_uploaded)
persisted_progress.write(metrics=self.metrics)
self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0)
else:
Expand All @@ -174,19 +175,7 @@ def increment(self, file_key: str, total_bytes_uploaded: float) -> None:
file_key,
stalled_age,
)
metric_data = {
"metric": "pghoard.basebackup_bytes_uploaded",
"inc_value": total_bytes_uploaded,
"tags": {
"delta": file_type in (FileType.Basebackup_delta, FileType.Basebackup_delta_chunk)
},
}
elif file_type in (FileType.Wal, FileType.Timeline):
metric_data = {"metric": "pghoard.compressed_file_upload", "inc_value": total_bytes_uploaded}

self._tracked_events[file_key].increments.append(TransferIncrement(total_bytes_uploaded=total_bytes_uploaded))
if metric_data:
self.metrics.increase(**metric_data)

def reset(self) -> None:
with self._tracked_events_lock:
Expand All @@ -204,7 +193,7 @@ def run_safe(self):
time.sleep(self.CHECK_FREQUENCY)
except Exception: # pylint: disable=broad-except
self.log.exception("Failed to update transfer rate %s", "pghoard.compressed_file_upload")
self.metrics.increase("pghoard.transfer_operation.errors")
self.metrics.increase("pghoard.transfer_operation_errors")
self.reset()
self.stop()

Expand Down Expand Up @@ -423,18 +412,11 @@ def run_safe(self):
if file_to_transfer.callback_queue:
file_to_transfer.callback_queue.put(result)

self.log.info(
"%r %stransfer of key: %r, size: %r, took %.3fs", oper, "FAILED " if not result.success else "", key,
oper_size,
time.monotonic() - start_time
)

if file_to_transfer.operation in {TransferOperation.Upload} and filetype in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
):
if result.success:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
operation_type = file_to_transfer.operation
status = "FAILED" if not result.success else "successfully"
log_msg = f"{operation_type.capitalize()} of key: {key}, " \
f"size: {oper_size}, {status} in {time.monotonic() - start_time:.3f}s"
self.log.info(log_msg)

self.fetch_manager.stop()
self.log.debug("Quitting TransferAgent")
Expand Down Expand Up @@ -539,10 +521,5 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent):

# Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times
self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20))
if file_to_transfer.file_type in (
FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk
) and file_to_transfer.retry_number < 2:
persisted_progress = PersistedProgress.read(metrics=self.metrics)
persisted_progress.reset(key, metrics=self.metrics)
self.transfer_queue.put(file_to_transfer)
return None
25 changes: 7 additions & 18 deletions test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,36 +92,25 @@ def test_json_serialization(self, tmpdir):
def test_persisted_progress(self, mocker, tmp_path):
test_progress_file = tmp_path / "test_progress.json"
original_time = 1625072042.123456
test_data = {
"progress": {
"0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": {
"current_progress": 100,
"last_updated_time": original_time
}
}
}
test_data = {"progress": {"total_bytes_uploaded": {"current_progress": 100, "last_updated_time": original_time}}}

with open(test_progress_file, "w") as file:
json.dump(test_data, file)

mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file)
persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == 100
assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time == 1625072042.123456
assert "total_bytes_uploaded" in persisted_progress.progress
assert persisted_progress.progress["total_bytes_uploaded"].current_progress == 100
assert persisted_progress.progress["total_bytes_uploaded"].last_updated_time == 1625072042.123456

new_progress = 200
progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b")
progress_info = persisted_progress.get("total_bytes_uploaded")
progress_info.update(new_progress)
persisted_progress.write(metrics=metrics.Metrics(statsd={}))

updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].current_progress == new_progress
assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b"
].last_updated_time > original_time
assert updated_progress.progress["total_bytes_uploaded"].current_progress == new_progress
assert updated_progress.progress["total_bytes_uploaded"].last_updated_time > original_time

def test_default_persisted_progress_creation(self, mocker, tmp_path):
tmp_file = tmp_path / "non_existent_progress.json"
Expand Down
2 changes: 1 addition & 1 deletion test/test_transferagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,6 @@ def test_handle_upload_with_persisted_progress(self, mocker, tmp_path):
self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event)
updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={}))
assert temp_progress_file.exists()
assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3
assert updated_progress.progress["total_bytes_uploaded"].current_progress == 3
if temp_progress_file.exists():
temp_progress_file.unlink()

0 comments on commit d28945a

Please sign in to comment.