diff --git a/elyra/metadata/metadata.py b/elyra/metadata/metadata.py index 286a5f470..389630a63 100644 --- a/elyra/metadata/metadata.py +++ b/elyra/metadata/metadata.py @@ -33,7 +33,6 @@ class Metadata(object): resource = None display_name = None schema_name = None - version = None metadata = {} reason = None @@ -41,7 +40,6 @@ def __init__(self, **kwargs: Any) -> None: self.name = kwargs.get('name') self.display_name = kwargs.get('display_name') self.schema_name = kwargs.get('schema_name') - self.version = kwargs.get('version', 0) self.metadata = kwargs.get('metadata', {}) self.resource = kwargs.get('resource') self.reason = kwargs.get('reason') @@ -112,8 +110,7 @@ def from_dict(cls: Type[M], schemaspace: str, metadata_dict: dict) -> M: def to_dict(self, trim: bool = False) -> dict: # Exclude resource, and reason only if trim is True since we don't want to persist that information. # Method prepare_write will be used to remove name prior to writes. - d = dict(name=self.name, display_name=self.display_name, metadata=self.metadata, - schema_name=self.schema_name, version=self.version or 0) + d = dict(name=self.name, display_name=self.display_name, metadata=self.metadata, schema_name=self.schema_name) if not trim: if self.resource: d['resource'] = self.resource diff --git a/elyra/metadata/schemas/airflow.json b/elyra/metadata/schemas/airflow.json index 8ccf7cb9f..eacfd052f 100644 --- a/elyra/metadata/schemas/airflow.json +++ b/elyra/metadata/schemas/airflow.json @@ -6,6 +6,8 @@ "display_name": "Apache Airflow", "schemaspace": "runtimes", "schemaspace_id": "130b8e00-de7c-4b32-b553-b4a52824a3b5", + "metadata_class_name": "elyra.pipeline.runtimes_metadata.RuntimesMetadata", + "runtime_type": "APACHE_AIRFLOW", "uihints": { "title": "Apache Airflow runtimes", "icon": "elyra:runtimes", @@ -28,6 +30,12 @@ "description": "Additional data specific to this metadata", "type": "object", "properties": { + "runtime_type": { + "title": "Runtime Type", + "description": "The runtime 
associated with this instance", + "type": "string", + "const": "APACHE_AIRFLOW" + }, "description": { "title": "Description", "description": "Description of this Apache Airflow configuration", diff --git a/elyra/metadata/schemas/kfp.json b/elyra/metadata/schemas/kfp.json index a4b5a76f4..15638fd6d 100644 --- a/elyra/metadata/schemas/kfp.json +++ b/elyra/metadata/schemas/kfp.json @@ -7,6 +7,7 @@ "schemaspace": "runtimes", "schemaspace_id": "130b8e00-de7c-4b32-b553-b4a52824a3b5", "metadata_class_name": "elyra.pipeline.kfp.kfp_metadata.KfpMetadata", + "runtime_type": "KUBEFLOW_PIPELINES", "uihints": { "title": "Kubeflow Pipelines runtimes", "icon": "elyra:runtimes", @@ -29,6 +30,12 @@ "description": "Additional data specific to this metadata", "type": "object", "properties": { + "runtime_type": { + "title": "Runtime Type", + "description": "The runtime associated with this instance", + "type": "string", + "const": "KUBEFLOW_PIPELINES" + }, "description": { "title": "Description", "description": "Description of this Kubeflow Pipelines configuration", diff --git a/elyra/metadata/schemas/local-directory-catalog.json b/elyra/metadata/schemas/local-directory-catalog.json index 5e2d602e1..7207ef4bb 100644 --- a/elyra/metadata/schemas/local-directory-catalog.json +++ b/elyra/metadata/schemas/local-directory-catalog.json @@ -24,12 +24,6 @@ "type": "string", "minLength": 1 }, - "version": { - "title": "Version", - "description": "The version associated with this instance", - "type": "integer", - "const": 1 - }, "metadata": { "description": "Additional data specific to this metadata", "type": "object", @@ -39,12 +33,11 @@ "description": "Description of this Component Catalog", "type": "string" }, - "runtime": { - "title": "Runtime", - "description": "The runtime associated with this Component Catalog", + "runtime_type": { + "title": "Runtime Type", + "description": "The type of runtime associated with this Component Catalog", "type": "string", - "$comment": "This enum is 
dynamically generated to contain the available runtime values.", - "enum": ["{currently-configured-runtimes}"], + "enum": ["KUBEFLOW_PIPELINES", "APACHE_AIRFLOW"], "uihints": { "field_type": "dropdown", "category": "Runtime" @@ -86,8 +79,8 @@ } } }, - "required": ["runtime", "paths"] + "required": ["runtime_type", "paths"] } }, - "required": ["schema_name", "display_name", "version", "metadata"] + "required": ["schema_name", "display_name", "metadata"] } diff --git a/elyra/metadata/schemas/local-file-catalog.json b/elyra/metadata/schemas/local-file-catalog.json index 52b106ce3..41211ac47 100644 --- a/elyra/metadata/schemas/local-file-catalog.json +++ b/elyra/metadata/schemas/local-file-catalog.json @@ -24,12 +24,6 @@ "type": "string", "minLength": 1 }, - "version": { - "title": "Version", - "description": "The version associated with this instance", - "type": "integer", - "const": 1 - }, "metadata": { "description": "Additional data specific to this metadata", "type": "object", @@ -39,12 +33,11 @@ "description": "Description of this Component Catalog", "type": "string" }, - "runtime": { - "title": "Runtime", - "description": "The runtime associated with this Component Catalog", + "runtime_type": { + "title": "Runtime Type", + "description": "The type of runtime associated with this Component Catalog", "type": "string", - "$comment": "This enum is dynamically generated to contain the available runtime values.", - "enum": ["{currently-configured-runtimes}"], + "enum": ["KUBEFLOW_PIPELINES", "APACHE_AIRFLOW"], "uihints": { "field_type": "dropdown", "category": "Runtime" @@ -84,8 +77,8 @@ } } }, - "required": ["runtime", "paths"] + "required": ["runtime_type", "paths"] } }, - "required": ["schema_name", "display_name", "version", "metadata"] + "required": ["schema_name", "display_name", "metadata"] } diff --git a/elyra/metadata/schemas/url-catalog.json b/elyra/metadata/schemas/url-catalog.json index f364e4a30..b2b632124 100644 --- 
a/elyra/metadata/schemas/url-catalog.json +++ b/elyra/metadata/schemas/url-catalog.json @@ -24,12 +24,6 @@ "type": "string", "minLength": 1 }, - "version": { - "title": "Version", - "description": "The version associated with this instance", - "type": "integer", - "const": 1 - }, "metadata": { "description": "Additional data specific to this metadata", "type": "object", @@ -39,12 +33,11 @@ "description": "Description of this Component Catalog", "type": "string" }, - "runtime": { - "title": "Runtime", - "description": "The runtime associated with this Component Catalog", + "runtime_type": { + "title": "Runtime Type", + "description": "The type of runtime associated with this Component Catalog", "type": "string", - "$comment": "This enum is dynamically generated to contain the available runtime values.", - "enum": ["{currently-configured-runtimes}"], + "enum": ["KUBEFLOW_PIPELINES", "APACHE_AIRFLOW"], "uihints": { "field_type": "dropdown", "category": "Runtime" @@ -77,8 +70,8 @@ } } }, - "required": ["runtime", "paths"] + "required": ["runtime_type", "paths"] } }, - "required": ["schema_name", "display_name", "version", "metadata"] + "required": ["schema_name", "display_name", "metadata"] } diff --git a/elyra/metadata/schemaspaces.py b/elyra/metadata/schemaspaces.py index c13fcd1fe..40dc9f901 100644 --- a/elyra/metadata/schemaspaces.py +++ b/elyra/metadata/schemaspaces.py @@ -15,8 +15,6 @@ from typing import Dict -import entrypoints - from elyra.metadata.schema import Schemaspace @@ -67,27 +65,19 @@ def __init__(self, *args, **kwargs): display_name=ComponentRegistries.COMPONENT_REGISTRIES_SCHEMASPACE_DISPLAY_NAME, description="Schemaspace for instances of Elyra component registries configurations") - # get set of registered runtimes - self._runtime_processor_names = set() - for processor in entrypoints.get_group_all('elyra.pipeline.processors'): - # load the names of the runtime processors (skip 'local') - if processor.name == 'local': - continue - 
self._runtime_processor_names.add(processor.name) - def filter_schema(self, schema: Dict) -> Dict: """Replace contents of Runtimes value with set of runtimes if using templated value.""" - # Component-registry requires that `runtime` be a defined property so ensure its existence. - instance_properties = schema.get('properties', {}).get('metadata', {}).get('properties', {}) - runtime = instance_properties.get('runtime') - if not runtime: - raise ValueError(f"{ComponentRegistries.COMPONENT_REGISTRIES_SCHEMASPACE_DISPLAY_NAME} schemas are " - f"required to define a 'runtime' (string-valued) property and schema " - f"\'{schema.get('name')}\' does not define 'runtime'.") - - if runtime.get('enum') == ["{currently-configured-runtimes}"]: - runtime['enum'] = list(self._runtime_processor_names) + # Component-registry requires that `runtime_type` be a defined property so ensure its existence. + # Since schema 'component-registry' is deprecated, skip its check. + is_deprecated = schema.get('deprecated', False) + if not is_deprecated: # Skip deprecated schemas + instance_properties = schema.get('properties', {}).get('metadata', {}).get('properties', {}) + runtime_type = instance_properties.get('runtime_type') + if not runtime_type: + raise ValueError(f"{ComponentRegistries.COMPONENT_REGISTRIES_SCHEMASPACE_DISPLAY_NAME} schemas are " + f"required to define a 'runtime_type' (string-valued) property and schema " + f"\'{schema.get('name')}\' does not define 'runtime_type'.") # Component catalogs should have an associated 'metadata' class name # If none is provided, use the ComponentCatalogMetadata class, which implements diff --git a/elyra/pipeline/airflow/component_parser_airflow.py b/elyra/pipeline/airflow/component_parser_airflow.py index 2f269a874..c7dcf9616 100644 --- a/elyra/pipeline/airflow/component_parser_airflow.py +++ b/elyra/pipeline/airflow/component_parser_airflow.py @@ -23,11 +23,12 @@ from elyra.pipeline.component import Component from elyra.pipeline.component 
import ComponentParameter from elyra.pipeline.component import ComponentParser +from elyra.pipeline.runtime_type import RuntimeProcessorType class AirflowComponentParser(ComponentParser): - _component_platform = "airflow" - _file_types = [".py"] + _component_platform: RuntimeProcessorType = RuntimeProcessorType.APACHE_AIRFLOW + _file_types: List[str] = [".py"] def parse(self, registry_entry: SimpleNamespace) -> Optional[List[Component]]: components: List[Component] = list() @@ -51,7 +52,7 @@ def parse(self, registry_entry: SimpleNamespace) -> Optional[List[Component]]: catalog_type=registry_entry.catalog_type, source_identifier=registry_entry.component_identifier, definition=self.get_class_def_as_string(component_content), - runtime=self.component_platform, + runtime_type=self.component_platform.name, categories=registry_entry.categories, properties=component_properties ) diff --git a/elyra/pipeline/airflow/processor_airflow.py b/elyra/pipeline/airflow/processor_airflow.py index 6d831e26c..c4ab643ae 100644 --- a/elyra/pipeline/airflow/processor_airflow.py +++ b/elyra/pipeline/airflow/processor_airflow.py @@ -39,12 +39,14 @@ from elyra.pipeline.processor import PipelineProcessor from elyra.pipeline.processor import PipelineProcessorResponse from elyra.pipeline.processor import RuntimePipelineProcessor +from elyra.pipeline.runtime_type import RuntimeProcessorType from elyra.util.git import GithubClient from elyra.util.path import get_absolute_path class AirflowPipelineProcessor(RuntimePipelineProcessor): - _type = 'airflow' + _type = RuntimeProcessorType.APACHE_AIRFLOW + _name = 'airflow' # Provide users with the ability to identify a writable directory in the # running container where the notebook | script is executed. 
The location @@ -73,10 +75,6 @@ class AirflowPipelineProcessor(RuntimePipelineProcessor): # Contains mappings from class to import statement for each available Airflow operator class_import_map = {} - @property - def type(self): - return self._type - def __init__(self, root_dir, **kwargs): super().__init__(root_dir, component_parser=AirflowComponentParser(), **kwargs) if not self.class_import_map: # Only need to load once @@ -432,16 +430,13 @@ def _process_list_value(self, value: str) -> Union[List, str]: class AirflowPipelineProcessorResponse(PipelineProcessorResponse): - _type = 'airflow' + _type = RuntimeProcessorType.APACHE_AIRFLOW + _name = 'airflow' def __init__(self, git_url, run_url, object_storage_url, object_storage_path): super().__init__(run_url, object_storage_url, object_storage_path) self.git_url = git_url - @property - def type(self): - return self._type - def to_json(self): response = super().to_json() response['git_url'] = self.git_url diff --git a/elyra/pipeline/component.py b/elyra/pipeline/component.py index fb97c503c..83054bad9 100644 --- a/elyra/pipeline/component.py +++ b/elyra/pipeline/component.py @@ -22,6 +22,8 @@ from traitlets.config import LoggingConfigurable +from elyra.pipeline.runtime_type import RuntimeProcessorType + class ComponentParameter(object): """ @@ -122,7 +124,7 @@ def __init__(self, catalog_type: str, source_identifier: Any, definition: Optional[str] = None, - runtime: Optional[str] = None, + runtime_type: Optional[RuntimeProcessorType] = None, op: Optional[str] = None, categories: Optional[List[str]] = None, properties: Optional[List[ComponentParameter]] = None, @@ -136,7 +138,7 @@ def __init__(self, location; one of ['url', filename', 'directory] :param source_identifier: Source information to help locate the component definition :param definition: The content of the specification file for this component - :param runtime: The runtime of the component (e.g. 
KFP or Airflow) + :param runtime_type: The runtime type of the component (e.g. KUBEFLOW_PIPELINES, APACHE_AIRFLOW, etc.) :param op: The operation name of the component; used by generic components in rendering the palette :param categories: A list of categories that this component belongs to; used to organize component in the palette @@ -156,7 +158,7 @@ def __init__(self, self._source_identifier = source_identifier self._definition = definition - self._runtime = runtime + self._runtime_type = runtime_type self._op = op self._categories = categories or [] self._properties = properties @@ -209,8 +211,8 @@ def definition(self) -> str: return self._definition @property - def runtime(self) -> Optional[str]: - return self._runtime + def runtime_type(self) -> Optional[RuntimeProcessorType]: + return self._runtime_type @property def op(self) -> Optional[str]: @@ -244,10 +246,11 @@ def _log_warning(msg: str, logger: Optional[Logger] = None): class ComponentParser(LoggingConfigurable): # ABC - _component_platform = None + _component_platform: RuntimeProcessorType = None + _file_types: List[str] = None @property - def component_platform(self) -> str: + def component_platform(self) -> RuntimeProcessorType: return self._component_platform @property diff --git a/elyra/pipeline/component_metadata.py b/elyra/pipeline/component_metadata.py index 229c8f34a..da519e62e 100644 --- a/elyra/pipeline/component_metadata.py +++ b/elyra/pipeline/component_metadata.py @@ -56,26 +56,37 @@ class ComponentCatalogMetadata(Metadata): This class contains methods to trigger cache updates on modification and deletion of component registry metadata instances. 
""" + def on_load(self, **kwargs: Any) -> None: + """Check for runtime property and update to runtime_type """ + if 'runtime' in self.metadata: + if self.metadata['runtime'] == 'kfp': + self.metadata['runtime_type'] = 'KUBEFLOW_PIPELINES' + else: # self.metadata['runtime'] == 'airflow' + self.metadata['runtime_type'] = 'APACHE_AIRFLOW' + self.metadata.pop('runtime', None) + getLogger('ServerApp').info(f"Upgrading component-registry {self.schema_name} instance '{self.name}' " + f"to include runtime_type '{self.metadata['runtime_type']}'...") + MetadataManager(schemaspace="component-registries").update(self.name, self, for_migration=True) def post_save(self, **kwargs: Any) -> None: - processor_type = self.metadata.get('runtime') + processor_type = self.metadata.get('runtime_type') - # Get processor instance and update its cache - try: - processor = PipelineProcessorRegistry.instance().get_processor(processor_type=processor_type) - if processor.component_registry.caching_enabled: - processor.component_registry.update_cache(catalog=self, operation='modify') + # Get component catalog and update its cache + try: # TODO: This should move to ComponentCatalog once made a singleton + component_catalog = PipelineProcessorRegistry.instance().get_catalog(processor_type) + if component_catalog.caching_enabled: + component_catalog.update_cache(catalog=self, operation='modify') except Exception: pass def post_delete(self, **kwargs: Any) -> None: - processor_type = self.metadata.get('runtime') + processor_type = self.metadata.get('runtime_type') - # Get processor instance and update its cache - try: - processor = PipelineProcessorRegistry.instance().get_processor(processor_type=processor_type) - if processor.component_registry.caching_enabled: - processor.component_registry.update_cache(catalog=self, operation='delete') + # Get component catalog and update its cache + try: # TODO: This should move to ComponentCatalog once made a singleton + component_catalog = 
PipelineProcessorRegistry.instance().get_catalog(processor_type) + if component_catalog.caching_enabled: + component_catalog.update_cache(catalog=self, operation='delete') except Exception: pass diff --git a/elyra/pipeline/component_registry.py b/elyra/pipeline/component_registry.py index 42a597454..459b52ae7 100644 --- a/elyra/pipeline/component_registry.py +++ b/elyra/pipeline/component_registry.py @@ -126,7 +126,7 @@ def get_component(self, component_id: str) -> Optional[Component]: if component is None: self.log.error(f"Component with ID '{component_id}' could not be found in any " - f"{self._parser.component_platform} catalog.") + f"{self._parser.component_platform.name} catalog.") return component @@ -282,9 +282,10 @@ def _get_catalogs_for_runtime(self) -> List[Metadata]: registries = MetadataManager(schemaspace=ComponentRegistries.COMPONENT_REGISTRIES_SCHEMASPACE_ID)\ .get_all() - # Filter registries according to processor type - runtime_catalogs = [r for r in registries if r.metadata['runtime'] == self._parser.component_platform] + # Filter catalogs according to processor type + runtime_catalogs = \ + [r for r in registries if r.metadata['runtime_type'] == self._parser.component_platform.name] except Exception: - self.log.error(f"Could not access registries for processor: {self._parser.component_platform}") + self.log.error(f"Could not access registries for processor type: {self._parser.component_platform.name}") return runtime_catalogs diff --git a/elyra/pipeline/kfp/component_parser_kfp.py b/elyra/pipeline/kfp/component_parser_kfp.py index 6ac65fb35..0c27f9ca0 100644 --- a/elyra/pipeline/kfp/component_parser_kfp.py +++ b/elyra/pipeline/kfp/component_parser_kfp.py @@ -24,11 +24,12 @@ from elyra.pipeline.component import Component from elyra.pipeline.component import ComponentParameter from elyra.pipeline.component import ComponentParser +from elyra.pipeline.runtime_type import RuntimeProcessorType class KfpComponentParser(ComponentParser): - 
_component_platform = "kfp" - _file_types = [".yaml"] + _component_platform: RuntimeProcessorType = RuntimeProcessorType.KUBEFLOW_PIPELINES + _file_types: List[str] = [".yaml"] def parse(self, registry_entry: SimpleNamespace) -> Optional[List[Component]]: # Get YAML object from component definition @@ -51,7 +52,7 @@ def parse(self, registry_entry: SimpleNamespace) -> Optional[List[Component]]: catalog_type=registry_entry.catalog_type, source_identifier=registry_entry.component_identifier, definition=registry_entry.component_definition, - runtime=self.component_platform, + runtime_type=self.component_platform.name, categories=registry_entry.categories, properties=component_properties ) diff --git a/elyra/pipeline/kfp/kfp_metadata.py b/elyra/pipeline/kfp/kfp_metadata.py index b0f71ea69..fb78f8052 100644 --- a/elyra/pipeline/kfp/kfp_metadata.py +++ b/elyra/pipeline/kfp/kfp_metadata.py @@ -27,6 +27,7 @@ class KfpMetadata(RuntimesMetadata): """ def on_load(self, **kwargs: Any) -> None: + super().on_load(**kwargs) if self.metadata.get('auth_type') is None: # Inject auth_type property for metadata persisted using Elyra < 3.3: diff --git a/elyra/pipeline/kfp/processor_kfp.py b/elyra/pipeline/kfp/processor_kfp.py index dd169c0e0..4db119c6f 100644 --- a/elyra/pipeline/kfp/processor_kfp.py +++ b/elyra/pipeline/kfp/processor_kfp.py @@ -52,11 +52,13 @@ from elyra.pipeline.processor import PipelineProcessor from elyra.pipeline.processor import PipelineProcessorResponse from elyra.pipeline.processor import RuntimePipelineProcessor +from elyra.pipeline.runtime_type import RuntimeProcessorType from elyra.util.path import get_absolute_path class KfpPipelineProcessor(RuntimePipelineProcessor): - _type = 'kfp' + _type = RuntimeProcessorType.KUBEFLOW_PIPELINES + _name = 'kfp' # Provide users with the ability to identify a writable directory in the # running container where the notebook | script is executed. 
The location @@ -64,10 +66,6 @@ class KfpPipelineProcessor(RuntimePipelineProcessor): # Defaults to `/tmp` WCD = os.getenv('ELYRA_WRITABLE_CONTAINER_DIR', '/tmp').strip().rstrip('/') - @property - def type(self): - return self._type - def __init__(self, root_dir, **kwargs): super().__init__(root_dir, component_parser=KfpComponentParser(), **kwargs) @@ -734,12 +732,5 @@ def _sanitize_operation_name(name: str) -> str: class KfpPipelineProcessorResponse(PipelineProcessorResponse): - - _type = 'kfp' - - def __init__(self, run_url, object_storage_url, object_storage_path): - super().__init__(run_url, object_storage_url, object_storage_path) - - @property - def type(self): - return self._type + _type = RuntimeProcessorType.KUBEFLOW_PIPELINES + _name = 'kfp' diff --git a/elyra/pipeline/local/processor_local.py b/elyra/pipeline/local/processor_local.py index 2c5e97a4c..16930c918 100644 --- a/elyra/pipeline/local/processor_local.py +++ b/elyra/pipeline/local/processor_local.py @@ -34,6 +34,7 @@ from elyra.pipeline.pipeline import GenericOperation from elyra.pipeline.processor import PipelineProcessor from elyra.pipeline.processor import PipelineProcessorResponse +from elyra.pipeline.runtime_type import RuntimeProcessorType from elyra.util.path import get_absolute_path @@ -50,7 +51,8 @@ class LocalPipelineProcessor(PipelineProcessor): Note: Execution happens in-place and a ledger of runs will be available at $TMPFILE/elyra/pipeline-name- """ _operation_processor_registry: Dict - _type = 'local' + _type = RuntimeProcessorType.LOCAL + _name = 'local' def __init__(self, root_dir, **kwargs): super().__init__(root_dir, **kwargs) @@ -63,10 +65,6 @@ def __init__(self, root_dir, **kwargs): r_op_processor.operation_name: r_op_processor, } - @property - def type(self): - return self._type - def get_components(self): return ComponentRegistry.get_generic_components() @@ -115,15 +113,12 @@ def export(self, pipeline, pipeline_export_format, pipeline_export_path, overwri class 
LocalPipelineProcessorResponse(PipelineProcessorResponse): - _type = 'local' + _type = RuntimeProcessorType.LOCAL + _name = 'local' def __init__(self): super().__init__('', '', '') - @property - def type(self): - return self._type - class OperationProcessor(ABC): diff --git a/elyra/pipeline/pipeline.py b/elyra/pipeline/pipeline.py index 23d7aed44..6b0b7660a 100644 --- a/elyra/pipeline/pipeline.py +++ b/elyra/pipeline/pipeline.py @@ -361,14 +361,14 @@ def source(self) -> str: @property def runtime(self) -> str: """ - Describe the runtime type where the pipeline will be executed + The runtime processor name that will execute the pipeline """ return self._runtime @property def runtime_config(self) -> str: """ - Describe the runtime configuration that should be used to submit the pipeline to execution + The runtime configuration that should be used to submit the pipeline for execution """ return self._runtime_config diff --git a/elyra/pipeline/pipeline_definition.py b/elyra/pipeline/pipeline_definition.py index 60e9fc55f..17d9883b0 100644 --- a/elyra/pipeline/pipeline_definition.py +++ b/elyra/pipeline/pipeline_definition.py @@ -20,6 +20,8 @@ from typing import List from typing import Optional +from elyra.pipeline.runtime_type import RuntimeProcessorType + class AppDataBase(object): # ABC """ @@ -92,7 +94,7 @@ def version(self) -> int: @property def runtime(self) -> str: """ - The runtime associated with the pipeline + The runtime processor name associated with the pipeline :return: The runtime keyword """ return self._node['app_data'].get('runtime') @@ -111,17 +113,18 @@ def type(self): The pipeline type :return: The runtime keyword associated with the pipeline or `generic` """ - type_description_to_type = {'Kubeflow Pipelines': 'kfp', - 'Apache Airflow': 'airflow', - 'Generic': 'generic'} - - if 'properties' in self._node['app_data']: - pipeline_type_description = self._node['app_data']['properties'].get('runtime', 'Generic') - if pipeline_type_description not in 
type_description_to_type.keys(): - raise ValueError(f'Unsupported pipeline runtime: {pipeline_type_description}') - return type_description_to_type[pipeline_type_description] - else: - return type_description_to_type['Generic'] + if 'runtime_type' in self._node['app_data']: + runtime_type = self._node['app_data'].get('runtime_type') + try: + RuntimeProcessorType.get_instance_by_name(runtime_type) + except (KeyError, TypeError): + # Check for 'generic'... + if runtime_type.lower() != 'generic': + raise ValueError(f'Unsupported pipeline runtime: {runtime_type}') + runtime_type = 'generic' + return runtime_type + + return 'generic' @property def name(self) -> str: diff --git a/elyra/pipeline/processor.py b/elyra/pipeline/processor.py index 46f7f5568..2e4508dee 100644 --- a/elyra/pipeline/processor.py +++ b/elyra/pipeline/processor.py @@ -40,6 +40,7 @@ from elyra.pipeline.pipeline import GenericOperation from elyra.pipeline.pipeline import Operation from elyra.pipeline.pipeline import Pipeline +from elyra.pipeline.runtime_type import RuntimeProcessorType from elyra.util.archive import create_temp_archive from elyra.util.cos import CosClient from elyra.util.path import get_expanded_path @@ -48,7 +49,7 @@ class PipelineProcessorRegistry(SingletonConfigurable): - _processors = {} + _processors: Dict[str, 'PipelineProcessor'] = {} def __init__(self, **kwargs): super().__init__(**kwargs) @@ -67,17 +68,29 @@ def __init__(self, **kwargs): f'"{processor.module_name}.{processor.object_name}" - {err}') def add_processor(self, processor): - self.log.debug(f'Registering processor {processor.type}') - self._processors[processor.type] = processor + self.log.debug(f"Registering {processor.type.value} runtime processor '{processor.name}'") + self._processors[processor.name] = processor - def get_processor(self, processor_type: str): - if self.is_valid_processor(processor_type): - return self._processors[processor_type] + def get_processor(self, processor_name: str): + if 
self.is_valid_processor(processor_name): + return self._processors[processor_name] else: - raise RuntimeError('Could not find pipeline processor for [{}]'.format(processor_type)) + raise RuntimeError(f"Could not find pipeline processor '{processor_name}'") + + # TODO: This should move to ComponentCatalog once made a singleton + def get_catalog(self, processor_type: str): + # This should be updated when we decouple the catalog from the processors. For now + # (and because there will be few processors) we will just walk the list until we find + # a processor with a matching type, then confirm the instance is RuntimePipelineProcessor. + for name, processor in self._processors.items(): + if processor.type.name == processor_type and isinstance(processor, RuntimePipelineProcessor): + runtime_processor: RuntimePipelineProcessor = processor + return runtime_processor.component_registry + else: + raise RuntimeError(f"Could not find component catalog associated with type '{processor_type}'!") - def is_valid_processor(self, processor_type: str) -> bool: - return processor_type in self._processors.keys() + def is_valid_processor(self, processor_name: str) -> bool: + return processor_name in self._processors.keys() class PipelineProcessorManager(SingletonConfigurable): @@ -88,21 +101,25 @@ def __init__(self, **kwargs): self.root_dir = get_expanded_path(kwargs.get('root_dir')) self._registry = PipelineProcessorRegistry.instance() - def _get_processor_for_runtime(self, processor_type: str): - processor = self._registry.get_processor(processor_type) + def _get_processor_for_runtime(self, runtime_name: str): + processor = self._registry.get_processor(runtime_name) return processor - def is_supported_runtime(self, processor_type: str) -> bool: - return self._registry.is_valid_processor(processor_type) + def is_supported_runtime(self, runtime_name: str) -> bool: + return self._registry.is_valid_processor(runtime_name) + + def get_runtime_type(self, runtime_name: str) -> 
RuntimeProcessorType: + processor = self._get_processor_for_runtime(runtime_name) + return processor.type - async def get_components(self, processor_type): - processor = self._get_processor_for_runtime(processor_type) + async def get_components(self, runtime_name): + processor = self._get_processor_for_runtime(runtime_name) res = await asyncio.get_event_loop().run_in_executor(None, processor.get_components) return res - async def get_component(self, processor_type, component_id): - processor = self._get_processor_for_runtime(processor_type) + async def get_component(self, runtime_name, component_id): + processor = self._get_processor_for_runtime(runtime_name) res = await asyncio.get_event_loop().\ run_in_executor(None, functools.partial(processor.get_component, component_id=component_id)) @@ -124,7 +141,8 @@ async def export(self, pipeline, pipeline_export_format, pipeline_export_path, o class PipelineProcessorResponse(ABC): - _type = None + _type: RuntimeProcessorType = None + _name: str = None def __init__(self, run_url, object_storage_url, object_storage_path): self._run_url = run_url @@ -132,9 +150,16 @@ def __init__(self, run_url, object_storage_url, object_storage_path): self._object_storage_path = object_storage_path @property - @abstractmethod - def type(self): - raise NotImplementedError() + def type(self) -> str: # Return the string value of the name so that JSON serialization works + if self._type is None: + raise NotImplementedError("_type must have a value!") + return self._type.name + + @property + def name(self) -> str: + if self._name is None: + raise NotImplementedError("_name must have a value!") + return self._name @property def run_url(self): @@ -169,7 +194,8 @@ def to_json(self): class PipelineProcessor(LoggingConfigurable): # ABC - _type: str = None + _type: RuntimeProcessorType = None + _name: str = None _component_registry: ComponentRegistry = None @@ -185,9 +211,16 @@ def __init__(self, root_dir, **kwargs): self.root_dir = root_dir 
@property - @abstractmethod - def type(self) -> str: - raise NotImplementedError() + def type(self): + if self._type is None: + raise NotImplementedError("_type must have a value!") + return self._type + + @property + def name(self): + if self._name is None: + raise NotImplementedError("_name must have a value!") + return self._name def get_components(self) -> List[Component]: """ @@ -244,7 +277,7 @@ def log_pipeline_info(self, pipeline_name: str, action_clause: str, **kwargs): operation_name = kwargs.get('operation_name') op_clause = f":'{operation_name}'" if operation_name else "" - self.log.info(f"{self._type} '{pipeline_name}'{op_clause} - {action_clause} {duration_clause}") + self.log.info(f"{self._name} '{pipeline_name}'{op_clause} - {action_clause} {duration_clause}") @staticmethod def _propagate_operation_inputs_outputs(pipeline: Pipeline, sorted_operations: List[Operation]) -> None: @@ -300,10 +333,6 @@ def _sort_operation_dependencies(operations_by_id: dict, ordered_operations: lis class RuntimePipelineProcessor(PipelineProcessor): - @property - def component_parser(self) -> ComponentParser: - return self._component_parser - @property def component_registry(self) -> ComponentRegistry: return self._component_registry @@ -311,7 +340,9 @@ def component_registry(self) -> ComponentRegistry: def __init__(self, root_dir: str, component_parser: ComponentParser, **kwargs): super().__init__(root_dir, **kwargs) - self._component_parser = component_parser + # TODO - we should look into decoupling the registry/parser from the processor + # TODO - make ComponentRegistry a singleton that loads all component catalogs + # associated with the types corresponding to each registered runtime processor. 
self._component_registry = ComponentRegistry(component_parser, parent=self.parent) def _get_dependency_archive_name(self, operation): @@ -402,7 +433,7 @@ def _collect_envs(self, operation: GenericOperation, **kwargs) -> Dict: """ envs: Dict = operation.env_vars_as_dict(logger=self.log) - envs['ELYRA_RUNTIME_ENV'] = self.type + envs['ELYRA_RUNTIME_ENV'] = self.name if 'cos_secret' not in kwargs or not kwargs['cos_secret']: envs['AWS_ACCESS_KEY_ID'] = kwargs['cos_username'] diff --git a/elyra/pipeline/runtime_type.py b/elyra/pipeline/runtime_type.py new file mode 100644 index 000000000..c42e3ba65 --- /dev/null +++ b/elyra/pipeline/runtime_type.py @@ -0,0 +1,52 @@ +# +# Copyright 2018-2021 Elyra Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from enum import Enum +from enum import unique + + +@unique +class RuntimeProcessorType(Enum): + """RuntimeProcessorType enumerates the set of platforms targeted by runtime processors. + + Each runtime processor implementation (subclass of PipelineProcessor) will reflect one + of these values. Users implementing their own runtime processor that corresponds to a + type not listed in this enumeration are responsible for appropriately extending this + enumeration and reflecting that entry in the corresponding runtime schema in order to + fully integrate their processor with Elyra. 
+ """ + LOCAL = 'Local' + KUBEFLOW_PIPELINES = 'Kubeflow Pipelines' + APACHE_AIRFLOW = 'Apache Airflow' + ARGO = 'Argo' + + @staticmethod + def get_instance_by_name(name: str) -> 'RuntimeProcessorType': + """Returns an instance of RuntimeProcessorType corresponding to the given name. + + Raises KeyError if parameter is not a name in the enumeration. + """ + return RuntimeProcessorType.__members__[name] + + @staticmethod + def get_instance_by_value(value: str) -> 'RuntimeProcessorType': + """Returns an instance of RuntimeProcessorType corresponding to the given value. + + Raises KeyError if parameter is not a value in the enumeration. + """ + for instance in RuntimeProcessorType.__members__.values(): + if instance.value == value: + return instance + raise KeyError(f"'{value}'") diff --git a/elyra/pipeline/runtimes_metadata.py b/elyra/pipeline/runtimes_metadata.py index cf30cc445..fea541b6d 100644 --- a/elyra/pipeline/runtimes_metadata.py +++ b/elyra/pipeline/runtimes_metadata.py @@ -13,8 +13,33 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from logging import getLogger +from typing import Any + +from elyra.metadata.manager import MetadataManager from elyra.metadata.metadata import Metadata class RuntimesMetadata(Metadata): - pass + """ + This class will be instantiated for any instance for a schema within the + Runtimes schemaspace. 
+ """ + + def on_load(self, **kwargs: Any) -> None: + """Perform any necessary adjustments, migrations when instance is loaded.""" + + # If there's no runtime_type property in the instance metadata, infer from schema_name + if 'runtime_type' not in self.metadata: + if self.schema_name == 'kfp': + self.metadata['runtime_type'] = 'KUBEFLOW_PIPELINES' + elif self.schema_name == 'airflow': + self.metadata['runtime_type'] = 'APACHE_AIRFLOW' + elif self.schema_name == 'argo': + self.metadata['runtime_type'] = 'ARGO' + else: + raise ValueError(f"Unknown Runtimes schema name detected: '{self.schema_name}'! Skipping...") + + getLogger('ServerApp').info(f"Upgrading runtime {self.schema_name} instance '{self.name}' " + f"to include runtime_type '{self.metadata['runtime_type']}'...") + MetadataManager(schemaspace="runtimes").update(self.name, self, for_migration=True) diff --git a/elyra/pipeline/validation.py b/elyra/pipeline/validation.py index 9c1bbc096..4b84294e4 100644 --- a/elyra/pipeline/validation.py +++ b/elyra/pipeline/validation.py @@ -25,6 +25,8 @@ import networkx as nx from traitlets.config import SingletonConfigurable +from elyra.metadata.schema import SchemaManager +from elyra.metadata.schemaspaces import Runtimes from elyra.pipeline.component import Component from elyra.pipeline.component_registry import ComponentRegistry from elyra.pipeline.pipeline import Operation @@ -32,6 +34,7 @@ from elyra.pipeline.pipeline import PIPELINE_CURRENT_VERSION from elyra.pipeline.pipeline_definition import PipelineDefinition from elyra.pipeline.processor import PipelineProcessorManager +# from elyra.pipeline.runtime_type import RuntimeProcessorType from elyra.util.path import get_expanded_path @@ -194,6 +197,19 @@ def _validate_pipeline_structure(self, pipeline_definition: PipelineDefinition, data={"supported_version": PIPELINE_CURRENT_VERSION, "detected_version": pipeline_version}) + @staticmethod + def _is_compatible_pipeline(runtime_name: str, runtime_type: str): + 
"""Returns true if the pipeline's runtime name is compatible with its type. """ + if runtime_type.lower() == 'generic': + return True # TODO: this won't always be true as some runtime impls won't support generics + # We need to make the "local" runtimes a real runtime someday! Until then, we have this... + if runtime_name.lower() == 'local': + runtime_type_from_schema = runtime_name.upper() # use the up-cased value since runtime_types are up-cased + else: # fetch the metadata instance corresponding to runtime_name and compare its runtime_type + runtime_schema = SchemaManager.instance().get_schema(Runtimes.RUNTIMES_SCHEMASPACE_ID, runtime_name) + runtime_type_from_schema = runtime_schema.get('runtime_type') + return runtime_type_from_schema == runtime_type + async def _validate_compatibility(self, pipeline_definition: PipelineDefinition, pipeline_type: str, pipeline_runtime: str, @@ -202,7 +218,7 @@ async def _validate_compatibility(self, pipeline_definition: PipelineDefinition, Checks that the pipeline payload is compatible with this version of elyra (ISSUE #938) as well as verifying all nodes in the pipeline are supported by the runtime :param pipeline_definition: the pipeline definition to be validated - :param pipeline_type: name of the pipeline runtime being used e.g. kfp, airflow, generic + :param pipeline_type: type of the pipeline runtime being used e.g. KUBEFLOW_PIPELINES, APACHE_AIRFLOW, generic :param pipeline_runtime: name of the pipeline runtime for execution e.g. 
kfp, airflow, local :param response: ValidationResponse containing the issue list to be updated """ @@ -211,7 +227,7 @@ async def _validate_compatibility(self, pipeline_definition: PipelineDefinition, supported_ops = [] if pipeline_runtime: - if pipeline_runtime != pipeline_type and pipeline_type != 'generic': + if not PipelineValidationManager._is_compatible_pipeline(pipeline_runtime, pipeline_type): response.add_message(severity=ValidationSeverity.Error, message_type="invalidRuntime", message="Pipeline runtime platform is not compatible " @@ -259,7 +275,7 @@ async def _validate_node_properties(self, pipeline_definition: PipelineDefinitio """ if pipeline_runtime: # don't check if incompatible pipeline type and runtime - if pipeline_runtime != pipeline_type and pipeline_type != 'generic': + if not PipelineValidationManager._is_compatible_pipeline(pipeline_runtime, pipeline_type): return for pipeline in pipeline_definition.pipelines: @@ -304,7 +320,7 @@ async def _validate_node_properties(self, pipeline_definition: PipelineDefinitio # Validate runtime components against specific node properties in component registry else: # This is the full dict of properties for the operation e.g. 
current params, optionals etc - property_dict = await self._get_component_properties(pipeline_type, components, node.op) + property_dict = await self._get_component_properties(pipeline_runtime, components, node.op) cleaned_property_list = list(map(lambda x: str(x).replace('elyra_', ''), property_dict['current_parameters'].keys())) @@ -582,7 +598,7 @@ def _get_pipeline_id(self, pipeline: dict, node_id: str) -> str: if node['id'] == node_id: return single_pipeline['id'] - async def _get_component_properties(self, pipeline_type: str, components: dict, node_op: str) -> Dict: + async def _get_component_properties(self, pipeline_runtime: str, components: dict, node_op: str) -> Dict: """ Retrieve the full dict of properties associated with the node_op :param components: list of components associated with the pipeline runtime being used e.g. kfp, airflow @@ -600,32 +616,12 @@ async def _get_component_properties(self, pipeline_type: str, components: dict, for node_type in category['node_types']: if node_op == node_type['op']: component: Component = \ - await PipelineProcessorManager.instance().get_component(pipeline_type, node_op) + await PipelineProcessorManager.instance().get_component(pipeline_runtime, node_op) component_properties = ComponentRegistry.to_canvas_properties(component) return component_properties return {} - def _get_runtime_schema(self, pipeline: dict, response: ValidationResponse) -> str: - pipeline_json = json.loads(json.dumps(pipeline)) - if not self._is_legacy_pipeline(pipeline): - runtime = pipeline_json['pipelines'][0]['app_data']['properties'].get('runtime') - else: - # Assume Generic since properties field doesnt exist = older version of pipeline schema - runtime = "Generic" - - if runtime == "Kubeflow Pipelines": - return "kfp" - elif runtime == "Apache Airflow": - return "airflow" - elif runtime == "Generic": - return "generic" - else: - response.add_message(severity=ValidationSeverity.Error, - message_type="invalidRuntime", - 
message="Unsupported pipeline runtime selected in this pipeline.", - data={"pipelineRuntime": runtime}) - def _get_node_names(self, pipeline: dict, node_id_list: list) -> List: """ Given a node_id_list, will return the node's name for each node_id in the list, respectively diff --git a/elyra/templates/components/canvas_palette_template.jinja2 b/elyra/templates/components/canvas_palette_template.jinja2 index b7666512b..0f25ec6a3 100644 --- a/elyra/templates/components/canvas_palette_template.jinja2 +++ b/elyra/templates/components/canvas_palette_template.jinja2 @@ -15,6 +15,9 @@ "id": "{{ component.id }}", "image": "", "label": "{{ component.name }}", + {% if component.runtime_type %} + "runtime_type": "{{ component.runtime_type }}", + {% endif %} "type": "execution_node", "inputs": [ { diff --git a/elyra/tests/cli/resources/kfp_3_node_custom.pipeline b/elyra/tests/cli/resources/kfp_3_node_custom.pipeline index 1ade751cb..28f93ae02 100644 --- a/elyra/tests/cli/resources/kfp_3_node_custom.pipeline +++ b/elyra/tests/cli/resources/kfp_3_node_custom.pipeline @@ -177,9 +177,9 @@ }, "version": 5, "runtime": "kfp", + "runtime_type": "KUBEFLOW_PIPELINES", "properties": { "name": "kfp_custom", - "runtime": "Kubeflow Pipelines", "description": "3-node custom component pipeline" } }, diff --git a/elyra/tests/cli/test_pipeline_app.py b/elyra/tests/cli/test_pipeline_app.py index a216e830b..bcce465a7 100644 --- a/elyra/tests/cli/test_pipeline_app.py +++ b/elyra/tests/cli/test_pipeline_app.py @@ -285,7 +285,7 @@ def test_describe_with_kfp_components(): result = runner.invoke(pipeline, ['describe', pipeline_file_path]) assert "Description: 3-node custom component pipeline" in result.output - assert "Type: kfp" in result.output + assert "Type: KUBEFLOW_PIPELINES" in result.output assert "Nodes: 3" in result.output assert "File Dependencies:\n None Listed" in result.output assert "- https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/" \ @@ -323,7 +323,7 @@ def 
test_describe_with_missing_kfp_component(): result = runner.invoke(pipeline, ['describe', pipeline_file_path]) assert "Description: 3-node custom component pipeline" in result.output - assert "Type: kfp" in result.output + assert "Type: KUBEFLOW_PIPELINES" in result.output assert "Nodes: 3" in result.output assert result.exit_code == 0 diff --git a/elyra/tests/metadata/test_metadata.py b/elyra/tests/metadata/test_metadata.py index 9cd2d72c0..627b8dbae 100644 --- a/elyra/tests/metadata/test_metadata.py +++ b/elyra/tests/metadata/test_metadata.py @@ -189,7 +189,6 @@ def test_manager_get_none(tests_manager, schemaspace_location): def test_manager_get_all_none(tests_manager, schemaspace_location): - # TODO - there is no schemaspace removal - test requires update # Delete the schemaspace contents and attempt listing metadata _remove_schemaspace(tests_manager.metadata_store, schemaspace_location) assert tests_manager.schemaspace_exists() is False @@ -203,7 +202,6 @@ def test_manager_get_all_none(tests_manager, schemaspace_location): def test_manager_add_remove_valid(tests_manager, schemaspace_location): metadata_name = 'valid_add_remove' - # TODO - there is no schemaspace removal - test requires update # Remove schemaspace_location and ensure it gets created _remove_schemaspace(tests_manager.metadata_store, schemaspace_location) @@ -652,7 +650,6 @@ def test_manager_hierarchy_remove(tests_hierarchy_manager, factory_location, sha # ########################## MetadataStore Tests ########################### def test_store_schemaspace(store_manager, schemaspace_location): - # TODO - there is no schemaspace removal - test requires update # Delete the metadata dir contents and attempt listing metadata _remove_schemaspace(store_manager, schemaspace_location) assert store_manager.schemaspace_exists() is False @@ -668,7 +665,6 @@ def test_store_fetch_instances(store_manager): def test_store_fetch_no_schemaspace(store_manager, schemaspace_location): - # TODO - there is no 
schemaspace removal - test requires update # Delete the schemaspace contents and attempt listing metadata _remove_schemaspace(store_manager, schemaspace_location) @@ -689,7 +685,6 @@ def test_store_fetch_missing(store_manager): def test_store_store_instance(store_manager, schemaspace_location): - # TODO - there is no schemaspace removal - test requires update # Remove schemaspace to test raw creation and confirm perms _remove_schemaspace(store_manager, schemaspace_location) @@ -744,9 +739,6 @@ def test_store_delete_instance(store_manager, schemaspace_location): assert not os.path.exists(metadata_file) -# ########################## SchemaManager Tests ########################### -# TODO - add tests for SchemaManagr, Schemaspaces, and SchemaProviders - # ########################## Error Tests ########################### def test_error_metadata_not_found(): schemaspace = METADATA_TEST_SCHEMASPACE diff --git a/elyra/tests/metadata/test_utils.py b/elyra/tests/metadata/test_utils.py index 118081f0a..c79fff153 100644 --- a/elyra/tests/metadata/test_utils.py +++ b/elyra/tests/metadata/test_utils.py @@ -45,8 +45,7 @@ 'uri_test': 'http://localhost:31823/v1/models?version=2017-02-13', 'number_range_test': 8, 'required_test': "required_value" - }, - "version": 0 + } } valid_metadata2_json = { diff --git a/elyra/tests/pipeline/airflow/test_component_parser_airflow.py b/elyra/tests/pipeline/airflow/test_component_parser_airflow.py index 27653e982..952344d81 100644 --- a/elyra/tests/pipeline/airflow/test_component_parser_airflow.py +++ b/elyra/tests/pipeline/airflow/test_component_parser_airflow.py @@ -68,7 +68,7 @@ def test_modify_component_registries(): instance_metadata = { "description": "A test registry", - "runtime": "airflow", + "runtime_type": "APACHE_AIRFLOW", "categories": ["New Components"], "paths": urls } @@ -137,7 +137,7 @@ def test_directory_based_component_registry(): registry_path = _get_resource_path('') instance_metadata = { "description": "A test registry", - 
"runtime": "airflow", + "runtime_type": "APACHE_AIRFLOW", "categories": ["New Components"], "paths": [registry_path] } diff --git a/elyra/tests/pipeline/airflow/test_processor_airflow.py b/elyra/tests/pipeline/airflow/test_processor_airflow.py index 6f154d9cc..59becf151 100644 --- a/elyra/tests/pipeline/airflow/test_processor_airflow.py +++ b/elyra/tests/pipeline/airflow/test_processor_airflow.py @@ -26,6 +26,7 @@ from elyra.pipeline.airflow.processor_airflow import AirflowPipelineProcessor from elyra.pipeline.parser import PipelineParser from elyra.pipeline.pipeline import GenericOperation +from elyra.pipeline.runtime_type import RuntimeProcessorType from elyra.tests.pipeline.test_pipeline_parser import _read_pipeline_resource from elyra.util import git @@ -116,12 +117,12 @@ def string_to_list(stringed_list): def test_processor_type(processor): - assert processor.type == "airflow" + assert processor.type == RuntimeProcessorType.APACHE_AIRFLOW def test_fail_processor_type(processor): with pytest.raises(Exception): - assert processor.type == "kfp" + assert processor.type == RuntimeProcessorType.KUBEFLOW_PIPELINES @pytest.mark.parametrize('parsed_pipeline', [PIPELINE_FILE_COMPLEX], indirect=True) diff --git a/elyra/tests/pipeline/kfp/test_component_parser_kfp.py b/elyra/tests/pipeline/kfp/test_component_parser_kfp.py index 4fbfe527c..91e9d4933 100644 --- a/elyra/tests/pipeline/kfp/test_component_parser_kfp.py +++ b/elyra/tests/pipeline/kfp/test_component_parser_kfp.py @@ -61,7 +61,7 @@ def test_modify_component_registries(): instance_metadata = { "description": "A test registry", - "runtime": "kfp", + "runtime_type": "KUBEFLOW_PIPELINES", "categories": ["New Components"], "paths": paths } @@ -127,7 +127,7 @@ def test_directory_based_component_registry(): registry_path = _get_resource_path('') instance_metadata = { "description": "A test registry", - "runtime": "kfp", + "runtime_type": "KUBEFLOW_PIPELINES", "categories": ["New Components"], "paths": [registry_path] } 
diff --git a/elyra/tests/pipeline/resources/validation_pipelines/generic_invalid_node_property_structure.pipeline b/elyra/tests/pipeline/resources/validation_pipelines/generic_invalid_node_property_structure.pipeline index b0f1d0289..6e63cbd46 100644 --- a/elyra/tests/pipeline/resources/validation_pipelines/generic_invalid_node_property_structure.pipeline +++ b/elyra/tests/pipeline/resources/validation_pipelines/generic_invalid_node_property_structure.pipeline @@ -369,9 +369,9 @@ "comments": [] }, "version": 5, + "runtime_type": "GENERIC", "properties": { - "name": "generic_invalid_node_property_structure", - "runtime": "Generic" + "name": "generic_invalid_node_property_structure" } }, "runtime_ref": "" diff --git a/elyra/tests/pipeline/resources/validation_pipelines/kf_inputpath_parameter.pipeline b/elyra/tests/pipeline/resources/validation_pipelines/kf_inputpath_parameter.pipeline index e86bb47ef..ca2221094 100644 --- a/elyra/tests/pipeline/resources/validation_pipelines/kf_inputpath_parameter.pipeline +++ b/elyra/tests/pipeline/resources/validation_pipelines/kf_inputpath_parameter.pipeline @@ -183,9 +183,9 @@ }, "version": 5, "runtime": "kfp", + "runtime_type": "KUBEFLOW_PIPELINES", "properties": { - "name": "kf_inputpath_parameter", - "runtime": "Kubeflow Pipelines" + "name": "kf_inputpath_parameter" } }, "runtime_ref": "" diff --git a/elyra/tests/pipeline/resources/validation_pipelines/kf_invalid_inputpath_missing_connection.pipeline b/elyra/tests/pipeline/resources/validation_pipelines/kf_invalid_inputpath_missing_connection.pipeline index 75c993cc2..46d98aa1c 100644 --- a/elyra/tests/pipeline/resources/validation_pipelines/kf_invalid_inputpath_missing_connection.pipeline +++ b/elyra/tests/pipeline/resources/validation_pipelines/kf_invalid_inputpath_missing_connection.pipeline @@ -174,9 +174,9 @@ }, "version": 5, "runtime": "kfp", + "runtime_type": "KUBEFLOW_PIPELINES", "properties": { - "name": "kf_invalid_inputpath_missing_connection", - "runtime": 
"Kubeflow Pipelines" + "name": "kf_invalid_inputpath_missing_connection" } }, "runtime_ref": "" diff --git a/elyra/tests/pipeline/resources/validation_pipelines/kf_invalid_inputpath_parameter.pipeline b/elyra/tests/pipeline/resources/validation_pipelines/kf_invalid_inputpath_parameter.pipeline index af12b22c7..a781dab36 100644 --- a/elyra/tests/pipeline/resources/validation_pipelines/kf_invalid_inputpath_parameter.pipeline +++ b/elyra/tests/pipeline/resources/validation_pipelines/kf_invalid_inputpath_parameter.pipeline @@ -182,9 +182,9 @@ }, "version": 5, "runtime": "kfp", + "runtime_type": "KUBEFLOW_PIPELINES", "properties": { - "name": "kf_invalid_inputpath_parameter", - "runtime": "Kubeflow Pipelines" + "name": "kf_invalid_inputpath_parameter" } }, "runtime_ref": "" diff --git a/elyra/tests/pipeline/test_validation.py b/elyra/tests/pipeline/test_validation.py index 109bdae5e..7c0fdc04d 100644 --- a/elyra/tests/pipeline/test_validation.py +++ b/elyra/tests/pipeline/test_validation.py @@ -107,7 +107,7 @@ async def test_invalid_runtime_node_kubeflow(validation_manager, load_pipeline): pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_compatibility(pipeline_definition=pipeline_definition, response=response, - pipeline_type='kfp', + pipeline_type='KUBEFLOW_PIPELINES', pipeline_runtime='kfp') issues = response.to_json().get('issues') @@ -125,9 +125,10 @@ async def test_invalid_runtime_node_kubeflow_with_supernode(validation_manager, pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_compatibility(pipeline_definition=pipeline_definition, response=response, - pipeline_type='kfp', + pipeline_type='KUBEFLOW_PIPELINES', pipeline_runtime='kfp') issues = response.to_json().get('issues') + print(issues) assert len(issues) == 1 assert issues[0]['severity'] == 1 assert issues[0]['type'] == 'invalidNodeType' @@ -141,7 +142,7 @@ async def 
test_invalid_pipeline_runtime_with_kubeflow_execution(validation_manag pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_compatibility(pipeline_definition=pipeline_definition, response=response, - pipeline_type='airflow', + pipeline_type='APACHE_AIRFLOW', pipeline_runtime='kfp') issues = response.to_json().get('issues') assert len(issues) == 1 @@ -155,13 +156,13 @@ async def test_invalid_pipeline_runtime_with_local_execution(validation_manager, pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_compatibility(pipeline_definition=pipeline_definition, response=response, - pipeline_type='airflow', + pipeline_type='APACHE_AIRFLOW', pipeline_runtime='local') issues = response.to_json().get('issues') assert len(issues) == 1 assert issues[0]['severity'] == 1 assert issues[0]['type'] == 'invalidRuntime' - assert issues[0]['data']['pipelineType'] == 'airflow' + assert issues[0]['data']['pipelineType'] == 'APACHE_AIRFLOW' async def test_invalid_node_op_with_airflow(validation_manager, load_pipeline): @@ -171,7 +172,7 @@ async def test_invalid_node_op_with_airflow(validation_manager, load_pipeline): pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_compatibility(pipeline_definition=pipeline_definition, response=response, - pipeline_type='airflow', + pipeline_type='APACHE_AIRFLOW', pipeline_runtime='airflow') issues = response.to_json().get('issues') assert len(issues) == 1 @@ -195,7 +196,7 @@ async def test_invalid_node_property_structure(monkeypatch, load_pipeline): pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await pvm._validate_node_properties(pipeline_definition=pipeline_definition, response=response, - pipeline_type='generic', + pipeline_type='GENERIC', pipeline_runtime='kfp') issues = response.to_json().get('issues') @@ -217,7 +218,7 @@ async def 
test_missing_node_property_for_kubeflow_pipeline(monkeypatch, load_pip pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await pvm._validate_node_properties(pipeline_definition=pipeline_definition, response=response, - pipeline_type='kfp', + pipeline_type='KUBEFLOW_PIPELINES', pipeline_runtime='kfp') issues = response.to_json().get('issues') @@ -332,7 +333,7 @@ async def test_valid_node_property_pipeline_filepath(monkeypatch, validation_man pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_node_properties(pipeline_definition=pipeline_definition, response=response, - pipeline_type='generic', + pipeline_type='GENERIC', pipeline_runtime='kfp') assert not response.has_fatal @@ -517,7 +518,7 @@ async def test_pipeline_kfp_inputpath_parameter(validation_manager, load_pipelin pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_node_properties(pipeline_definition=pipeline_definition, response=response, - pipeline_type='kfp', + pipeline_type='KUBEFLOW_PIPELINES', pipeline_runtime='kfp') issues = response.to_json().get('issues') @@ -531,7 +532,7 @@ async def test_pipeline_invalid_kfp_inputpath_parameter(validation_manager, load pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_node_properties(pipeline_definition=pipeline_definition, response=response, - pipeline_type='kfp', + pipeline_type='KUBEFLOW_PIPELINES', pipeline_runtime='kfp') issues = response.to_json().get('issues') @@ -551,7 +552,7 @@ async def test_pipeline_invalid_kfp_inputpath_missing_connection(validation_mana pipeline_definition = PipelineDefinition(pipeline_definition=pipeline) await validation_manager._validate_node_properties(pipeline_definition=pipeline_definition, response=response, - pipeline_type='kfp', + pipeline_type='KUBEFLOW_PIPELINES', pipeline_runtime='kfp') issues = response.to_json().get('issues') diff --git 
a/etc/config/metadata/component-registries/elyra-airflow-filesystem-preconfig.json b/etc/config/metadata/component-registries/elyra-airflow-filesystem-preconfig.json index cba8576bd..4d44f0758 100644 --- a/etc/config/metadata/component-registries/elyra-airflow-filesystem-preconfig.json +++ b/etc/config/metadata/component-registries/elyra-airflow-filesystem-preconfig.json @@ -2,7 +2,7 @@ "display_name": "Airflow Preloaded Components - Filesystem", "metadata": { "description": "Preloaded filesystem-based components that are supported by Apache Airflow", - "runtime": "airflow", + "runtime_type": "APACHE_AIRFLOW", "categories": ["Preloaded Airflow"], "paths": ["airflow/slack_operator.py"] }, diff --git a/etc/config/metadata/component-registries/elyra-airflow-url-preconfig.json b/etc/config/metadata/component-registries/elyra-airflow-url-preconfig.json index 947c0b513..c74d00651 100644 --- a/etc/config/metadata/component-registries/elyra-airflow-url-preconfig.json +++ b/etc/config/metadata/component-registries/elyra-airflow-url-preconfig.json @@ -2,7 +2,7 @@ "display_name": "Airflow Preloaded Components - URL", "metadata": { "description": "Preloaded URL-based components that are supported by Apache Airflow", - "runtime": "airflow", + "runtime_type": "APACHE_AIRFLOW", "categories": ["Preloaded Airflow"], "paths": [ "https://raw.githubusercontent.com/apache/airflow/1.10.15/airflow/operators/bash_operator.py", diff --git a/etc/config/metadata/component-registries/elyra-kfp-directory-preconfig.json b/etc/config/metadata/component-registries/elyra-kfp-directory-preconfig.json index e18451cc8..69f4b60a2 100644 --- a/etc/config/metadata/component-registries/elyra-kfp-directory-preconfig.json +++ b/etc/config/metadata/component-registries/elyra-kfp-directory-preconfig.json @@ -2,7 +2,7 @@ "display_name": "KFP Preloaded Components - Directory", "metadata": { "description": "Preloaded directory-based components that are supported by Kubeflow Pipelines", - "runtime": "kfp", + 
"runtime_type": "KUBEFLOW_PIPELINES", "categories": ["Preloaded KFP"], "paths": ["kfp"] }, diff --git a/etc/config/metadata/component-registries/elyra-kfp-url-preconfig.json b/etc/config/metadata/component-registries/elyra-kfp-url-preconfig.json index 0d783948c..2b1b29962 100644 --- a/etc/config/metadata/component-registries/elyra-kfp-url-preconfig.json +++ b/etc/config/metadata/component-registries/elyra-kfp-url-preconfig.json @@ -2,7 +2,7 @@ "display_name": "KFP Preloaded Components - URL", "metadata": { "description": "Preloaded URL-based components that are supported by Kubeflow Pipelines", - "runtime": "kfp", + "runtime_type": "KUBEFLOW_PIPELINES", "categories": ["Preloaded KFP"], "paths": [ "https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/web/Download/component.yaml", diff --git a/packages/pipeline-editor/src/FileSubmissionDialog.tsx b/packages/pipeline-editor/src/FileSubmissionDialog.tsx index d927abcb7..3245512cf 100644 --- a/packages/pipeline-editor/src/FileSubmissionDialog.tsx +++ b/packages/pipeline-editor/src/FileSubmissionDialog.tsx @@ -148,7 +148,7 @@ export class FileSubmissionDialog extends React.Component { > {validSchemas.map(schema => ( ))} diff --git a/packages/pipeline-editor/src/PipelineEditorWidget.tsx b/packages/pipeline-editor/src/PipelineEditorWidget.tsx index e1c134dd1..f4d37a5df 100644 --- a/packages/pipeline-editor/src/PipelineEditorWidget.tsx +++ b/packages/pipeline-editor/src/PipelineEditorWidget.tsx @@ -116,6 +116,14 @@ const getRuntimeDisplayName = ( return schema?.display_name; }; +const getRuntimeTypeFromSchema = ( + schemas: { name: string; runtime_type: string }[] | undefined, + runtime: string | undefined +): string | undefined => { + const schema = schemas?.find(s => s.name === runtime); + return schema?.runtime_type; +}; + class PipelineEditorWidget extends ReactWidget { browserFactory: IFileBrowserFactory; shell: ILabShell; @@ -187,6 +195,11 @@ const PipelineWrapper: React.FC = ({ pipelineRuntimeName ); 
+ const pipelineRuntimeTypeFromSchema = getRuntimeTypeFromSchema( + runtimesSchema, + pipelineRuntimeName + ); + useEffect(() => { if (runtimeImages?.length === 0) { RequestErrors.noMetadataError('runtime image'); @@ -255,9 +268,9 @@ const PipelineWrapper: React.FC = ({ pipeline_path, PathExt.extname(pipeline_path) ); + pipelineJson.pipelines[0].app_data.runtime_type = + pipelineRuntimeTypeFromSchema ?? 'Generic'; pipelineJson.pipelines[0].app_data.properties.name = pipeline_name; - pipelineJson.pipelines[0].app_data.properties.runtime = - pipelineRuntimeDisplayName ?? 'Generic'; } setPipeline(pipelineJson); setLoading(false); @@ -269,7 +282,7 @@ const PipelineWrapper: React.FC = ({ return (): void => { currentContext.model.contentChanged.disconnect(changeHandler); }; - }, [pipelineRuntimeDisplayName, runtimeImages]); + }, [pipelineRuntimeTypeFromSchema, runtimeImages]); const onChange = useCallback( (pipelineJson: any): void => { @@ -601,6 +614,10 @@ const PipelineWrapper: React.FC = ({ const runtime_config = dialogResult.value.runtime_config; const runtime = PipelineService.getRuntimeName(runtime_config, runtimes); + const runtime_type = PipelineService.getRuntimeType( + runtime_config, + runtimes + ); PipelineService.setNodePathsRelativeToWorkspace( pipelineJson.pipelines[0], @@ -611,6 +628,7 @@ const PipelineWrapper: React.FC = ({ pipelineJson.pipelines[0].app_data.name = pipeline_name; pipelineJson.pipelines[0].app_data.runtime = runtime; + pipelineJson.pipelines[0].app_data['runtime_type'] = runtime_type; pipelineJson.pipelines[0].app_data['runtime-config'] = runtime_config; pipelineJson.pipelines[0].app_data.source = PathExt.basename( contextRef.current.path @@ -686,13 +704,15 @@ const PipelineWrapper: React.FC = ({ const localRuntime: IRuntime = { name: 'local', display_name: 'Run in-place locally', - schema_name: 'local' + schema_name: 'local', + runtime_type: 'Generic' }; runtimes.unshift(JSON.parse(JSON.stringify(localRuntime))); const localSchema: 
ISchema = { name: 'local', - display_name: 'Local Runtime' + display_name: 'Local Runtime', + runtime_type: 'Generic' }; schema.unshift(JSON.parse(JSON.stringify(localSchema))); @@ -747,6 +767,8 @@ const PipelineWrapper: React.FC = ({ const runtime_config = dialogResult.value.runtime_config; const runtime = PipelineService.getRuntimeName(runtime_config, runtimes) || 'local'; + const runtime_type = + PipelineService.getRuntimeType(runtime_config, runtimes) || 'LOCAL'; PipelineService.setNodePathsRelativeToWorkspace( pipelineJson.pipelines[0], @@ -758,6 +780,7 @@ const PipelineWrapper: React.FC = ({ pipelineJson.pipelines[0]['app_data']['name'] = dialogResult.value.pipeline_name; pipelineJson.pipelines[0]['app_data']['runtime'] = runtime; + pipelineJson.pipelines[0]['app_data']['runtime_type'] = runtime_type; pipelineJson.pipelines[0]['app_data']['runtime-config'] = runtime_config; pipelineJson.pipelines[0]['app_data']['source'] = PathExt.basename( contextRef.current.path @@ -933,16 +956,16 @@ const PipelineWrapper: React.FC = ({ rightBar: [ { action: '', - label: `Runtime: ${pipelineRuntimeDisplayName ?? 'Generic'}`, + label: `Runtime: ${pipelineRuntimeTypeFromSchema ?? 'GENERIC'}`, incLabelWithIcon: 'before', enable: false, kind: 'tertiary', iconEnabled: IconUtil.encode( - pipelineRuntimeName === 'kfp' + pipelineRuntimeTypeFromSchema === 'KUBEFLOW_PIPELINES' ? kubeflowIcon - : pipelineRuntimeName === 'airflow' + : pipelineRuntimeTypeFromSchema === 'APACHE_AIRFLOW' ? airflowIcon - : pipelineRuntimeName === 'argo' + : pipelineRuntimeTypeFromSchema === 'ARGO' ? 
argoIcon : pipelineIcon ) diff --git a/packages/pipeline-editor/src/PipelineExportDialog.tsx b/packages/pipeline-editor/src/PipelineExportDialog.tsx index dae99bce3..eee4add08 100644 --- a/packages/pipeline-editor/src/PipelineExportDialog.tsx +++ b/packages/pipeline-editor/src/PipelineExportDialog.tsx @@ -23,6 +23,7 @@ import { PipelineService } from './PipelineService'; +// TODO - these (xxx_FILE_TYPES) should eventually come from processor implementations const KFP_FILE_TYPES = [ { label: 'KFP domain-specific language Python code', key: 'py' }, { label: 'KFP static configuration file (YAML formatted)', key: 'yaml' } @@ -68,6 +69,7 @@ export class PipelineExportDialog extends React.Component { return filteredRuntimeOptions; }; + // TODO - this needs updating for BYO runtimes updateFileTypeOptions = ( platformSelection: string ): Record[] => { @@ -120,7 +122,7 @@ export class PipelineExportDialog extends React.Component { > {validSchemas.map(schema => ( ))} diff --git a/packages/pipeline-editor/src/PipelineService.tsx b/packages/pipeline-editor/src/PipelineService.tsx index 0371fe867..12a6f6df2 100644 --- a/packages/pipeline-editor/src/PipelineService.tsx +++ b/packages/pipeline-editor/src/PipelineService.tsx @@ -31,11 +31,13 @@ export interface IRuntime { name: string; display_name: string; schema_name: string; + runtime_type: string; } export interface ISchema { name: string; display_name: string; + runtime_type: string; } enum ContentType { @@ -177,6 +179,17 @@ export class PipelineService { return metadataArr.find(r => r['name'] === name)?.['schema_name']; } + /** + * The runtime type is found from the runtime (named) schema + * @param name + * @param metadataArr + */ + static getRuntimeType(name: string, metadataArr: IDictionary[]): string { + return metadataArr.find(r => r['name'] === name)?.metadata?.[ + 'runtime_type' + ]; + } + /** * Creates a Dialog for passing to makeServerRequest */ @@ -213,7 +226,7 @@ export class PipelineService { dialogTitle = 'Job 
submission to ' + runtimeName + ' succeeded'; dialogBody = (

- {response['platform'] == 'airflow' ? ( + {response['platform'] == 'APACHE_AIRFLOW' ? (

Apache Airflow DAG has been pushed to the{' '} { > {validSchemas.map(schema => ( ))} diff --git a/packages/pipeline-editor/src/index.ts b/packages/pipeline-editor/src/index.ts index 6787cc062..abf696438 100644 --- a/packages/pipeline-editor/src/index.ts +++ b/packages/pipeline-editor/src/index.ts @@ -174,7 +174,7 @@ const extension: JupyterFrontEndPlugin = { if (args['isPalette']) { return undefined; } else { - return getRuntimeIcon(args.runtime?.name); + return getRuntimeIcon(args.runtime?.runtime_type); } }, execute: (args: any) => { @@ -202,7 +202,8 @@ const extension: JupyterFrontEndPlugin = { comments: [] }, version: PIPELINE_CURRENT_VERSION, - runtime: args.runtime?.name + runtime: args.runtime?.name, + runtime_type: args.runtime?.runtime_type }, runtime_ref: '' } @@ -247,7 +248,11 @@ const extension: JupyterFrontEndPlugin = { category: 'Elyra', args: { runtime }, rank: - runtime.name === 'kfp' ? 2 : runtime.name === 'airflow' ? 3 : 4 + runtime.runtime_type === 'KUBEFLOW_PIPELINES' + ? 2 + : runtime.runtime_type === 'APACHE_AIRFLOW' + ? 3 + : 4 }); menu.fileMenu.newMenu.addGroup( [ @@ -256,7 +261,11 @@ const extension: JupyterFrontEndPlugin = { args: { runtime, isMenu: true } } ], - runtime.name === 'kfp' ? 31 : runtime.name === 'airflow' ? 32 : 33 + runtime.runtime_type === 'KUBEFLOW_PIPELINES' + ? 31 + : runtime.runtime_type === 'APACHE_AIRFLOW' + ? 
32 + : 33 ); } } diff --git a/packages/pipeline-editor/src/pipeline-hooks.ts b/packages/pipeline-editor/src/pipeline-hooks.ts index 46d3e0a03..f076a8ec8 100644 --- a/packages/pipeline-editor/src/pipeline-hooks.ts +++ b/packages/pipeline-editor/src/pipeline-hooks.ts @@ -86,6 +86,7 @@ export interface IRuntimeComponent { op: string; id: string; label: string; + runtime_type?: string; type: 'execution_node'; inputs: { app_data: any }[]; outputs: { app_data: any }[]; @@ -176,15 +177,18 @@ const componentFetcher = async (runtime: string): Promise => { // inject properties for (const category of palette.categories) { - // TODO: The server will provide this in a later release - switch (category.id) { - case 'kfp': + // Use the runtime_type from the first node of the category to determine category + // icon. TODO: Ideally, this would be included in the category. + const category_runtime_type = + category.node_types?.[0]?.runtime_type ?? 'GENERIC'; + switch (category_runtime_type) { + case 'KUBEFLOW_PIPELINES': category.image = IconUtil.encode(kubeflowIcon); break; - case 'airflow': + case 'APACHE_AIRFLOW': category.image = IconUtil.encode(airflowIcon); break; - case 'argo': + case 'ARGO': category.image = IconUtil.encode(argoIcon); break; default: @@ -221,10 +225,10 @@ const NodeIcons: Map = new Map([ ] ]); -export const getRuntimeIcon = (runtime?: string): LabIcon => { +export const getRuntimeIcon = (runtime_type?: string): LabIcon => { const runtimeIcons = [kubeflowIcon, airflowIcon, argoIcon]; for (const runtimeIcon of runtimeIcons) { - if (`elyra:${runtime}` === runtimeIcon.name) { + if (`elyra:${runtime_type}` === runtimeIcon.name) { return runtimeIcon; } } @@ -249,7 +253,7 @@ export const usePalette = (pipelineRuntime = 'local'): IReturn => { if (nodeIcon === undefined || nodeIcon === '') { nodeIcon = 'data:image/svg+xml;utf8,' + - encodeURIComponent(getRuntimeIcon(pipelineRuntime).svgstr); + encodeURIComponent(getRuntimeIcon(node.runtime_type).svgstr); } // Not sure 
which is needed... diff --git a/packages/pipeline-editor/src/pipeline-template.json b/packages/pipeline-editor/src/pipeline-template.json index 8489e9907..741ff6c7e 100644 --- a/packages/pipeline-editor/src/pipeline-template.json +++ b/packages/pipeline-editor/src/pipeline-template.json @@ -30,6 +30,7 @@ "app_data": { "name": "{{name}}", "runtime": "{{runtime}}", + "runtime_type": "{{runtime_type}}", "runtime-config": "{{runtime-config}}", "version": "{{version}}", "ui_data": { diff --git a/packages/pipeline-editor/src/utils.ts b/packages/pipeline-editor/src/utils.ts index bf645b2ca..4567b1482 100644 --- a/packages/pipeline-editor/src/utils.ts +++ b/packages/pipeline-editor/src/utils.ts @@ -68,12 +68,12 @@ export default class Utils { template.pipelines[0].app_data.name = artifactName; template.pipelines[0].app_data.runtime = runtime_platform; + template.pipelines[0].app_data.runtime_type = 'Generic'; // These are always generic template.pipelines[0].app_data['runtime-config'] = runtime_config; template.pipelines[0].app_data.version = PIPELINE_CURRENT_VERSION; template.pipelines[0].app_data.source = PathExt.basename(filename); template.pipelines[0].app_data['properties'] = {}; template.pipelines[0].app_data['properties']['name'] = 'generic'; - template.pipelines[0].app_data['properties']['runtime'] = 'Generic'; return template; } diff --git a/packages/services/src/test/application.spec.ts b/packages/services/src/test/application.spec.ts index bdf18e90f..08725f346 100644 --- a/packages/services/src/test/application.spec.ts +++ b/packages/services/src/test/application.spec.ts @@ -29,8 +29,7 @@ const codeSnippetMetadata = { metadata: { language: 'Python', code: ['hello_world'] - }, - version: 0 + } }; beforeAll(async () => { @@ -58,12 +57,7 @@ describe('@elyra/services', () => { const schemaNames = schemas.map((schema: any) => { return schema.name; }); - const knownSchemaNames = [ - 'code-snippet', - 'runtime-image', - 'kfp', - 'airflow' - ]; + const knownSchemaNames 
= ['code-snippet', 'runtime-image']; for (const schemaName of knownSchemaNames) { expect(schemaNames).toContain(schemaName); } diff --git a/packages/ui-components/src/icons.tsx b/packages/ui-components/src/icons.tsx index c6c35f27c..5b65ddfe3 100644 --- a/packages/ui-components/src/icons.tsx +++ b/packages/ui-components/src/icons.tsx @@ -42,11 +42,11 @@ export const importIcon = new LabIcon({ svgstr: importSvg }); export const airflowIcon = new LabIcon({ - name: 'elyra:airflow', + name: 'elyra:APACHE_AIRFLOW', svgstr: airflowSvg }); export const argoIcon = new LabIcon({ - name: 'elyra:argo', + name: 'elyra:ARGO', svgstr: argoSvg }); export const codeSnippetIcon = new LabIcon({ @@ -58,7 +58,7 @@ export const dragDropIcon = new LabIcon({ svgstr: dragDropSvg }); export const kubeflowIcon = new LabIcon({ - name: 'elyra:kfp', + name: 'elyra:KUBEFLOW_PIPELINES', svgstr: kubeflowSvg }); export const elyraIcon = new LabIcon({ name: 'elyra:elyra', svgstr: elyraSvg }); diff --git a/tests/assets/helloworld.pipeline b/tests/assets/helloworld.pipeline index 017d5682b..0f8499afc 100644 --- a/tests/assets/helloworld.pipeline +++ b/tests/assets/helloworld.pipeline @@ -121,9 +121,9 @@ "comments": [] }, "version": 5, + "runtime_type": "Generic", "properties": { - "name": "helloworld", - "runtime": "Generic" + "name": "helloworld" } }, "runtime_ref": "" diff --git a/tests/assets/invalid.pipeline b/tests/assets/invalid.pipeline index f49aa5a2c..07541bdc0 100644 --- a/tests/assets/invalid.pipeline +++ b/tests/assets/invalid.pipeline @@ -136,9 +136,9 @@ "comments": [] }, "version": 5, + "runtime_type": "Generic", "properties": { - "name": "invalid", - "runtime": "Generic" + "name": "invalid" } }, "runtime_ref": "" diff --git a/tests/assets/pipelines/producer-consumer.pipeline b/tests/assets/pipelines/producer-consumer.pipeline index b3ded772b..d14e404f3 100644 --- a/tests/assets/pipelines/producer-consumer.pipeline +++ b/tests/assets/pipelines/producer-consumer.pipeline @@ -310,9 +310,9 
@@ "comments": [] }, "version": 5, + "runtime_type": "Generic", "properties": { - "name": "producer-consumer", - "runtime": "Generic" + "name": "producer-consumer" } }, "runtime_ref": "" diff --git a/tests/snapshots/pipeline-editor-tests/matches-complex-pipeline-snapshot.1.snap b/tests/snapshots/pipeline-editor-tests/matches-complex-pipeline-snapshot.1.snap index ec211aad6..302864a36 100644 --- a/tests/snapshots/pipeline-editor-tests/matches-complex-pipeline-snapshot.1.snap +++ b/tests/snapshots/pipeline-editor-tests/matches-complex-pipeline-snapshot.1.snap @@ -8,8 +8,8 @@ Object { "app_data": Object { "properties": Object { "name": "complex", - "runtime": "Generic", }, + "runtime_type": "Generic", "ui_data": Object { "comments": Array [], }, diff --git a/tests/snapshots/pipeline-editor-tests/matches-empty-pipeline-snapshot.1.snap b/tests/snapshots/pipeline-editor-tests/matches-empty-pipeline-snapshot.1.snap index ebc9cddf5..b3bd26e3e 100644 --- a/tests/snapshots/pipeline-editor-tests/matches-empty-pipeline-snapshot.1.snap +++ b/tests/snapshots/pipeline-editor-tests/matches-empty-pipeline-snapshot.1.snap @@ -8,8 +8,8 @@ Object { "app_data": Object { "properties": Object { "name": "empty", - "runtime": "Generic", }, + "runtime_type": "Generic", "ui_data": Object { "comments": Array [], }, diff --git a/tests/snapshots/pipeline-editor-tests/matches-simple-pipeline-snapshot.1.snap b/tests/snapshots/pipeline-editor-tests/matches-simple-pipeline-snapshot.1.snap index 456a6fa76..3eafed911 100644 --- a/tests/snapshots/pipeline-editor-tests/matches-simple-pipeline-snapshot.1.snap +++ b/tests/snapshots/pipeline-editor-tests/matches-simple-pipeline-snapshot.1.snap @@ -8,8 +8,8 @@ Object { "app_data": Object { "properties": Object { "name": "simple", - "runtime": "Generic", }, + "runtime_type": "Generic", "ui_data": Object { "comments": Array [], },