diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py index 00b85a5c..260bd850 100644 --- a/pghoard/basebackup/base.py +++ b/pghoard/basebackup/base.py @@ -564,8 +564,6 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = db_conn.commit() self.log.info("Starting to backup %r and %r tablespaces to %r", pgdata, len(tablespaces), compressed_base) - progress_instance = PersistedProgress() - progress_instance.reset_all(metrics=self.metrics) start_time = time.monotonic() if delta: @@ -686,6 +684,8 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = "%r byte input, %r byte output, took %r seconds, waiting to upload", total_file_count, chunks_count, total_size_plain, total_size_enc, backup_time ) + progress_instance = PersistedProgress() + progress_instance.reset_all(metrics=self.metrics) finally: db_conn.rollback() diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py index 5064d140..016c9b90 100644 --- a/pghoard/basebackup/delta.py +++ b/pghoard/basebackup/delta.py @@ -94,6 +94,10 @@ def progress_callback(progress_step: ProgressStep, progress_data: ProgressMetric persisted_progress.write(self.metrics) self.last_flush_time = time.monotonic() self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0, tags=tags) + self.log.info( + "Updated snapshot progress for %s to %d files; elapsed time since last check: %.2f seconds.", + progress_step.value, progress_data["handled"], elapsed + ) else: stalled_age = progress_info.age self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", stalled_age, tags=tags) diff --git a/pghoard/transfer.py b/pghoard/transfer.py index bf2f2b27..68b39a33 100644 --- a/pghoard/transfer.py +++ b/pghoard/transfer.py @@ -146,7 +146,6 @@ def untrack_upload_event(self, file_key: str) -> None: self._tracked_events.pop(file_key) def increment(self, file_key: str, total_bytes_uploaded: float) -> None: - metric_data = {} persisted_progress = PersistedProgress.read(metrics=self.metrics) with self._tracked_events_lock: @@ -160,9 +159,11 @@ def increment(self, file_key: str, total_bytes_uploaded: float) -> None: FileType.Basebackup_delta, FileType.Basebackup_delta_chunk, ): - progress_info = persisted_progress.get(file_key) - if total_bytes_uploaded > progress_info.current_progress: - progress_info.update(total_bytes_uploaded) + progress_info = persisted_progress.get("total_bytes_uploaded") + updated_total_bytes_uploaded = progress_info.current_progress + total_bytes_uploaded + + if updated_total_bytes_uploaded > progress_info.current_progress: + progress_info.update(updated_total_bytes_uploaded) persisted_progress.write(metrics=self.metrics) self.metrics.gauge("pghoard.seconds_since_backup_progress_stalled", 0) else: @@ -174,19 +175,7 @@ def increment(self, file_key: str, total_bytes_uploaded: float) -> None: file_key, stalled_age, ) - metric_data = { - "metric": "pghoard.basebackup_bytes_uploaded", - "inc_value": total_bytes_uploaded, - "tags": { - "delta": file_type in (FileType.Basebackup_delta, FileType.Basebackup_delta_chunk) - }, - } - elif file_type in (FileType.Wal, FileType.Timeline): - metric_data = {"metric": "pghoard.compressed_file_upload", "inc_value": total_bytes_uploaded} - self._tracked_events[file_key].increments.append(TransferIncrement(total_bytes_uploaded=total_bytes_uploaded)) - if metric_data: - self.metrics.increase(**metric_data) def reset(self) -> None: with self._tracked_events_lock: @@ -204,7 +193,7 @@ def run_safe(self): time.sleep(self.CHECK_FREQUENCY) except Exception: # pylint: disable=broad-except self.log.exception("Failed to update transfer rate %s", "pghoard.compressed_file_upload") - self.metrics.increase("pghoard.transfer_operation.errors") + self.metrics.increase("pghoard.transfer_operation_errors") self.reset() self.stop() @@ -423,18 +412,11 @@ def run_safe(self): if file_to_transfer.callback_queue: file_to_transfer.callback_queue.put(result) - self.log.info( - "%r %stransfer of key: %r, size: %r, took %.3fs", oper, "FAILED " if not result.success else "", key, - oper_size, - time.monotonic() - start_time - ) - - if file_to_transfer.operation in {TransferOperation.Upload} and filetype in ( - FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk - ): - if result.success: - persisted_progress = PersistedProgress.read(metrics=self.metrics) - persisted_progress.reset(key, metrics=self.metrics) + operation_type = file_to_transfer.operation + status = "FAILED" if not result.success else "successfully" + log_msg = f"{operation_type.capitalize()} of key: {key}, " \ + f"size: {oper_size}, {status} in {time.monotonic() - start_time:.3f}s" + self.log.info(log_msg) self.fetch_manager.stop() self.log.debug("Quitting TransferAgent") @@ -539,10 +521,5 @@ def handle_upload(self, site, key, file_to_transfer: UploadEvent): # Sleep for a bit to avoid busy looping. Increase sleep time if the op fails multiple times self.sleep(min(0.5 * 2 ** (file_to_transfer.retry_number - 1), 20)) - if file_to_transfer.file_type in ( - FileType.Basebackup, FileType.Basebackup_chunk, FileType.Basebackup_delta, FileType.Basebackup_delta_chunk - ) and file_to_transfer.retry_number < 2: - persisted_progress = PersistedProgress.read(metrics=self.metrics) - persisted_progress.reset(key, metrics=self.metrics) self.transfer_queue.put(file_to_transfer) return None diff --git a/test/test_common.py b/test/test_common.py index 7aaa42e5..e57d9ec9 100644 --- a/test/test_common.py +++ b/test/test_common.py @@ -92,36 +92,25 @@ def test_json_serialization(self, tmpdir): def test_persisted_progress(self, mocker, tmp_path): test_progress_file = tmp_path / "test_progress.json" original_time = 1625072042.123456 - test_data = { - "progress": { - "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b": { - "current_progress": 100, - "last_updated_time": original_time - } - } - } + test_data = {"progress": {"total_bytes_uploaded": {"current_progress": 100, "last_updated_time": original_time}}} with open(test_progress_file, "w") as file: json.dump(test_data, file) mocker.patch("pghoard.common.PROGRESS_FILE", test_progress_file) persisted_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) - assert "0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" in persisted_progress.progress - assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" - ].current_progress == 100 - assert persisted_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" - ].last_updated_time == 1625072042.123456 + assert "total_bytes_uploaded" in persisted_progress.progress + assert persisted_progress.progress["total_bytes_uploaded"].current_progress == 100 + assert persisted_progress.progress["total_bytes_uploaded"].last_updated_time == 1625072042.123456 new_progress = 200 - progress_info = persisted_progress.get("0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b") + progress_info = persisted_progress.get("total_bytes_uploaded") progress_info.update(new_progress) persisted_progress.write(metrics=metrics.Metrics(statsd={})) updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) - assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" - ].current_progress == new_progress - assert updated_progress.progress["0af668268d0fe14c6e269760b08d80a634c421b8381df25f31fbed5e8a8c8d8b" - ].last_updated_time > original_time + assert updated_progress.progress["total_bytes_uploaded"].current_progress == new_progress + assert updated_progress.progress["total_bytes_uploaded"].last_updated_time > original_time def test_default_persisted_progress_creation(self, mocker, tmp_path): tmp_file = tmp_path / "non_existent_progress.json" diff --git a/test/test_transferagent.py b/test/test_transferagent.py index 31475a96..f88bc482 100644 --- a/test/test_transferagent.py +++ b/test/test_transferagent.py @@ -337,6 +337,6 @@ def test_handle_upload_with_persisted_progress(self, mocker, tmp_path): self.transfer_agent.handle_upload("test_site", self.foo_basebackup_path, upload_event) updated_progress = PersistedProgress.read(metrics=metrics.Metrics(statsd={})) assert temp_progress_file.exists() - assert updated_progress.progress[self.foo_basebackup_path].current_progress == 3 + assert updated_progress.progress["total_bytes_uploaded"].current_progress == 3 if temp_progress_file.exists(): temp_progress_file.unlink()