diff --git a/Makefile b/Makefile index 9a68fd52c..3528c703e 100644 --- a/Makefile +++ b/Makefile @@ -179,7 +179,7 @@ uninstall-server-package: @$(PYTHON_PIP) uninstall elyra -y install-server-package: uninstall-server-package - $(PYTHON_PIP) install --upgrade --upgrade-strategy $(UPGRADE_STRATEGY) "$(shell find dist -name "elyra-*-py3-none-any.whl")[kfp-tekton]" + $(PYTHON_PIP) install --upgrade --upgrade-strategy $(UPGRADE_STRATEGY) "$(shell find dist -name "elyra-*-py3-none-any.whl")" install-server: build-dependencies lint-server build-server install-server-package ## Build and install backend diff --git a/elyra/metadata/schemasproviders.py b/elyra/metadata/schemasproviders.py index 03fdbb7e9..ab135d263 100644 --- a/elyra/metadata/schemasproviders.py +++ b/elyra/metadata/schemasproviders.py @@ -23,12 +23,6 @@ import entrypoints from traitlets import log # noqa H306 -try: - from kfp_tekton import TektonClient -except ImportError: - # We may not have kfp-tekton available and that's okay! - TektonClient = None - from elyra.metadata.schema import SchemasProvider from elyra.metadata.schemaspaces import CodeSnippets from elyra.metadata.schemaspaces import ComponentCatalogs @@ -93,16 +87,12 @@ def get_schemas(self) -> List[Dict]: ) if kfp_schema_present: # Update the kfp engine enum to reflect current packages... - # If TektonClient package is missing, navigate to the engine property - # and remove 'tekton' entry if present and return updated result. - if not TektonClient: - # locate the schema and update the enum - for schema in runtime_schemas: - if schema["name"] == "kfp": - engine_enum: list = schema["properties"]["metadata"]["properties"]["engine"]["enum"] - if "Tekton" in engine_enum: - engine_enum.remove("Tekton") - schema["properties"]["metadata"]["properties"]["engine"]["enum"] = engine_enum + for schema in runtime_schemas: + if schema["name"] == "kfp": + engine_enum: list = schema["properties"]["metadata"]["properties"]["engine"]["enum"] + if "Tekton" in engine_enum: + engine_enum.remove("Tekton") + schema["properties"]["metadata"]["properties"]["engine"]["enum"] = engine_enum # For KFP schemas replace placeholders: # - properties.metadata.properties.auth_type.enum ({AUTH_PROVIDER_PLACEHOLDERS}) diff --git a/elyra/pipeline/kfp/PipelineConf.py b/elyra/pipeline/kfp/PipelineConf.py new file mode 100644 index 000000000..1e4f1aae6 --- /dev/null +++ b/elyra/pipeline/kfp/PipelineConf.py @@ -0,0 +1,157 @@ +from typing import Union +from kubernetes.client.models import V1PodDNSConfig + +class PipelineConf(): + """PipelineConf contains pipeline level settings.""" + + def __init__(self): + self.image_pull_secrets = [] + self.timeout = 0 + self.ttl_seconds_after_finished = -1 + self._pod_disruption_budget_min_available = None + self.op_transformers = [] + self.default_pod_node_selector = {} + self.image_pull_policy = None + self.parallelism = None + self._data_passing_method = None + self.dns_config = None + + def set_image_pull_secrets(self, image_pull_secrets): + """Configures the pipeline level imagepullsecret. + + Args: + image_pull_secrets: a list of Kubernetes V1LocalObjectReference For + detailed description, check Kubernetes V1LocalObjectReference definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1LocalObjectReference.md + """ + self.image_pull_secrets = image_pull_secrets + return self + + def set_timeout(self, seconds: int): + """Configures the pipeline level timeout. + + Args: + seconds: number of seconds for timeout + """ + self.timeout = seconds + return self + + def set_parallelism(self, max_num_pods: int): + """Configures the max number of total parallel pods that can execute at + the same time in a workflow. + + Args: + max_num_pods: max number of total parallel pods. + """ + if max_num_pods < 1: + raise ValueError( + 'Pipeline max_num_pods set to < 1, allowed values are > 0') + + self.parallelism = max_num_pods + return self + + def set_ttl_seconds_after_finished(self, seconds: int): + """Configures the ttl after the pipeline has finished. + + Args: + seconds: number of seconds for the workflow to be garbage collected after + it is finished. + """ + self.ttl_seconds_after_finished = seconds + return self + + def set_pod_disruption_budget(self, min_available: Union[int, str]): + """PodDisruptionBudget holds the number of concurrent disruptions that + you allow for pipeline Pods. + + Args: + min_available (Union[int, str]): An eviction is allowed if at least + "minAvailable" pods selected by "selector" will still be available after + the eviction, i.e. even in the absence of the evicted pod. So for + example you can prevent all voluntary evictions by specifying "100%". + "minAvailable" can be either an absolute number or a percentage. + """ + self._pod_disruption_budget_min_available = min_available + return self + + def set_default_pod_node_selector(self, label_name: str, value: str): + """Add a constraint for nodeSelector for a pipeline. + + Each constraint is a key-value pair label. + + For the container to be eligible to run on a node, the node must have each + of the constraints appeared as labels. + + Args: + label_name: The name of the constraint label. + value: The value of the constraint label. + """ + self.default_pod_node_selector[label_name] = value + return self + + def set_image_pull_policy(self, policy: str): + """Configures the default image pull policy. + + Args: + policy: the pull policy, has to be one of: Always, Never, IfNotPresent. + For more info: + https://github.com/kubernetes-client/python/blob/10a7f95435c0b94a6d949ba98375f8cc85a70e5a/kubernetes/docs/V1Container.md + """ + self.image_pull_policy = policy + return self + + def add_op_transformer(self, transformer): + """Configures the op_transformers which will be applied to all ops in + the pipeline. The ops can be ResourceOp, VolumeOp, or ContainerOp. + + Args: + transformer: A function that takes a kfp Op as input and returns a kfp Op + """ + self.op_transformers.append(transformer) + + def set_dns_config(self, dns_config: V1PodDNSConfig): + """Set the dnsConfig to be given to each pod. + + Args: + dns_config: Kubernetes V1PodDNSConfig For detailed description, check + Kubernetes V1PodDNSConfig definition + https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1PodDNSConfig.md + + Example: + :: + + import kfp + from kubernetes.client.models import V1PodDNSConfig, V1PodDNSConfigOption + pipeline_conf = kfp.dsl.PipelineConf() + pipeline_conf.set_dns_config(dns_config=V1PodDNSConfig( + nameservers=["1.2.3.4"], + options=[V1PodDNSConfigOption(name="ndots", value="2")], + )) + """ + self.dns_config = dns_config + + @property + def data_passing_method(self): + return self._data_passing_method + + @data_passing_method.setter + def data_passing_method(self, value): + """Sets the object representing the method used for intermediate data + passing. + + Example: + :: + + from kfp.dsl import PipelineConf, data_passing_methods + from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource + pipeline_conf = PipelineConf() + pipeline_conf.data_passing_method = + data_passing_methods.KubernetesVolume( + volume=V1Volume( + name='data', + persistent_volume_claim=V1PersistentVolumeClaimVolumeSource('data-volume'), + ), + path_prefix='artifact_data/', + ) + """ + self._data_passing_method = value \ No newline at end of file diff --git a/elyra/pipeline/kfp/kfp_authentication.py b/elyra/pipeline/kfp/kfp_authentication.py index 3ca44519a..c369fc3eb 100644 --- a/elyra/pipeline/kfp/kfp_authentication.py +++ b/elyra/pipeline/kfp/kfp_authentication.py @@ -27,9 +27,9 @@ from typing import Tuple from urllib.parse import urlsplit -from kfp.auth import KF_PIPELINES_SA_TOKEN_ENV -from kfp.auth import KF_PIPELINES_SA_TOKEN_PATH -from kfp.auth import ServiceAccountTokenVolumeCredentials +from kfp.client import KF_PIPELINES_SA_TOKEN_ENV +from kfp.client import KF_PIPELINES_SA_TOKEN_PATH +from kfp.client import ServiceAccountTokenVolumeCredentials import requests diff --git a/elyra/pipeline/kfp/processor_kfp.py b/elyra/pipeline/kfp/processor_kfp.py index 4e7f9be57..d259b9186 100644 --- a/elyra/pipeline/kfp/processor_kfp.py +++ b/elyra/pipeline/kfp/processor_kfp.py @@ -39,19 +39,11 @@ from kfp import Client as ArgoClient from kfp import compiler as kfp_argo_compiler from kfp import components as components -from kfp.dsl import PipelineConf -from kfp.dsl import RUN_ID_PLACEHOLDER from kubernetes import client as k8s_client from traitlets import default from traitlets import Unicode -try: - from kfp_tekton import compiler as kfp_tekton_compiler - from kfp_tekton import TektonClient -except ImportError: - # We may not have kfp-tekton available and that's okay! - kfp_tekton_compiler = None - TektonClient = None +RUN_ID_PLACEHOLDER = "random-placeholder" from elyra._version import __version__ from elyra.metadata.schemaspaces import RuntimeImages @@ -81,6 +73,8 @@ from elyra.util.kubernetes import sanitize_label_value from elyra.util.path import get_absolute_path +from elyra.pipeline.kfp.PipelineConf import PipelineConf + @unique class WorkflowEngineType(Enum): @@ -113,8 +107,6 @@ def get_instance_by_value(value: str) -> "WorkflowEngineType": CRIO_VOL_MOUNT_PATH = "/opt/app-root/src" CRIO_VOL_WORKDIR_PATH = f"{CRIO_VOL_MOUNT_PATH}/jupyter-work-dir" CRIO_VOL_PYTHON_PATH = f"{CRIO_VOL_WORKDIR_PATH}/python3" - - class KfpPipelineProcessor(RuntimePipelineProcessor): _type = RuntimeProcessorType.KUBEFLOW_PIPELINES _name = "kfp" @@ -173,11 +165,6 @@ def process(self, pipeline): api_password = runtime_configuration.metadata.get("api_password") user_namespace = runtime_configuration.metadata.get("user_namespace") workflow_engine = WorkflowEngineType.get_instance_by_value(runtime_configuration.metadata.get("engine", "argo")) - if workflow_engine == WorkflowEngineType.TEKTON and not TektonClient: - raise ValueError( - "Python package `kfp-tekton` is not installed. " - "Please install using `elyra[kfp-tekton]` to use Tekton engine." - ) # unpack Cloud Object Storage configs cos_endpoint = runtime_configuration.metadata["cos_endpoint"] @@ -206,22 +193,13 @@ def process(self, pipeline): # Create Kubeflow Client ############# try: - if workflow_engine == WorkflowEngineType.TEKTON: - client = TektonClient( - host=api_endpoint, - cookies=auth_info.get("cookies", None), - credentials=auth_info.get("credentials", None), - existing_token=auth_info.get("existing_token", None), - namespace=user_namespace, - ) - else: - client = ArgoClient( - host=api_endpoint, - cookies=auth_info.get("cookies", None), - credentials=auth_info.get("credentials", None), - existing_token=auth_info.get("existing_token", None), - namespace=user_namespace, - ) + client = ArgoClient( + host=api_endpoint, + cookies=auth_info.get("cookies", None), + credentials=auth_info.get("credentials", None), + existing_token=auth_info.get("existing_token", None), + namespace=user_namespace, + ) except Exception as ex: # a common cause of these errors is forgetting to include `/pipeline` or including it with an 's' api_endpoint_obj = urlsplit(api_endpoint) @@ -275,7 +253,7 @@ def process(self, pipeline): with tempfile.TemporaryDirectory() as temp_dir: self.log.debug(f"Created temporary directory at: {temp_dir}") - pipeline_path = os.path.join(temp_dir, f"{pipeline_name}.tar.gz") + pipeline_path = os.path.join(temp_dir, f"{pipeline_name}.yaml") ############# # Get Pipeline ID @@ -351,11 +329,15 @@ def process(self, pipeline): ) # extract the ID of the pipeline we created - pipeline_id = kfp_pipeline.id + pipeline_id = kfp_pipeline.pipeline_id # the initial "pipeline version" has the same id as the pipeline itself - version_id = pipeline_id - + version_details = client.list_pipeline_versions(pipeline_id=pipeline_id) + version_list = version_details.pipeline_versions + if isinstance(version_list, list): + version_id = version_list[0].pipeline_version_id + else: + version_id = None # CASE 2: pipeline already exists else: # upload the "pipeline version" @@ -366,7 +348,7 @@ def process(self, pipeline): ) # extract the id of the "pipeline version" that was created - version_id = kfp_pipeline.id + version_id = kfp_pipeline.pipeline_version_id except Exception as ex: # a common cause of these errors is forgetting to include `/pipeline` or including it with an 's' @@ -416,7 +398,7 @@ def process(self, pipeline): # create pipeline run (or specified pipeline version) run = client.run_pipeline( - experiment_id=experiment.id, job_name=job_name, pipeline_id=pipeline_id, version_id=version_id + experiment_id=experiment.experiment_id, job_name=job_name, pipeline_id=pipeline_id, version_id=version_id ) except Exception as ex: @@ -435,7 +417,7 @@ def process(self, pipeline): self.log_pipeline_info( pipeline_name, - f"pipeline submitted: {public_api_endpoint}/#/runs/details/{run.id}", + f"pipeline submitted: {public_api_endpoint}/#/runs/details/{run.run_id}", duration=time.time() - t0, ) @@ -450,8 +432,8 @@ def process(self, pipeline): object_storage_path = None return KfpPipelineProcessorResponse( - run_id=run.id, - run_url=f"{public_api_endpoint}/#/runs/details/{run.id}", + run_id=run.run_id, + run_url=f"{public_api_endpoint}/#/runs/details/{run.run_id}", object_storage_url=object_storage_url, object_storage_path=object_storage_path, ) @@ -494,8 +476,6 @@ def export( ) workflow_engine = WorkflowEngineType.get_instance_by_value(runtime_configuration.metadata.get("engine", "argo")) - if workflow_engine == WorkflowEngineType.TEKTON and not TektonClient: - raise ValueError("kfp-tekton not installed. Please install using elyra[kfp-tekton] to use Tekton engine.") if Path(absolute_pipeline_export_path).exists() and not overwrite: raise ValueError("File " + absolute_pipeline_export_path + " already exists.") @@ -565,7 +545,7 @@ def _generate_pipeline_dsl( code_generation_options = {} # Load Kubeflow Pipelines Python DSL template - loader = PackageLoader("elyra", "templates/kubeflow/v1") + loader = PackageLoader("elyra", "templates/kubeflow/v2") template_env = Environment(loader=loader) # Add filter that produces a Python-safe variable name template_env.filters["python_safe"] = lambda x: re.sub(r"[" + re.escape(string.punctuation) + "\\s]", "_", x) @@ -668,12 +648,7 @@ def _compile_pipeline_dsl( # in the generated Python DSL "generated_pipeline" pipeline_function = getattr(mod, "generated_pipeline") # compile the DSL - if workflow_engine == WorkflowEngineType.TEKTON: - kfp_tekton_compiler.TektonCompiler().compile( - pipeline_function, output_file, pipeline_conf=pipeline_conf - ) - else: - kfp_argo_compiler.Compiler().compile(pipeline_function, output_file, pipeline_conf=pipeline_conf) + kfp_argo_compiler.Compiler().compile(pipeline_function, output_file) except Exception as ex: raise RuntimeError( f"Failed to compile pipeline with workflow_engine '{workflow_engine.value}' to '{output_file}'" @@ -729,7 +704,7 @@ def _generate_workflow_tasks( pipeline.pipeline_properties.get(pipeline_constants.COS_OBJECT_PREFIX), pipeline_instance_id ) # - load the generic component definition template - template_env = Environment(loader=PackageLoader("elyra", "templates/kubeflow/v1")) + template_env = Environment(loader=PackageLoader("elyra", "templates/kubeflow/v2")) generic_component_template = template_env.get_template("generic_component_definition_template.jinja2") # Add filter that escapes the " character in strings template_env.filters["string_delimiter_safe"] = lambda string: re.sub('"', '\\\\\\\\"', string) diff --git a/elyra/templates/kubeflow/v2/generic_component_definition_template.jinja2 b/elyra/templates/kubeflow/v2/generic_component_definition_template.jinja2 new file mode 100644 index 000000000..85c8eb086 --- /dev/null +++ b/elyra/templates/kubeflow/v2/generic_component_definition_template.jinja2 @@ -0,0 +1,24 @@ +name: Run a file +description: Run a Jupyter notebook or Python/R script +{% if task_parameters %} +inputs: +{%- for parameter in task_parameters %} +- {name: {{ parameter.name }}, type: {{ parameter.input_type.component_input_type }}{% if parameter.description %}, description: "{{ parameter.description | string_delimiter_safe}}"{% endif %}{% if parameter.default_value is not none %}, default: {% if parameter.selected_type == 'String' %} "{{ parameter.default_value|string_delimiter_safe }}"{% else %}{{ parameter.default_value }}{% endif %}{% endif %}, optional: {{ (not parameter.required)|tojson }}} +{%- endfor %} +{% endif %} +implementation: + container: + image: {{ container_image }} + command: [sh, -c] + args: + - | + {%- for parameter in task_parameters %} + {{ parameter.name }}="${{ loop.index0 }}" + {%- endfor %} + {%- for command in command_args %} + sh -c "{{command}}" + {%- endfor %} + + {%- for parameter in task_parameters %} + - {inputValue: {{ parameter.name }}} + {%- endfor %} diff --git a/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 new file mode 100644 index 000000000..f01fb2961 --- /dev/null +++ b/elyra/templates/kubeflow/v2/python_dsl_template.jinja2 @@ -0,0 +1,164 @@ +# +# Generated by Elyra {{ elyra_version }} +# +import kfp +from kubernetes.client import * +from kubernetes.client.models import * +from kfp.kubernetes import secret, volume + +from typing import Optional + +# ------------------------------------------------------------------ +# kfp-kubernetes 1.1.0 is misisng these function, explicity using them, +# TODO: remove these function once a new release of kfp-kubernetes is made. +# ------------------------------------------------------------------ + +from google.protobuf import json_format +from kfp.dsl import PipelineTask +from kfp.kubernetes import common +from kfp.kubernetes import kubernetes_executor_config_pb2 as pb + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + + +def add_pod_label( + task: PipelineTask, + label_key: str, + label_value: str, +) -> PipelineTask: + + msg = common.get_existing_kubernetes_config_as_message(task) + msg.pod_metadata.labels.update({label_key: label_value}) + task.platform_config['kubernetes'] = json_format.MessageToDict(msg) + + return task + + +def add_pod_annotation( + task: PipelineTask, + annotation_key: str, + annotation_value: str, +) -> PipelineTask: + + msg = common.get_existing_kubernetes_config_as_message(task) + msg.pod_metadata.annotations.update({annotation_key: annotation_value}) + task.platform_config['kubernetes'] = json_format.MessageToDict(msg) + + return task + +# ------------------------------------------------------------------ +# end of missing functions +# ------------------------------------------------------------------ + +{# Load statements for custom components -#} +{# component_hash = """""" -#} +{# factory_hash = kfp.components.load_component_from_text(component_hash) -#} +{% for hash, component_definition in component_definitions.items() %} +component_def_{{ hash | python_safe }} = """ +{{ component_definition }} +""" + +factory_{{ hash | python_safe }} = kfp.components.load_component_from_text(component_def_{{ hash | python_safe }}) +{% endfor %} + +{# Define pipeline -#} +{% if pipeline_description %} +@kfp.dsl.pipeline(name="{{ pipeline_name }}", description="{{ pipeline_description | string_delimiter_safe }}") +{% else %} +@kfp.dsl.pipeline(name="{{ pipeline_name }}") +{% endif %} +def generated_pipeline( +{% if pipeline_parameters %} +{% for parameter in pipeline_parameters %} + {{ parameter.name }}{% if parameter.input_type.type_hint %}: {{ parameter.input_type.type_hint }}{% endif %} = {{ parameter|param_val_to_python_var }}, +{% endfor %} +{% endif %} +): +{% for workflow_task in workflow_tasks.values() %} + {% set task_name = "task_" + workflow_task.escaped_task_id %} + # Task for node '{{ workflow_task.name }}' + {{ task_name }} = factory_{{ workflow_task.component_definition_hash | python_safe }}( +{% for task_input_name, task_input_spec in workflow_task.task_inputs.items() %} +{% if task_input_spec.task_output_reference %} + {{ task_input_name }}=task_{{ task_input_spec.task_output_reference.task_id }}.outputs["{{ task_input_spec.task_output_reference.output_id }}"], +{% elif task_input_spec.pipeline_parameter_reference %} + {{ task_input_name }}={{ task_input_spec.pipeline_parameter_reference }}, +{% elif task_input_spec.requires_quoted_rendering %} + {{ task_input_name }}="""{{ task_input_spec.value | string_delimiter_safe }}""", +{% else %} + {{ task_input_name }}={{ task_input_spec.value }}, +{% endif %} +{% endfor %} + ) +{% if workflow_task.task_modifiers.object_storage_secret %} + secret.use_secret_as_env({{ task_name }}, "{{ workflow_task.task_modifiers.object_storage_secret }}", { "AWS_ACCESS_KEY_ID": "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY": "AWS_SECRET_ACCESS_KEY" }) +{% endif %} + {{ task_name }}.set_display_name("{{ workflow_task.name | string_delimiter_safe }}") +{% if workflow_task.doc %} + add_pod_annotation({{ task_name }}, "elyra/node-user-doc","""{{ workflow_task.doc| string_delimiter_safe }}""") +{% endif %} +{% if workflow_task.task_modifiers.cpu_request %} + {{ task_name }}.set_cpu_request(cpu="{{ workflow_task.task_modifiers.cpu_request }}") +{% endif %} +{% if workflow_task.task_modifiers.mem_request and workflow_task.task_modifiers.mem_request.size %} + {{ task_name }}.set_memory_request(memory="{{ workflow_task.task_modifiers.mem_request.size }}{{ workflow_task.task_modifiers.mem_request.units }}") +{% endif %} +{% if workflow_task.task_modifiers.cpu_limit %} + {{ task_name }}.set_cpu_limit(cpu="{{ workflow_task.task_modifiers.cpu_limit }}") +{% endif %} +{% if workflow_task.task_modifiers.memory_limit and workflow_task.task_modifiers.memory_limit.size %} + {{ task_name }}.set_memory_limit(memory="{{ workflow_task.task_modifiers.memory_limit.size }}{{ workflow_task.task_modifiers.memory_limit.units }}") +{% endif %} +{% if workflow_task.task_modifiers.gpu_limit and workflow_task.task_modifiers.gpu_limit.size %} + {{ task_name }}.set_gpu_limit("{{ workflow_task.task_modifiers.gpu_limit.size }}") +{% endif %} +{% if workflow_task.task_modifiers.env_variables %} +{% for env_var_name, env_var_value in workflow_task.task_modifiers.env_variables.items() %} + {{ task_name }}.set_env_variable(name="{{ env_var_name }}", value="{{ env_var_value | string_delimiter_safe }}") +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.set_run_name %} + {{ task_name }}.set_env_variable(name="ELYRA_RUN_NAME", value="{{ workflow_task.task_modifiers.set_run_name }}") +{% endif %} +{% if workflow_task.task_modifiers.disable_node_caching %} + {{ task_name }}.execution_options.caching_strategy.max_cache_staleness = "P0D" +{% endif %} +{% if workflow_task.task_modifiers.pod_labels %} +{% for pod_label_key, pod_label_value in workflow_task.task_modifiers.pod_labels.items() %} + add_pod_label({{ task_name }}, "{{ pod_label_key }}", "{{ pod_label_value }}") +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.pod_annotations %} +{% for pod_annotation_key, pod_annotation_value in workflow_task.task_modifiers.pod_annotations.items() %} + add_pod_annotation({{ task_name }}, "{{ pod_annotation_key }}" , """{{ pod_annotation_value | string_delimiter_safe }}""") +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.kubernetes_secrets %} +{% for env_var, secret_dict in workflow_task.task_modifiers.kubernetes_secrets.items() %} + secret.use_secret_as_env({{ task_name }}, "{{ secret_dict.name }}", { "{{ secret_dict.key }}" : "{{ env_var }}" }) +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.kubernetes_volumes %} +{% for volume_path, volume_dict in workflow_task.task_modifiers.kubernetes_volumes.items() %} + volume.mount_pvc({{ task_name }}, "{{ volume_dict.pvc_name }}", "{{ volume_path }}") +{% endfor %} +{% endif %} +{% if workflow_task.task_modifiers.kubernetes_tolerations %} +{% endif %} +{# declare upstream dependencies -#} +{% if workflow_task.upstream_workflow_task_ids %} +{% for upstream_workflow_task_id in workflow_task.upstream_workflow_task_ids %} + {{ task_name }}.after(task_{{ upstream_workflow_task_id | python_safe }}) +{% endfor %} +{% endif %} +{% endfor %} + +if __name__ == "__main__": + from pathlib import Path + kfp.compiler.Compiler().compile( + pipeline_func=generated_pipeline, + package_path=Path(__file__).with_suffix(".yaml").name, + ) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index f8445c26d..8efc51be9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,14 +26,14 @@ dependencies = [ "jupyter-packaging>=0.10", "jupyter_server>=1.7.0", "jupyterlab>=3.4.6,<4.0", # comment out to use local jupyterlab - "jupyterlab-lsp>=3.8.0", # comment out to use local jupyterlab - "jupyterlab-git~=0.32", # Avoid breaking 1.x changes + "jupyterlab-lsp<=4.2.0", # comment out to use local jupyterlab + "jupyterlab-git<=0.44.0", # Avoid breaking 1.x changes "jupyter-resource-usage>=0.5.1,<1", "MarkupSafe>=2.1", "minio>=7.0.0,!=7.2.1", "nbclient>=0.5.1", "nbconvert>=6.5.1", - "nbdime~=3.1", # Cap from jupyterlab-git + "nbdime~=3.2.1", # Cap from jupyterlab-git "nbformat>=5.1.2", "networkx>=2.5.1", "papermill>=2.3.4", @@ -50,7 +50,8 @@ dependencies = [ "yaspin", # see: https://stackoverflow.com/questions/76175487/sudden-importerror-cannot-import-name-appengine-from-requests-packages-urlli "appengine-python-standard", - "kfp>=1.7.0,<2.0,!=1.7.2", # We cap the SDK to <2.0 due to possible breaking changes + "kfp>=2.0.0", # Cap kfp for protobuff compatibility + "kfp-kubernetes>=1.0.0", "pygithub", "black>=22.8.0", ] @@ -85,10 +86,6 @@ test = [ "pytest_virtualenv", "requests-mock", "requests-unixsocket", - "kfp-tekton" -] -kfp-tekton = [ - "kfp-tekton>=1.5.2" # requires kfp >= 1.8.19, which contains fix for Jupyterlab ] kfp-examples = [ "elyra-examples-kfp-catalog" @@ -98,7 +95,6 @@ gitlab = [ ] # The following is a collection of "non-test" extra dependencies from above. all = [ - "kfp-tekton>=1.5.2", "elyra-examples-kfp-catalog", "python-gitlab", ]