feat: allow drilling into load ids
z3z1ma committed Jun 27, 2024
1 parent 0e52db4 commit 8c05f90
Showing 2 changed files with 96 additions and 10 deletions.
88 changes: 82 additions & 6 deletions src/cdf/cli.py
@@ -21,6 +21,7 @@
     json_decode_state,
     json_encode_state,
 )
+from tomlkit import ws
 
 import cdf.core.constants as c
 import cdf.core.context as context
@@ -1078,6 +1079,12 @@ def inspect_events(
 @inspect.command("extracted")
 def inspect_extracted(
     ctx: typer.Context,
+    load_ids: t.Annotated[
+        str,
+        typer.Argument(
+            help="A comma-separated list of load ids to list. Use '*' to list all."
+        ),
+    ] = "*",
     limit: t.Annotated[
         int,
         typer.Option(
@@ -1089,13 +1096,38 @@ def inspect_extracted(
     W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
     import pandas as pd
 
-    with pd.option_context("display.max_rows", limit):
-        console.print(W.state.fetch_extracted(limit=limit))
+    if load_ids == "*":
+        requested_ids = []
+    else:
+        requested_ids = load_ids.split(",")
+    data = W.state.fetch_extracted(*requested_ids, limit=limit)
+    if data.empty:
+        console.print("\n[red]No data found.[/red]")
+        return
+    if requested_ids:
+        for load_id in requested_ids:
+            r = data.loc[data["load_id"] == load_id, "data"]
+            if r.empty:
+                console.print(
+                    f"\n[red]No data found for requested load id {load_id}.[/red]"
+                )
+                continue
+            console.print(f"\n[b]Data for load id {load_id}[/b]:")
+            console.print_json(r.iloc[0])
+    else:
+        with pd.option_context("display.max_rows", limit):
+            console.print(data)
 
 
 @inspect.command("normalized")
 def inspect_normalized(
     ctx: typer.Context,
+    load_ids: t.Annotated[
+        str,
+        typer.Argument(
+            help="A comma-separated list of load ids to list. Use '*' to list all."
+        ),
+    ] = "*",
     limit: t.Annotated[
         int,
         typer.Option(
@@ -1107,13 +1139,38 @@ def inspect_normalized(
     W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
     import pandas as pd
 
-    with pd.option_context("display.max_rows", limit):
-        console.print(W.state.fetch_normalized(limit=limit))
+    if load_ids == "*":
+        requested_ids = []
+    else:
+        requested_ids = load_ids.split(",")
+    data = W.state.fetch_normalized(*requested_ids, limit=limit)
+    if data.empty:
+        console.print("\n[red]No data found.[/red]")
+        return
+    if requested_ids:
+        for load_id in requested_ids:
+            r = data.loc[data["load_id"] == load_id, "data"]
+            if r.empty:
+                console.print(
+                    f"\n[red]No data found for requested load id {load_id}.[/red]"
+                )
+                continue
+            console.print(f"\n[b]Data for load id {load_id}[/b]:")
+            console.print_json(r.iloc[0])
+    else:
+        with pd.option_context("display.max_rows", limit):
+            console.print(data)
 
 
 @inspect.command("loaded")
 def inspect_loaded(
     ctx: typer.Context,
+    load_ids: t.Annotated[
+        str,
+        typer.Argument(
+            help="A comma-separated list of load ids to list. Use '*' to list all."
+        ),
+    ] = "*",
     limit: t.Annotated[
         int,
         typer.Option(
@@ -1125,8 +1182,27 @@ def inspect_loaded(
     W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
     import pandas as pd
 
-    with pd.option_context("display.max_rows", limit):
-        console.print(W.state.fetch_loaded(limit=limit))
+    if load_ids == "*":
+        requested_ids = []
+    else:
+        requested_ids = load_ids.split(",")
+    data = W.state.fetch_loaded(*requested_ids, limit=limit)
+    if data.empty:
+        console.print("\n[red]No data found.[/red]")
+        return
+    if requested_ids:
+        for load_id in requested_ids:
+            r = data.loc[data["load_id"] == load_id, "data"]
+            if r.empty:
+                console.print(
+                    f"\n[red]No data found for requested load id {load_id}.[/red]"
+                )
+                continue
+            console.print(f"\n[b]Data for load id {load_id}[/b]:")
+            console.print_json(r.iloc[0])
+    else:
+        with pd.option_context("display.max_rows", limit):
+            console.print(data)
 
 
 if __name__ == "__main__":
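A note on what this enables: each of the three inspect subcommands (extracted, normalized, loaded) now takes an optional positional load_ids argument, so an invocation along the lines of "cdf inspect extracted 1719460000.001,1719460000.002" (the binary name and ids here are illustrative) drills into specific load packages and pretty-prints their JSON payloads, while the default "*" keeps the old tabular listing. Below is a minimal, self-contained sketch of the same drill-in pattern; drill_into and the sample frame are hypothetical stand-ins for the command body and W.state.fetch_extracted, with plain print in place of rich's console.

import json

import pandas as pd


def drill_into(load_ids: str, data: pd.DataFrame) -> None:
    # "*" means no filter: fall back to the tabular listing.
    requested = [] if load_ids == "*" else load_ids.split(",")
    if data.empty:
        print("No data found.")
        return
    if requested:
        # Drill into each requested load id and pretty-print its JSON payload.
        for load_id in requested:
            r = data.loc[data["load_id"] == load_id, "data"]
            if r.empty:
                print(f"No data found for requested load id {load_id}.")
                continue
            print(f"Data for load id {load_id}:")
            print(json.dumps(json.loads(r.iloc[0]), indent=2))
    else:
        print(data)


# Hypothetical frame mirroring the load_id/data columns the fetch_* helpers return.
frame = pd.DataFrame(
    {
        "load_id": ["1719460000.001", "1719460000.002"],
        "data": ['{"state": "loaded"}', '{"state": "normalized"}'],
    }
)
drill_into("1719460000.001", frame)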
18 changes: 14 additions & 4 deletions src/cdf/core/state.py
@@ -297,7 +297,7 @@ def _info_to_payload(
                     "destination_name": info.pipeline.destination.destination_name,
                     "destination_type": info.pipeline.destination.destination_type,
                     "data": json.dumps(pkg.asdict(), default=str),
-                    "success": pkg.state == "loaded",
+                    "success": pkg.state != "aborted",
                     "elapsed": sum(
                         [j.elapsed for k in pkg.jobs.keys() for j in pkg.jobs[k]]
                     ),
@@ -306,19 +306,25 @@
             )
         return payload
 
-    def fetch_extracted(self, limit: int = 100, failed_only: bool = False):
+    def fetch_extracted(
+        self, *load_ids: str, limit: int = 100, failed_only: bool = False
+    ):
         """List all extracted data"""
         assert limit > 0 and limit < 1000, "Limit must be between 1 and 1000"
         q = exp.select("*").from_(self.extract_table).order_by("timestamp").limit(limit)
         if failed_only:
             q = q.where("success = false")
+        if load_ids:
+            q = q.where(f"load_id IN {tuple(load_ids)}")
         df = self.adapter.fetchdf(q)
         df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True)
         localtz = timezone(timedelta(seconds=-time.timezone))
         df["timestamp"] = df["timestamp"].dt.tz_convert(localtz)
         return df
 
-    def fetch_normalized(self, limit: int = 100, failed_only: bool = False):
+    def fetch_normalized(
+        self, *load_ids: str, limit: int = 100, failed_only: bool = False
+    ):
         """List all normalized data"""
         assert limit > 0 and limit < 1000, "Limit must be between 1 and 1000"
         q = (
@@ -329,18 +335,22 @@ def fetch_normalized(self, limit: int = 100, failed_only: bool = False):
         )
         if failed_only:
             q = q.where("success = false")
+        if load_ids:
+            q = q.where(f"load_id IN {tuple(load_ids)}")
         df = self.adapter.fetchdf(q)
         df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True)
         localtz = timezone(timedelta(seconds=-time.timezone))
         df["timestamp"] = df["timestamp"].dt.tz_convert(localtz)
         return df
 
-    def fetch_loaded(self, limit: int = 100, failed_only: bool = False):
+    def fetch_loaded(self, *load_ids: str, limit: int = 100, failed_only: bool = False):
         """List all loaded data"""
         assert limit > 0 and limit < 1000, "Limit must be between 1 and 1000"
         q = exp.select("*").from_(self.load_table).order_by("timestamp").limit(limit)
         if failed_only:
             q = q.where("success = false")
+        if load_ids:
+            q = q.where(f"load_id IN {tuple(load_ids)}")
         df = self.adapter.fetchdf(q)
         df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True)
         localtz = timezone(timedelta(seconds=-time.timezone))
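On the state side, the drill-in is just a variadic *load_ids parameter that appends an IN predicate to the sqlglot query before it runs against the adapter. Here is a minimal sketch of how that predicate composes, assuming only that sqlglot is installed; the table and column names are illustrative.

from sqlglot import exp


def build_query(table: str, *load_ids: str, limit: int = 100) -> exp.Select:
    # Mirror the fetch_* builders: select, order, limit, then optional filters.
    q = exp.select("*").from_(table).order_by("timestamp").limit(limit)
    if load_ids:
        # Same f-string trick as in the diff: a Python tuple of strings happens
        # to render as a SQL-compatible IN list when it has two or more elements.
        q = q.where(f"load_id IN {tuple(load_ids)}")
    return q


print(build_query("cdf_extracted", "1719460000.001", "1719460000.002").sql())
# Roughly: SELECT * FROM cdf_extracted
#   WHERE load_id IN ('1719460000.001', '1719460000.002')
#   ORDER BY timestamp LIMIT 100

One caveat with the tuple formatting: a single id renders as ('x',) with a trailing comma, which not every SQL parser accepts; building the predicate with sqlglot's isin helper (for example exp.column("load_id").isin(*load_ids)) would sidestep that edge case.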
