feat: add data publisher and generic operation
z3z1ma committed Jul 21, 2024
1 parent 05ae48d · commit a0533b1
Showing 4 changed files with 63 additions and 9 deletions.
src/cdf/injector/config.py (9 changes: 6 additions & 3 deletions)
@@ -456,7 +456,7 @@ def resolve_defaults(self, func_or_cls: t.Callable[P, T]) -> t.Callable[..., T]:

         @functools.wraps(func_or_cls)
         def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
-            bound = sig.bind_partial(*args, **kwargs)
+            bound_args = sig.bind_partial(*args, **kwargs)
             for name, param in sig.parameters.items():
                 value = _MISSING
                 if not self.is_resolvable(param):
@@ -478,9 +478,12 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:

                 # Inject the value into the function
                 if value is not _MISSING:
-                    bound.arguments[name] = self.apply_converters(value, **self.config)
+                    bound_args.arguments[name] = self.apply_converters(
+                        value, **self.config
+                    )

-            return func_or_cls(*bound.args, **bound.kwargs)
+            bound_args.apply_defaults()
+            return func_or_cls(*bound_args.args, **bound_args.kwargs)

         return wrapper

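This change (and the one-line registry.py addition below) leans on the standard-library inspect pattern: bind whatever the caller supplied with Signature.bind_partial, write resolved dependencies straight into BoundArguments.arguments, call apply_defaults() so unbound parameters pick up their declared defaults, and forward bound_args.args / bound_args.kwargs to the wrapped callable. A minimal standalone sketch of that mechanism (the function and values here are illustrative, not part of cdf):

import inspect

def send_report(recipient, db="prod-warehouse", retries=3):
    return f"sent to {recipient} via {db} with {retries} retries"

sig = inspect.signature(send_report)

# The caller only supplied `recipient`; pretend the injector resolved `db`.
bound_args = sig.bind_partial("ops@example.com")
bound_args.arguments["db"] = "duckdb:///:memory:"  # the "injected" value

# apply_defaults() fills in retries=3 so the bound arguments are explicit and complete;
# parameters already present (recipient, db) are left untouched.
bound_args.apply_defaults()

print(bound_args.args, bound_args.kwargs)
# ('ops@example.com', 'duckdb:///:memory:', 3) {}
print(send_report(*bound_args.args, **bound_args.kwargs))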
src/cdf/injector/registry.py (1 change: 1 addition & 0 deletions)
@@ -420,6 +420,7 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
                 # If a dependency is found, inject it
                 if obj is not None:
                     bound_args.arguments[name] = obj
+            bound_args.apply_defaults()
             return func_or_cls(*bound_args.args, **bound_args.kwargs)

         return wrapper
src/cdf/nextgen/model.py (25 changes: 21 additions & 4 deletions)
@@ -74,21 +74,29 @@ def __class_getitem__(cls, _):


 Service = Component[t.Any]
-"""A service that the workspace provides."""
+"""A service that the workspace provides, e.g. an API, database, or requests client."""

 Source = Component["DltSource"]
-"""A dlt source that the workspace provides."""
+"""A dlt source which we can extract data from."""

 Destination = Component["DltDestination"]
-"""A dlt destination that the workspace provides."""
+"""A dlt destination which we can load data into."""

 DataPipeline = Component[t.Optional["LoadInfo"]]
-"""A data pipeline that the workspace provides."""
+"""A data pipeline which loads data from a source to a destination."""

+DataPublisher = Component[t.Any]  # TODO: track intervals
+"""A data publisher which pushes data to an operational system."""
+
+Operation = Component[int]
+"""A generic callable that returns an exit code."""
+
 ServiceDef = t.Union[Service, _ComponentProperties[t.Any]]
 SourceDef = t.Union[Source, _ComponentProperties["DltSource"]]
 DestinationDef = t.Union[Destination, _ComponentProperties["DltDestination"]]
 DataPipelineDef = t.Union[DataPipeline, _ComponentProperties[t.Optional["LoadInfo"]]]
+DataPublisherDef = t.Union[DataPublisher, _ComponentProperties[t.Any]]
+OperationDef = t.Union[Operation, _ComponentProperties[int]]


 TComponent = t.TypeVar("TComponent", bound=Component)
@@ -103,4 +111,13 @@ def __class_getitem__(cls, _):
"ServiceDef",
"SourceDef",
"DestinationDef",
"DataPipeline",
"DataPipelineDef",
"DataPublisher",
"DataPublisherDef",
"Operation",
"OperationDef",
"ServiceLevelAgreement",
"Component",
"TComponent",
]
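The two new aliases are intentionally thin: a DataPublisher is any component that pushes already-loaded data out to an operational system, and an Operation is any callable whose return value is treated as an exit code. A hedged sketch of what concrete bodies might look like (the function names and payloads below are made up for illustration; the exact Component / _ComponentProperties wrapper API is not shown in this diff):

def publish_active_users_to_crm(cdf_environment: str) -> None:
    """Hypothetical DataPublisher body: push already-loaded rows to a CRM."""
    rows = [{"user_id": 1, "status": "active"}]  # stand-in for a real warehouse query
    print(f"[{cdf_environment}] pushing {len(rows)} rows to the CRM")

def rotate_api_keys() -> int:
    """Hypothetical Operation body: do some work and return an exit code."""
    print("rotating keys...")
    return 0  # 0 = success, non-zero = failure

Per the workspace changes below, callables like these would be supplied through the new data_publishers / operation_definitions fields and looked up by name via the publishers / operations properties, assuming bare callables (or their _ComponentProperties mappings) are accepted as definitions.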
src/cdf/nextgen/workspace.py (37 changes: 35 additions & 2 deletions)
@@ -55,12 +55,26 @@ class Workspace:
"""An iterable of destination definitions that the workspace provides."""
data_pipelines: t.Iterable[model.DataPipelineDef] = field(default_factory=tuple)
"""An iterable of data pipelines that the workspace provides."""
data_publishers: t.Iterable[model.DataPublisherDef] = field(default_factory=tuple)
"""An iterable of data publishers that the workspace provides."""
operation_definitions: t.Iterable[model.OperationDef] = field(default_factory=tuple)
"""An iterable of generic operations that the workspace provides."""

def __post_init__(self) -> None:
"""Initialize the workspace."""
for source in self.configuration_sources:
self.conf_resolver.import_(source)
self.conf_resolver.set_environment(self.environment)
self.container.add_definition(
"cdf_workspace",
injector.Dependency.instance(self),
override=True,
)
self.container.add_definition(
"cdf_environment",
injector.Dependency.instance(self.environment),
override=True,
)
self.container.add_definition(
"cdf_config",
injector.Dependency.instance(self.conf_resolver),
@@ -107,6 +121,16 @@ def pipelines(self) -> t.Dict[str, model.DataPipeline]:
"""Return the data pipelines of the workspace."""
return self._parse_definitions(self.data_pipelines, model.DataPipeline)

@cached_property
def publishers(self) -> t.Dict[str, model.DataPublisher]:
"""Return the data publishers of the workspace."""
return self._parse_definitions(self.data_publishers, model.DataPublisher)

@cached_property
def operations(self) -> t.Dict[str, model.Operation]:
"""Return the operations of the workspace."""
return self._parse_definitions(self.operation_definitions, model.Operation)

# TODO: this is a stub
def run_pipeline(self, pipeline: str) -> None:
"""Run a data pipeline by name."""
@@ -147,22 +171,31 @@ def invoke(self, func_or_cls: t.Callable[P, T], *args: t.Any, **kwargs: t.Any) -

 if __name__ == "__main__":
     import dlt
+    import duckdb

-    def some_pipeline(source_a, temp_duckdb):
+    def some_pipeline(source_a, temp_duckdb, cdf_environment):
         pipeline = dlt.pipeline("some_pipeline", destination=memory_duckdb)
+        print("Running pipeline")
         load_info = pipeline.run(source_a)
+        print("Pipeline finished")
+        with pipeline.sql_client() as client:
+            print("Querying DuckDB in " + cdf_environment)
+            print(
+                client.execute_sql("SELECT * FROM some_pipeline_dataset.test_resource")
+            )
         return load_info

     @dlt.source
     def test_source(a: int, prod_bigquery: str):

         @dlt.resource
         def test_resource():
+            print("Reading from API")
             yield from [{"a": a, "prod_bigquery": prod_bigquery}]

         return [test_resource]

-    memory_duckdb = dlt.destinations.duckdb(":memory:")
+    memory_duckdb = dlt.destinations.duckdb(duckdb.connect(":memory:"))

     # Define a workspace
     datateam = Workspace(
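One detail worth noting in the __main__ demo above: the destination is now built from an explicit duckdb.connect(":memory:") connection rather than the ":memory:" string, presumably so the pipeline load and the later pipeline.sql_client() query hit the same in-memory database rather than separate ephemeral ones. A minimal sketch of that shared-connection pattern, using only calls shown in the diff:

import dlt
import duckdb

# One shared in-memory DuckDB connection: the pipeline loads into it, and any
# later sql_client() queries read from that same database.
conn = duckdb.connect(":memory:")
memory_duckdb = dlt.destinations.duckdb(conn)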
