Skip to content

Commit

Permalink
chore: suppress and warn during state interaction
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed Jul 1, 2024
1 parent 4fcc4e6 commit 4995889
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions src/cdf/core/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from sqlmesh.core.engine_adapter import EngineAdapter

import cdf.core.logger as logger
from cdf.core.context import active_project, execution_id
from cdf.types import M, P

Expand Down Expand Up @@ -130,7 +131,7 @@ def _execute(self, sql: str) -> None:

def store_json(self, key: str, value: JSON) -> None:
"""Store a JSON value"""
with self.adapter.transaction(value is not None):
with self.adapter.transaction(value is not None), logger.suppress_and_warn():
self.adapter.delete_from(self.kv_table, f"key = '{key}'")
if value is not None:
self.adapter.insert_append(
Expand Down Expand Up @@ -208,7 +209,7 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
audit_event["success"] = not isinstance(rv, M.Err)
audit_event["properties"].update(output_props(rv))
audit_event["properties"] = json.dumps(audit_event["properties"])
with self.adapter.transaction():
with self.adapter.transaction(), logger.suppress_and_warn():
self.adapter.insert_append(
self.audit_table,
pd.DataFrame([audit_event]),
Expand All @@ -231,7 +232,7 @@ def audit(
"properties": json.dumps(properties),
"execution_id": execution_id.get(),
}
with self.adapter.transaction():
with self.adapter.transaction(), logger.suppress_and_warn():
self.adapter.insert_append(
self.audit_table,
pd.DataFrame([payload]),
Expand Down Expand Up @@ -267,23 +268,23 @@ def capture_extract_info(self, info: ExtractInfo) -> None:
d = self._info_to_payload(info)
if not d:
return
with self.adapter.transaction():
with self.adapter.transaction(), logger.suppress_and_warn():
self.adapter.insert_append(self.extract_table, pd.DataFrame(d))

def capture_normalize_info(self, info: NormalizeInfo) -> None:
"""Capture normalize information"""
d = self._info_to_payload(info)
if not d:
return
with self.adapter.transaction():
with self.adapter.transaction(), logger.suppress_and_warn():
self.adapter.insert_append(self.normalize_table, pd.DataFrame(d))

def capture_load_info(self, info: LoadInfo) -> None:
"""Capture load information"""
d = self._info_to_payload(info)
if not d:
return
with self.adapter.transaction():
with self.adapter.transaction(), logger.suppress_and_warn():
self.adapter.insert_append(self.load_table, pd.DataFrame(d))

@staticmethod
Expand Down

0 comments on commit 4995889

Please sign in to comment.