Skip to content

Commit

Permalink
feat: improve output of commands with no args for better UX interacti…
Browse files Browse the repository at this point in the history
…vity
  • Loading branch information
z3z1ma committed Jun 26, 2024
1 parent 327ab3f commit 691a21d
Showing 1 changed file with 96 additions and 29 deletions.
125 changes: 96 additions & 29 deletions src/cdf/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
PublisherSpecification,
ScriptSpecification,
SinkSpecification,
CoreSpecification,
)
from cdf.types import M
from cdf.proxy import run_mysql_proxy, run_plan_server
Expand Down Expand Up @@ -133,13 +134,29 @@ def path(ctx: typer.Context) -> None:
typer.echo(ctx.obj.unwrap().path)


def _describe(
*displayables: t.Tuple[str, t.Tuple[CoreSpecification, ...]], diag: str = ""
):
for color, components in displayables:
for component in components:
doc = "\n".join(
map(lambda ln: "> " + ln, component.description.splitlines())
)
console.print(
f"[b {color}]{type(component).__name__}[/b {color}]: {component.name}"
)
console.print(f"[dim]{doc}[/dim]\n")
if diag:
console.print(f"[yellow]{diag}[/yellow]\n")


@app.command(rich_help_panel="Core")
def pipeline(
ctx: typer.Context,
pipeline_to_sink: t.Annotated[
str,
t.Optional[str],
typer.Argument(help="The pipeline and sink separated by a colon."),
],
] = None,
select: t.List[str] = typer.Option(
...,
"-s",
Expand Down Expand Up @@ -184,6 +201,12 @@ def pipeline(
no_stage: Allows selective disabling of intermediate staging even if configured in sink.
"""
W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
if pipeline_to_sink is None:
return _describe(
("blue", W.pipelines),
("violet", W.sinks),
diag="To ingest data, use the `pipeline` command with the pipeline:sink combination.",
)
source, destination = pipeline_to_sink.split(":", 1)
return (
W.get_pipeline_spec(source)
Expand All @@ -205,9 +228,9 @@ def pipeline(
def discover(
ctx: typer.Context,
pipeline: t.Annotated[
str,
t.Optional[str],
typer.Argument(help="The pipeline in which to discover resources."),
],
] = None,
no_quiet: t.Annotated[
bool,
typer.Option(
Expand All @@ -223,8 +246,14 @@ def discover(
pipeline: The pipeline in which to discover resources.
no_quiet: Whether to suppress the pipeline stdout.
"""
W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
if pipeline is None:
return _describe(
("blue", W.pipelines),
diag="To discover resources, use the `discover` command with the pipeline name.",
)
for i, source in enumerate(
t.cast(WorkspaceMonad, ctx.obj)
M.ok(W)
.bind(lambda w: w.get_pipeline_spec(pipeline))
.bind(
lambda spec: execute_pipeline_specification(
Expand All @@ -242,8 +271,12 @@ def discover(
@app.command(rich_help_panel="Develop")
def head(
ctx: typer.Context,
pipeline: t.Annotated[str, typer.Argument(help="The pipeline to inspect.")],
resource: t.Annotated[str, typer.Argument(help="The resource to inspect.")],
pipeline: t.Annotated[
t.Optional[str], typer.Argument(help="The pipeline to inspect.")
] = None,
resource: t.Annotated[
t.Optional[str], typer.Argument(help="The resource to inspect.")
] = None,
n: t.Annotated[int, typer.Option("-n", "--rows")] = 5,
) -> None:
""":wrench: Prints the first N rows of a [b green]Resource[/b green] within a [b blue]pipeline[/b blue]. Defaults to [cyan]5[/cyan].
Expand All @@ -260,25 +293,37 @@ def head(
Raises:
typer.BadParameter: If the resource is not found in the pipeline.
"""
target = next(
filter(
lambda r: r.name == resource,
(
resource
for source in t.cast(WorkspaceMonad, ctx.obj)
.bind(lambda w: w.get_pipeline_spec(pipeline))
.bind(
lambda spec: execute_pipeline_specification(
spec, "dummy", dry_run=True, quiet=True
)
W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
if pipeline is None:
return _describe(
("blue", W.pipelines),
diag="To inspect a data pipeline, use the `head` command with the pipeline name.",
)
resource_iter = filter(
lambda r: r.name == resource or resource is None,
(
resource
for source in M.ok(W)
.bind(lambda w: w.get_pipeline_spec(pipeline))
.bind(
lambda spec: execute_pipeline_specification(
spec, "dummy", dry_run=True, quiet=True
)
.map(lambda rv: rv.pipeline.tracked_sources)
.unwrap()
for resource in source.resources.values()
),
)
.map(lambda rv: rv.pipeline.tracked_sources)
.unwrap()
for resource in source.resources.values()
),
None,
)
if resource is None:
console.print("[b green]Resources[/b green]:")
for r in resource_iter:
console.print(f"- {r.name}")
console.print(
f"\n[yellow]To inspect a resource, use `cdf head {pipeline} [cyan]<resource>[/cyan]`[/yellow].\n"
)
return
target = next(resource_iter, None)
if target is None:
raise typer.BadParameter(
f"Resource {resource} not found in pipeline {pipeline}.",
Expand All @@ -296,9 +341,9 @@ def head(
def publish(
ctx: typer.Context,
sink_to_publisher: t.Annotated[
str,
t.Optional[str],
typer.Argument(help="The sink and publisher separated by a colon."),
],
] = None,
skip_verification: t.Annotated[
bool,
typer.Option(
Expand All @@ -315,6 +360,12 @@ def publish(
skip_verification: Whether to skip the verification of the publisher dependencies.
"""
W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
if sink_to_publisher is None:
return _describe(
("violet", W.sinks),
("yellow", W.publishers),
diag="To publish data, use the `publish` command with the sink:publisher combination.",
)
source, publisher = sink_to_publisher.split(":", 1)
return (
W.get_publisher_spec(publisher)
Expand All @@ -330,7 +381,9 @@ def publish(
@app.command(rich_help_panel="Core")
def script(
ctx: typer.Context,
script: t.Annotated[str, typer.Argument(help="The script to execute.")],
script: t.Annotated[
t.Optional[str], typer.Argument(help="The script to execute.")
] = None,
quiet: t.Annotated[bool, typer.Option(help="Suppress the script stdout.")] = False,
) -> t.Any:
""":hammer: Execute a [b yellow]Script[/b yellow] within the context of the current workspace.
Expand All @@ -341,8 +394,14 @@ def script(
script: The script to execute.
quiet: Whether to suppress the script stdout.
"""
W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
if script is None:
return _describe(
("yellow", W.scripts),
diag="To execute a script, use the `script` command with the script name.",
)
return (
t.cast(WorkspaceMonad, ctx.obj)
M.ok(W)
.bind(lambda w: w.get_script_spec(script))
.bind(lambda s: execute_script_specification(s, capture_stdout=quiet))
.unwrap()
Expand All @@ -352,7 +411,9 @@ def script(
@app.command(rich_help_panel="Core")
def notebook(
ctx: typer.Context,
notebook: t.Annotated[str, typer.Argument(help="The notebook to execute.")],
notebook: t.Annotated[
t.Optional[str], typer.Argument(help="The notebook to execute.")
] = None,
params: t.Annotated[
str,
typer.Option(
Expand All @@ -369,8 +430,14 @@ def notebook(
notebook: The notebook to execute.
params: The parameters to pass to the notebook as a json formatted string.
"""
W = t.cast(WorkspaceMonad, ctx.obj).unwrap()
if notebook is None:
return _describe(
("yellow", W.notebooks),
diag="To execute a notebook, use the `notebook` command with the notebook name.",
)
return (
t.cast(WorkspaceMonad, ctx.obj)
M.ok(W)
.bind(lambda w: w.get_notebook_spec(notebook))
.bind(lambda s: execute_notebook_specification(s, **json.loads(params)))
.unwrap()
Expand Down

0 comments on commit 691a21d

Please sign in to comment.