diff --git a/README.md b/README.md index 0dd388e..8f8b6ad 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,32 @@ After the last executor, the `filer` is called once more to process the outputs and push them to remote locations from the PVC. The PVC is the scrubbed, deleted and the taskmaster ends, completing the task. +┌─────────────────────────────────────────────────────────┐ +│ Kubernetes │ +│ │ +│ ┌────────────────────────────┐ ┌───────────────────┐ │ +│ │ Secret: ftp-secret │ │ ConfigMap/PVC │ │ +│ │ - username │ │ - JSON_INPUT.gz │ │ +│ │ - password │ │ │ │ +│ └──────────▲─────────────────┘ └───────▲───────────┘ │ +│ │ | │ +│ │ | │ +│ │ | │ +│ ┌─────────┴────────────────────────────┴────────────┐ │ +│ │ Job: taskmaster │ │ +│ │ ┌───────────────────────────────────────────────┐ │ │ +│ │ │ Pod: taskmaster │ │ │ +│ │ │ - Container: taskmaster │ │ │ +│ │ │ - Env: TESK_FTP_USERNAME │ │ │ +│ │ │ - Env: TESK_FTP_PASSWORD │ │ │ +│ │ │ - Args: -f /jsoninput/JSON_INPUT.gz │ │ │ +│ │ │ - Mounts: /podinfo │ │ │ +│ │ │ /jsoninput │ │ │ +│ │ └───────────────────────────────────────────────┘ │ │ +│ └───────────────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────┘ + ## Requirements - A working [Kubernetes](https://kubernetes.io/) cluster version 1.9 and later. 
diff --git a/compressed_data b/compressed_data new file mode 100644 index 0000000..134cf15 --- /dev/null +++ b/compressed_data @@ -0,0 +1 @@ +{"inputs": ["{\"name\": null, \"description\": null, \"url\": \"s3://my-object-store/file1\", \"path\": \"/data/file1\", \"type\": \"FILE\", \"content\": null, \"streamable\": null}"], "outputs": ["{\"name\": null, \"description\": null, \"url\": \"s3://my-object-store/outfile-1\", \"path\": \"/data/outfile\", \"path_prefix\": null, \"type\": \"FILE\"}"], "volumes": ["/vol/A/"], "resources": {"disk_gb": 40.0}, "executors": [{"api_version": "batch/v1", "kind": "Job", "metadata": {"annotations": {"tes-task-name": "string"}, "creation_timestamp": null, "deletion_grace_period_seconds": null, "deletion_timestamp": null, "finalizers": null, "generate_name": null, "generation": null, "labels": {"job-type": "executor", "taskmaster-name": "task-fef96900-6da7-4672-a1ec-14bf3d720c8f", "executor-no": "0"}, "managed_fields": null, "name": "newname", "namespace": null, "owner_references": null, "resource_version": null, "self_link": null, "uid": null}, "spec": {"active_deadline_seconds": null, "backoff_limit": null, "backoff_limit_per_index": null, "completion_mode": null, "completions": null, "manual_selector": null, "max_failed_indexes": null, "parallelism": null, "pod_failure_policy": null, "pod_replacement_policy": null, "selector": null, "suspend": null, "template": {"metadata": {"annotations": null, "creation_timestamp": null, "deletion_grace_period_seconds": null, "deletion_timestamp": null, "finalizers": null, "generate_name": null, "generation": null, "labels": null, "managed_fields": null, "name": "newname", "namespace": null, "owner_references": null, "resource_version": null, "self_link": null, "uid": null}, "spec": {"active_deadline_seconds": null, "affinity": null, "automount_service_account_token": null, "containers": [{"args": null, "command": ["/bin/sh", "-c", " < /data/file1 > /tmp/stdout.log 2> /tmp/stderr.log"], "env": 
[{"name": "BLASTDB", "value": "/data/GRC38", "value_from": null}, {"name": "HMMERDB", "value": "/data/hmmer", "value_from": null}], "env_from": null, "image": null, "image_pull_policy": null, "lifecycle": null, "liveness_probe": null, "name": "newname", "ports": null, "readiness_probe": null, "resize_policy": null, "resources": {"claims": null, "limits": null, "requests": {"cpu": 4.0, "memory": 8589934592.0}}, "restart_policy": null, "security_context": null, "startup_probe": null, "stdin": null, "stdin_once": null, "termination_message_path": null, "termination_message_policy": null, "tty": null, "volume_devices": null, "volume_mounts": null, "working_dir": "/data/"}], "dns_config": null, "dns_policy": null, "enable_service_links": null, "ephemeral_containers": null, "host_aliases": null, "host_ipc": null, "host_network": null, "host_pid": null, "host_users": null, "hostname": null, "image_pull_secrets": null, "init_containers": null, "node_name": null, "node_selector": null, "os": null, "overhead": null, "preemption_policy": null, "priority": null, "priority_class_name": null, "readiness_gates": null, "resource_claims": null, "restart_policy": "Never", "runtime_class_name": null, "scheduler_name": null, "scheduling_gates": null, "security_context": null, "service_account": null, "service_account_name": null, "set_hostname_as_fqdn": null, "share_process_namespace": null, "subdomain": null, "termination_grace_period_seconds": null, "tolerations": null, "topology_spread_constraints": null, "volumes": null}}, "ttl_seconds_after_finished": null}, "status": null}]} \ No newline at end of file diff --git a/deployment/charts/tesk/values.yaml b/deployment/charts/tesk/values.yaml index dae8b61..e2a957c 100644 --- a/deployment/charts/tesk/values.yaml +++ b/deployment/charts/tesk/values.yaml @@ -9,7 +9,7 @@ host_name: "" # # 'openstack' or 's3' -storage: none +storage: s3 # Configurable storage class. 
storageClass: diff --git a/deployment/config.yaml b/deployment/config.yaml index 0006f11..1f57896 100644 --- a/deployment/config.yaml +++ b/deployment/config.yaml @@ -72,6 +72,79 @@ custom: tesResources_backend_parameters: - VmSize - ParamToRecogniseDataComingFromConfig + # Taskmaster configuration + taskmaster_template: + apiVersion: batch/v1 + kind: Job + metadata: + name: taskmaster + labels: + app: taskmaster + spec: + template: + metadata: + name: taskmaster + spec: + serviceAccount: taskmaster + serviceAccountName: taskmaster + containers: + - name: taskmaster + image: docker.io/lvarin/tesk-core-taskmaster:testing + args: + - -f + - /jsoninput/JSON_INPUT.gz + env: + - name: TESK_FTP_USERNAME + valueFrom: + secretKeyRef: + name: ftp-secret + key: username + optional: true + - name: TESK_FTP_PASSWORD + valueFrom: + secretKeyRef: + name: ftp-secret + key: password + optional: true + volumeMounts: + - name: podinfo + mountPath: /podinfo + readOnly: true + - name: jsoninput + mountPath: /jsoninput + readOnly: true + volumes: + - name: podinfo + downwardAPI: + items: + - path: labels + fieldRef: + fieldPath: metadata.labels + restartPolicy: Never + # Taskmaster environment properties + taskmaster_env_properties: + # Taskmaster image name + imageName: docker.io/lvarin/tesk-core-taskmaster + # Taskmaster image version + imageVersion: testing + # Filer image name + filerImageName: docker.io/elixircloud/tesk-core-filer + # Filer image version + filerImageVersion: v0.10.2 + # Test FTP account settings + ftp: + # Name of the secret with FTP account credentials + secretName: account-secret + # If FTP account enabled (based on non-emptiness of secretName) + enabled: true + # If verbose (debug) mode of taskmaster is on (passes additional flag to taskmaster and sets image pull policy to Always) + debug: false + # Environment variables, that will be passed to taskmaster + environment: + key: value + # Service Account name for taskmaster + serviceAccountName: taskmaster + # 
Logging configuration # Cf. https://foca.readthedocs.io/en/latest/modules/foca.models.html#foca.models.config.LogConfig diff --git a/encoded_data.txt b/encoded_data.txt new file mode 100644 index 0000000..33aa3d4 --- /dev/null +++ b/encoded_data.txt @@ -0,0 +1 @@ +H4sIAJUer2YC/9VWS3PbNhD+KxqeQ1N+xLYync4kbfqapIe0tzqDAYElhQivAKAsx+P/3gVJieDD7qWXXDTCt+A+v93FYya0bYLP3qz+yR7vMk0V3OFBN1K+Wt1lHDxzwgZhdAo3TsbjXeYv3xSFeshN+QVYyH0wDopKSDi/y+JFS8O2u1lwGmgqCg8WOtEvv39432HM6AA6pKZ8cEAVLeXJr6fs86tVZprwvzuOOqOD+TPO9+JBSKyDShxSA9OoOmf3RjYKOmcLPBRvixZ34E3jWCt5zLjwO1KX+P9qfbZ+QjkcgDXoWvvlY0atIHtwHoNCICtpYNtif47+ZDuhecT+MGU8Kgg0+tzqpVqbQGMqOjsBfB6o3+UxafEjzLHQdRYtMsx2vEmCQIcDVfYUHOZUQiurHWVALDhhOPGAVeN+4dqCikpoKsU3cMn9GjQ4GoD07ozhLtYek7QE2QXxxZR5THb0/5imGHkMTFEfwJ3Ca2OtoNpcb9br/JrTm/zq+uYip+fA8vOrsrrkNxdrdltlScpzbeK36zYrimpaAyeVAJmGerSg4b7920PeYn6GW+YeIyFIFXCgu2L3kmP9k6r2Eg+yIlLo3QA1gh87IF6wwLrqsiD2QDhQjvdhXo+Ssp2pojYlwjNwLCZBCsFhuMCMsn0lleGwJEisYIoaKtG6xH7CUiSCA6ko9g3vDKTxW+qolCCFVwmIpIofNA45ZqRgD2OZAysxvwoHxUw+N+8bzJTmAxAAvUe6tcl7oU+OwX4fDTFU4fslKq0wGyIk5aTYico0WGgPbi/QPmWsPQezA51SUgeKSk+T0tV+RFhMDO/Gbyl04bcxAzmLv6vVD6tkOa1+XBVB2cIHjvP+TJp6dTFA4FyE2uENet8ZOyb33Ye3f/3987uodE9l02Kd5l8//XR5e8JJ5YwaMjQo+O3jx/efFhVslQK3rKB3JcUQEQp5MDkSi4dZy0hRAXtgElJoj1zzHvebKeFFFlnjwognlIulT5FAyPKZ8fECZJIK5VM/cDSNtH9tsIH6y7aJm/Js3a47ZVxUe/v6drO5vHq9ucAF+tQZCNQtDQrWOOQaaZ8ch2QutvcbOw0Aiy/05EiMTtsHd47Cdu4mJuagTTk+E/7jxsSzkDZA93DAnonk9zO87Y0EvjcOHwI14cIN3MlainDtY6yVqJPBhNjUPOj41jr1W+zsxADYLQ5eHNpk1HG9dGt8IDjOqIcpKiybIBpCdHeCWsEnSONnNsZzMeE2VtVBmpA4TxZd1bjRJvO1heYLxCRfGRx/W6R4spAc4D5pCzrbVbgN3GicHRGCRMcWGZsfWqfG2b80e6ftMeN29ifsuzHhkBe4bBYNebYF3kic9It45M/EgxeaZTyWnxVMTUEgx0IS6kn1lae9taVx9TvDoHd+sqd8U3KjaNqPaWe9vI2Dkf0aHYGYRFM/EG9jHWKg+ChGC2HWdEegHS8hyKMFQit0gsQV5jGR6QbER0Vz+uzz07/JtQSDdQ0AAA== diff --git a/pyproject.toml b/pyproject.toml index 4979503..05534c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,9 +4,10 @@ requires = ["poetry-core"] [tool.bandit] skips = [ + "B101", # Use of asserts is 
discouraged as it is removed during byte generation, we don't need that! + "B108", # Insecure usage of temp file/directory, false positive. "B321", # FTP-related functions are being called. - "B402", # A FTP-related module is being imported. - "B108" # Insecure usage of temp file/directory, false positive. + "B402" # A FTP-related module is being imported. ] [tool.poetry] diff --git a/tesk/api/ga4gh/tes/controllers.py b/tesk/api/ga4gh/tes/controllers.py index 466053d..8a95857 100644 --- a/tesk/api/ga4gh/tes/controllers.py +++ b/tesk/api/ga4gh/tes/controllers.py @@ -1,11 +1,16 @@ """Controllers for GA4GH TES API endpoints.""" import logging - -# from connexion import request # type: ignore +import json from foca.utils.logging import log_traffic # type: ignore +from tesk.api.ga4gh.tes.models import TesTask from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo +from tesk.api.ga4gh.tes.task.create_task import CreateTesTask +from tesk.exceptions import BadRequest, InternalServerError +from tesk.api.ga4gh.tes.task.get_task import GetTesTask +from tesk.api.ga4gh.tes.models import TaskView +from tesk.utils import enum_to_string # Get logger instance logger = logging.getLogger(__name__) @@ -26,19 +31,24 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore # POST /tasks @log_traffic -def CreateTask(*args, **kwargs) -> dict: # type: ignore +def CreateTask(**kwargs) -> dict: """Create task. Args: - *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. 
""" - pass + try: + request_body = kwargs.get("body") + tes_task = TesTask(**request_body) + response = CreateTesTask(tes_task).response() + return response + except Exception as e: + raise InternalServerError from e # GET /tasks/service-info @log_traffic -def GetServiceInfo() -> dict: # type: ignore +def GetServiceInfo() -> dict: """Get service info.""" service_info = ServiceInfo() return service_info.response() @@ -46,14 +56,22 @@ def GetServiceInfo() -> dict: # type: ignore # GET /tasks @log_traffic -def ListTasks(*args, **kwargs) -> dict: # type: ignore +def ListTasks(*args, **kwargs): """List all available tasks. Args: *args: Variable length argument list. **kwargs: Arbitrary keyword arguments. """ - pass + get_tes_task = GetTesTask() + view = TaskView(kwargs.get("view")) + name_prefix = kwargs.get("name_prefix") + page_size = kwargs.get("page_size") + page_token = kwargs.get("page_token") + response = get_tes_task.list_tasks( + view=view, name_prefix=name_prefix, page_size=page_size, page_token=page_token + ) + return json.loads(response.json()) # GET /tasks diff --git a/tesk/api/ga4gh/tes/models.py b/tesk/api/ga4gh/tes/models.py index c442e1e..a479d7c 100644 --- a/tesk/api/ga4gh/tes/models.py +++ b/tesk/api/ga4gh/tes/models.py @@ -275,7 +275,7 @@ class TesResources(BaseModel): example={"VmSize": "Standard_D64_v3"}, ) backend_parameters_strict: Optional[bool] = Field( - False, + default=False, description="If set to true, backends should fail the task if any " "backend_parameters\nkey/values are unsupported, otherwise, backends should " "attempt to run the task", @@ -568,3 +568,11 @@ class TesListTasksResponse(BaseModel): description="Token used to return the next page of results. 
This value can be " "used\nin the `page_token` field of the next ListTasks request.", ) + + +class TaskView(str, Enum): + """View of task for API response.""" + + MINIMAL = "MINIMAL" + BASIC = "BASIC" + FULL = "FULL" diff --git a/tesk/api/ga4gh/tes/task/__init__.py b/tesk/api/ga4gh/tes/task/__init__.py new file mode 100644 index 0000000..b8e71d8 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/__init__.py @@ -0,0 +1 @@ +"""Task API controller logic.""" diff --git a/tesk/api/ga4gh/tes/task/create_task.py b/tesk/api/ga4gh/tes/task/create_task.py new file mode 100644 index 0000000..3da339c --- /dev/null +++ b/tesk/api/ga4gh/tes/task/create_task.py @@ -0,0 +1,106 @@ +"""TESK API module for creating a task.""" + +import logging + +from pydantic import BaseModel + +from tesk.api.ga4gh.tes.models import TesCreateTaskResponse, TesResources, TesTask +from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest +from tesk.exceptions import KubernetesError +import os +from datetime import datetime +logger = logging.getLogger(__name__) + + +class CreateTesTask(TesTaskRequest): + """Create TES task.""" + + # TODO: Add user to the class when auth implemented in FOCA + def __init__( + self, + task: TesTask, + # user: User + ): + """Initialize the CreateTask class. + + Args: + task: TES task to create. 
+ """ + super().__init__() + self.task = task + # self.user = user + + def create_task(self) -> TesCreateTaskResponse: + """Create TES task.""" + attempts_no = 0 + while attempts_no < self.constants.job_create_attempts_no: + try: + attempts_no += 1 + resources = self.task.resources + limits = self.kubernetes_client_wrapper.list_limits() + print("limits", limits) + minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb() + + if not self.task.resources: + self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb)) + if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb: + self.task.resources.ram_gb = minimum_ram_gb + + taskmaster_job = self.tes_kubernetes_converter.from_tes_task_to_k8s_job( + self.task, + # self.user + ) + + taskmaster_config_map = ( + self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map( + self.task, + taskmaster_job, + # self.user + ) + ) + + configmap = self.kubernetes_client_wrapper.create_config_map(taskmaster_config_map) + created_job = self.kubernetes_client_wrapper.create_job(taskmaster_job) + + os.makedirs("/tmp/tesk", exist_ok=True) + output_log_path = f"/tmp/tesk/output-at-{datetime.now().strftime('%H:%M:%S')}.log" + with open(output_log_path, "w") as f: + f.write("*********************************\n") + f.write(f"Manifest consumed to create taskmaster job as {type(taskmaster_job)}\n") + f.write(str(taskmaster_job) + "\n") + f.write("*********************************\n") + + f.write("*********************************\n") + f.write(f"Manifest consumed to create config map as {type(taskmaster_config_map)}\n") + f.write(str(taskmaster_config_map) + "\n") + f.write("*********************************\n") + + # Create ConfigMap and Job + f.write("*********************************\n") + f.write(f"Created ConfigMap as {type(configmap)}\n") + f.write(str(configmap) + "\n") + f.write("*********************************\n") + + f.write("*********************************\n") + f.write(f"Created taskmaster job as 
{type(created_job)}\n") + f.write(str(created_job) + "\n") + f.write("*********************************\n") + assert created_job.metadata is not None + assert created_job.metadata.name is not None + + return TesCreateTaskResponse(id=created_job.metadata.name) + + except KubernetesError as e: + if ( + not e.is_object_name_duplicated() + or attempts_no >= self.constants.job_create_attempts_no + ): + raise e + + except Exception as exc: + logging.error("ERROR: In createTask", exc_info=True) + raise exc + return {} # Dummy return to silence mypy + + def handle_request(self) -> BaseModel: + return self.create_task() diff --git a/tesk/api/ga4gh/tes/task/get_task.py b/tesk/api/ga4gh/tes/task/get_task.py new file mode 100644 index 0000000..e4d5620 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/get_task.py @@ -0,0 +1,141 @@ +"""Module for getting task.""" + +from typing import List + +from kubernetes.client.models import V1JobList +from pydantic import BaseModel + +from tesk.api.ga4gh.tes.models import TaskView, TesListTasksResponse, TesTask +from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest +from tesk.api.kubernetes.convert.data.task import Task +from tesk.api.kubernetes.convert.data.task_builder import TaskBuilder + + +class GetTesTaskInterface(TesTaskRequest): + """Base class for listing task request.""" + + def __init__(self): + """Initialise base class for listing task.""" + super().__init__() + + def get_task( + self, + task_id: str, + view: TaskView, + # user: User + ) -> TesTask: + taskmaster_job = self.kubernetes_client_wrapper.read_taskmaster_job(task_id) + executor_jobs = self.kubernetes_client_wrapper.list_single_task_executor_jobs( + taskmaster_job.metadata.name + ) + taskmaster_pods = self.kubernetes_client_wrapper.list_single_job_pods( + taskmaster_job + ) + task_builder = ( + TaskBuilder.new_single_task() + .add_job(taskmaster_job) + .add_job_list(executor_jobs.items) + .add_pod_list(taskmaster_pods.items) + ) + + for executor_job in 
executor_jobs.items: + executor_job_pods = self.kubernetes_client_wrapper.list_single_job_pods( + executor_job + ) + task_builder.add_pod_list(executor_job_pods.items) + + output_filer_job = ( + self.kubernetes_client_wrapper.get_single_task_output_filer_job( + taskmaster_job.metadata.name + ) + ) + if output_filer_job: + task_builder.add_job(output_filer_job) + + return self._get_task(task_builder.get_task(), view, False) + + def _get_task(self, task_objects: Task, view: TaskView, is_list: bool) -> TesTask: + if view == TaskView.MINIMAL: + return self.tes_kubernetes_converter.from_k8s_jobs_to_tes_task_minimal( + task_objects, is_list + ) + + task = self.tes_kubernetes_converter.from_k8s_jobs_to_tes_task_basic( + task_objects, view == TaskView.BASIC + ) + + if view == TaskView.BASIC: + return task + + for i, executor_job in enumerate(task_objects.get_executors()): + if executor_job.has_pods(): + executor_log = task.logs[0].logs[i] + if view == TaskView.FULL: + executor_pod_log = self.kubernetes_client_wrapper.read_pod_log( + executor_job.get_first_pod().metadata.name + ) + if executor_pod_log is not None: + executor_log.stdout = executor_pod_log + + if task_objects.taskmaster.has_pods(): + taskmaster_pod_log = self.kubernetes_client_wrapper.read_pod_log( + task_objects.get_taskmaster().get_first_pod().metadata.name + ) + if taskmaster_pod_log is not None: + task.logs[0].system_logs.append(taskmaster_pod_log) + + return task + + def list_tasks( + self, + name_prefix: str, + page_size: int, + page_token: str, + view: TaskView, + # user: User, + ) -> TesListTasksResponse: + taskmaster_jobs: V1JobList = ( + self.kubernetes_client_wrapper.list_all_taskmaster_jobs_for_user( + page_token, + page_size, + # user + ) + ) + executor_jobs = self.kubernetes_client_wrapper.list_all_task_executor_jobs() + filer_jobs = self.kubernetes_client_wrapper.list_all_filer_jobs() + job_pods = self.kubernetes_client_wrapper.list_all_job_pods() + task_list_builder = ( + 
TaskBuilder.new_task_list() + .add_job_list(taskmaster_jobs.items) + .add_job_list(executor_jobs.items) + .add_job_list(filer_jobs.items) + .add_pod_list(job_pods.items) + ) + + tasks: List[TesTask] = [ + self._get_task(task, view, True) + for task in task_list_builder.get_task_list() + ] + + response = TesListTasksResponse( + tasks=tasks, next_page_token=taskmaster_jobs.metadata._continue + ) + + return response + + +class GetTesTask(GetTesTaskInterface): + def handle_request( + self, + name_prefix: str, + page_size: int, + page_token: str, + view: TaskView, + # user: User, + ) -> BaseModel: + return self.list_tasks( + name_prefix=name_prefix, + page_size=page_size, + page_token=page_token, + view=view, + ) diff --git a/tesk/api/ga4gh/tes/task/task_request.py b/tesk/api/ga4gh/tes/task/task_request.py new file mode 100644 index 0000000..23a6b53 --- /dev/null +++ b/tesk/api/ga4gh/tes/task/task_request.py @@ -0,0 +1,37 @@ +"""Base class for tesk request.""" + +import json +import logging +from abc import ABC, abstractmethod + +from pydantic import BaseModel + +from tesk.api.kubernetes.client_wrapper import KubernetesClientWrapper +from tesk.api.kubernetes.constants import Constants +from tesk.api.kubernetes.convert.converter import TesKubernetesConverter + +logger = logging.getLogger(__name__) + + +class TesTaskRequest(ABC): + """Base class for tesk request ecapsulating common methods and members.""" + + def __init__(self): + """Initialise base class for tesk request.""" + self.kubernetes_client_wrapper = KubernetesClientWrapper() + self.tes_kubernetes_converter = TesKubernetesConverter() + self.constants = Constants() + + @abstractmethod + def handle_request(self) -> BaseModel: + """Business logic for the request.""" + pass + + def response(self) -> dict: + """Get response for the request.""" + response: BaseModel = self.handle_request() + try: + return json.load(json.dumps(response)) + except (TypeError, ValueError) as e: + logger.info(e) + return response.dict() 
diff --git a/tesk/api/kubernetes/__init__.py b/tesk/api/kubernetes/__init__.py new file mode 100644 index 0000000..ffa7082 --- /dev/null +++ b/tesk/api/kubernetes/__init__.py @@ -0,0 +1 @@ +"""Kubernetes API module.""" diff --git a/tesk/api/kubernetes/client_wrapper.py b/tesk/api/kubernetes/client_wrapper.py new file mode 100644 index 0000000..515abbf --- /dev/null +++ b/tesk/api/kubernetes/client_wrapper.py @@ -0,0 +1,322 @@ +"""Wrapper Abstraction of Kubernetes Python client API for TESK.""" + +import logging +from typing import Optional + +from kubernetes import client, config +from kubernetes.client import ( + V1ConfigMap, + V1Job, + V1JobList, + V1LabelSelector, + V1LimitRangeList, + V1PodList, +) +from kubernetes.utils.quantity import parse_quantity # type: ignore + +from tesk.api.kubernetes.constants import Constants +from tesk.constants import TeskConstants +from tesk.exceptions import KubernetesError, NotFound + +logger = logging.getLogger(__name__) + + +class KubernetesClientWrapper: + """Kubernetes client wrapper class.""" + + def __init__(self): + """Initialize the Kubernetes client wrapper. + + Args: + namespace: Namespace to use for Kubernetes. + """ + config.load_kube_config() + self.batch_api = client.BatchV1Api() + self.core_api = client.CoreV1Api() + self.namespace = TeskConstants.tesk_namespace + self.constant = Constants() + + def create_job(self, job: V1Job) -> V1Job: + """Create a job in the Kubernetes cluster. + + Returns: + Job object created in the Kubernetes cluster. + """ + try: + v1_job: V1Job = self.batch_api.create_namespaced_job( + namespace=self.namespace, body=job + ) + return v1_job + except KubernetesError as e: + logger.error(f"Exception when creating job: {e}") + raise + + def create_config_map(self, config_map: V1ConfigMap) -> V1ConfigMap: + """Create a config map in the Kubernetes cluster. + + Args: + config_map: ConfigMap object to create. 
+ """ + try: + v1_config_map: V1ConfigMap = self.core_api.create_namespaced_config_map( + namespace=self.namespace, body=config_map + ) + return v1_config_map + except KubernetesError as e: + logger.error(f"Exception when creating config map: {e}") + raise + + def read_taskmaster_job(self, task_id: str) -> V1Job: + """Read a taskmaster job from the Kubernetes cluster. + + task_id: Task identifier. + + Returns: + Job object read from the Kubernetes cluster + + Raises: + Exception: If the task is not found. + """ + try: + job: V1Job = self.batch_api.read_namespaced_job( + name=task_id, namespace=self.namespace + ) + if ( + job.metadata + and job.metadata.labels + and self.constant.label_jobtype_key in job.metadata.labels + and job.metadata.labels[self.constant.label_jobtype_key] + == self.constant.label_jobtype_value_taskm + ): + return job + except KubernetesError as e: + if e.status != NotFound.code: + logger.error(f"Exception when reading job: {e}") + raise + raise Exception(f"Task {task_id} not found") + + def list_jobs(self, page_token: str = None, label_selector=None, limit=None): + """List jobs in the Kubernetes cluster. + + Args: + page_token: pageToken supplied by user (from previous result; points to + next page of results) + label_selector: Label selector to filter jobs. + limit: Maximum number of jobs to return. + """ + try: + return self.batch_api.list_namespaced_job( + namespace=self.namespace, + label_selector=label_selector, + limit=limit, + _continue=page_token, + ) + except KubernetesError as e: + logger.error(f"Exception when listing jobs: {e}") + raise + + def list_limits(self, label_selector=None, limit=None) -> V1LimitRangeList: + """List limit ranges in the Kubernetes cluster. + + Args: + label_selector: Label selector to filter limit ranges. + limit: Maximum number of limit ranges to return. 
+ """ + try: + limits: V1LimitRangeList = self.core_api.list_namespaced_limit_range( + namespace=self.namespace, label_selector=label_selector, limit=limit + ) + return limits + except KubernetesError as e: + logger.error(f"Exception when listing limits: {e}") + raise + + def minimum_ram_gb(self) -> float: + """Get the minimum amount of RAM in the cluster. + + Returns: + Minimum amount of RAM in the cluster in GB. + """ + try: + min_ram = 0 + limits = self.list_limits().items + for limit in limits: + if limit.spec: + for item in limit.spec.limits: + if item.min and "memory" in item.min: + mem_quantity = item.min["memory"] + mem_bytes = self.quantity_to_bytes(mem_quantity) + min_ram = max(min_ram, mem_bytes) + return min_ram / (1024**3) + except (ValueError, TypeError) as e: + logger.error(f"Error in minimum_ram_gb: {e}") + return 0.0 + except Exception as e: + logger.error(f"Unexpected error in minimum_ram_gb: {e}") + raise + + def quantity_to_bytes(self, quantity: str) -> int: + """Convert quantity(resource) to bytes.""" + parsed_quantity: int = parse_quantity(quantity) + return parsed_quantity + + def list_all_taskmaster_jobs_for_user( + self, + page_token: str, + items_per_page: int, + # user: str + ) -> V1JobList: + """Gets all Taskmaster job objects, a User is allowed to see. + + Args: + page_token: pageToken supplied by user (from previous result; points to + next page of results) + items_per_page: Value submitted by user, limiting number of results. + user: User identifier. + + Returns: + Job list of Taskmaster jobs that user is allowed to see. + """ + # TODO: Implement this method when auth is implemented in FOCA. 
+ label_selector = f"{self.constant.label_jobtype_key}={self.constant.label_jobtype_value_taskm}" + # if user.get_label_selector(): + # label_selector += f",{user.get_label_selector()}" + + result = self.list_jobs(page_token, label_selector, items_per_page) + + # if user.is_member_in_non_managed_groups(): + # filtered_job_list = [ + # job for job in result.items + # if user.is_group_manager( + # job.metadata.labels.get(self.constant.label_groupname_key) + # ) or user.get_username() + # == job.metadata.labels.get(self.constant.label_userid_key) + # ] + # result.items = filtered_job_list + + return result + + def list_single_task_executor_jobs(self, task_id: str) -> V1JobList: + """List single task executor job.""" + label_selector = (self.constant.label_testask_id_key + "=" + task_id,) + return self.list_jobs(label_selector=label_selector) + + def get_single_task_output_filer_job(self, task_id: str) -> Optional[V1Job]: + """Get single task output filer job.""" + try: + job: V1Job = self.batch_api.read_namespaced_job( + name=task_id + self.constant.job_name_filer_suf, + namespace=self.namespace, + ) + return job + except KubernetesError as e: + if e.status != NotFound.code: + logger.error(f"Exception when reading output filer job: {e}") + raise + return None + + def list_all_taskmaster_jobs(self) -> V1JobList: + """List all taskmaster jobs in the Kubernetes cluster.""" + label_selector = ( + self.constant.label_jobtype_key + + "=" + + self.constant.label_jobtype_value_taskm + ) + return self.list_jobs(label_selector=label_selector) + + def list_all_task_executor_jobs(self) -> V1JobList: + """List all executor jobs in the Kubernetes cluster.""" + label_selector = ( + self.constant.label_jobtype_key + + "=" + + self.constant.label_jobtype_value_exec + ) + return self.list_jobs(label_selector=label_selector) + + def list_all_filer_jobs(self) -> V1JobList: + """List all output filer jobs in the Kubernetes cluster.""" + label_selector = "!" 
+ self.constant.label_jobtype_key + return self.list_jobs(label_selector=label_selector) + + def list_single_job_pods(self, job: V1Job) -> V1PodList: + """List pods associated with a single job. + + Args: + job: Job object to list pods for. + """ + try: + if ( + job.spec + and job.spec.selector + and isinstance(job.spec.selector, V1LabelSelector) + and job.spec.selector.match_labels + ): + label_selector = ",".join( + f"{k}={v}" for k, v in job.spec.selector.match_labels.items() + ) + namespaced_pods: V1PodList = self.core_api.list_namespaced_pod( + namespace=self.namespace, label_selector=label_selector + ) + return namespaced_pods + else: + logger.error("Job spec, selector, or match_labels is None or invalid") + return V1PodList(items=[]) + except KubernetesError as e: + logger.error(f"Exception when listing pods: {e}") + raise + + def list_all_job_pods(self): + """List all job pods.""" + label_selector = "job-name" + try: + return self.core_api.list_namespaced_pod( + namespace=self.namespace, label_selector=label_selector + ) + except KubernetesError as e: + logger.error(f"Couldn't list job of {self.namespace} namespace.") + raise + + def read_pod_log(self, pod_name: str) -> Optional[str]: + """Read logs from a pod. + + Args: + pod_name: Name of the pod to read logs from. + """ + try: + pod_log: str = self.core_api.read_namespaced_pod_log( + name=pod_name, namespace=self.namespace + ) + return pod_log + except KubernetesError as e: + logger.error(f"Exception when reading pod log: {e}") + return None + + def label_job_as_cancelled(self, task_id: str) -> None: + """Label a job as cancelled. + + Args: + task_id: Task identifier. 
+ """ + try: + patch = {"metadata": {"labels": {"status": "cancelled"}}} + self.batch_api.patch_namespaced_job( + name=task_id, namespace=self.namespace, body=patch + ) + except KubernetesError as e: + logger.error(f"Exception when labeling job as cancelled: {e}") + raise + + def label_pod_as_cancelled(self, pod_name: str) -> None: + """Label a pod as cancelled. + + Args: + pod_name: Pod name. + """ + try: + patch = {"metadata": {"labels": {"status": "cancelled"}}} + self.core_api.patch_namespaced_pod( + name=pod_name, namespace=self.namespace, body=patch + ) + except KubernetesError as e: + logger.error(f"Exception when labeling pod as cancelled: {e}") + raise diff --git a/tesk/api/kubernetes/constants.py b/tesk/api/kubernetes/constants.py new file mode 100644 index 0000000..4386b34 --- /dev/null +++ b/tesk/api/kubernetes/constants.py @@ -0,0 +1,166 @@ +"""Constants for Kubernetes API.""" + +from enum import Enum +from typing import Set + +from pydantic import BaseModel, Field + + +class Constants(BaseModel): + """Constants related to job and tasks.""" + + taskmaster_input: str = Field( + default="JSON_INPUT", + description="ENV var that serves as taskmaster script input (JSON format)", + ) + taskmaster_input_exec_key: str = Field( + default="executors", + description="Key in JSON taskmaster input, which holds list of executors", + ) + volume_name: str = Field(default="PVC", description="Volume name") + job_create_attempts_no: int = Field( + default=5, + description="Number of attempts of job creation in case of name collision", + ) + job_name_taskm_prefix: str = Field( + default="task-", + description="Constant prefix of taskmaster's job name (== TES task ID)", + ) + job_name_exec_prefix: str = Field( + default="-ex-", + description="Part of executor's job name, that follows taskmaster's name", + ) + job_name_taskm_rand_part_length: int = Field( + default=4, + description=( + "No of bytes of random part of task master's name (which end up " + "encoded to hex)" + 
), + ) + job_name_exec_no_length: int = Field( + default=2, + description="No of digits reserved for executor number in executor's job name." + " Ends up padded with '0' for numbers < 10", + ) + job_name_filer_suf: str = Field( + default="-outputs-filer", description="Output filer name suffix" + ) + ann_testask_name_key: str = Field( + default="tes-task-name", + description=( + "Key of the annotation, that stores name of TES task in both taskmaster's " + "job and executor's jobs" + ), + ) + ann_json_input_key: str = Field( + default="json-input", + description="Key of the annotation, that stores whole input TES task serialized" + " to JSON", + ) + label_testask_id_key: str = Field( + default="taskmaster-name", + description="Key of the label, that stores taskmaster's name (==TES task " + "generated ID) in executor jobs", + ) + label_jobtype_key: str = Field( + default="job-type", + description="Key of the label, that stores type of a job (taskmaster or " + "executor)", + ) + label_jobtype_value_taskm: str = Field( + default="taskmaster", + description="Value of the label with taskmaster's job type", + ) + label_jobtype_value_exec: str = Field( + default="executor", description="Value of the label with executor's job type" + ) + label_execno_key: str = Field( + default="executor-no", + description="Key of the label, that holds executor number for executor jobs", + ) + label_taskstate_key: str = Field( + default="task-status", + description="Key of the label, that holds executor's state", + ) + label_taskstate_value_canc: str = Field( + default="Cancelled", + description="Value of the label, that holds executor's Cancelled state", + ) + label_userid_key: str = Field( + default="creator-user-id", description="Key of the label, that holds user id" + ) + label_groupname_key: str = Field( + default="creator-group-name", + description="Key of the label, that holds user's group name", + ) + absolute_path_regexp: str = Field( + default="^\\/.*", description="Pattern to 
validate paths" + ) + absolute_path_message: str = Field( + default="must be an absolute path", + description="Message for absolute path validation (to avoid " + "message.properties)", + ) + resource_disk_default: float = Field( + default=0.1, description="Default resource disk value" + ) + completed_states: Set[str] = Field( + default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"}, + description="TES task states, indicating task is not running and cannot be " + "cancelled", + ) + ftp_secret_username_env: str = Field( + default="TESK_FTP_USERNAME", + description="Name of taskmaster's ENV variable with username of FTP account " + "used for storage", + ) + ftp_secret_password_env: str = Field( + default="TESK_FTP_PASSWORD", + description="Name of taskmaster's ENV variable with password of FTP account " + "used for storage", + ) + cancel_patch: str = Field( + default='{"metadata":{"labels":{"task-status":"Cancelled"}}}', + description="Patch object passed to job API, when cancelling task", + ) + executor_backoff_limit: str = Field( + default="EXECUTOR_BACKOFF_LIMIT", + description="Set a number of retries of a job execution.", + ) + filer_backoff_limit: str = Field( + default="FILER_BACKOFF_LIMIT", + description="Set a number of retries of a filer job execution.", + ) + + +class K8sConstants(BaseModel): + """Constants related to Kubernetes.""" + + k8s_batch_api_version: str = Field( + default="batch/v1", description="Kubernetes Batch API version" + ) + k8s_batch_api_job_type: str = Field( + default="Job", description="Kubernetes Job object type" + ) + job_restart_policy: str = Field( + default="Never", description="Kubernetes Job restart policy" + ) + resource_cpu_key: str = Field("cpu", description="Executor CPU resource label") + resource_mem_key: str = Field( + default="memory", description="Executor memory resource label" + ) + resource_mem_unit: str = Field( + default="Gi", description="Executor memory resource unit" + ) + resource_mem_one_gb: int = 
Field( + default=1073741824, description="One Gibibyte (Gi) in bytes" + ) + + class PodPhase(Enum): + """Pod state.""" + + PENDING = "Pending" + + def get_code(self) -> str: + """Return the pod state.""" + return self.value diff --git a/tesk/api/kubernetes/convert/__init__.py b/tesk/api/kubernetes/convert/__init__.py new file mode 100644 index 0000000..33ac4cc --- /dev/null +++ b/tesk/api/kubernetes/convert/__init__.py @@ -0,0 +1 @@ +"""Module for converting Kubernetes objects to Task objects.""" diff --git a/tesk/api/kubernetes/convert/converter.py b/tesk/api/kubernetes/convert/converter.py new file mode 100644 index 0000000..0bae9a1 --- /dev/null +++ b/tesk/api/kubernetes/convert/converter.py @@ -0,0 +1,511 @@ +"""Module for converting TES tasks to Kubernetes jobs.""" + +import base64 +import gzip +import json +import logging +from decimal import Decimal +from io import BytesIO +from typing import Any, Optional + +from kubernetes.client import ( + V1ConfigMap, + V1ConfigMapVolumeSource, + V1Container, + V1EnvVar, + V1JobSpec, + V1JobStatus, + V1ObjectMeta, + V1PodSpec, + V1PodTemplateSpec, + V1ResourceRequirements, + V1Volume, +) +from kubernetes.client.models import V1Job +from kubernetes.utils.quantity import parse_quantity # type: ignore + +from tesk.api.ga4gh.tes.models import ( + TesExecutor, + TesExecutorLog, + TesResources, + TesState, + TesTask, + TesTaskLog, +) +from tesk.api.kubernetes.client_wrapper import KubernetesClientWrapper +from tesk.api.kubernetes.constants import Constants, K8sConstants +from tesk.api.kubernetes.convert.data.job import Job, JobStatus +from tesk.api.kubernetes.convert.data.task import Task +from tesk.api.kubernetes.convert.executor_command_wrapper import ExecutorCommandWrapper +from tesk.api.kubernetes.convert.template import KubernetesTemplateSupplier +from tesk.custom_config import TaskmasterEnvProperties +from tesk.utils import ( + decimal_to_float, + enum_to_string, + get_taskmaster_env_property, + pydantic_model_list_dict, 
+ time_formatter, +) + +logger = logging.getLogger(__name__) + + +class TesKubernetesConverter: + """Convert TES requests to Kubernetes resources.""" + + def __init__(self): + """Initialize the converter.""" + self.taskmaster_env_properties: TaskmasterEnvProperties = ( + get_taskmaster_env_property() + ) + self.template_supplier = KubernetesTemplateSupplier() + self.constants = Constants() + self.k8s_constants = K8sConstants() + self.kubernetes_client_wrapper = KubernetesClientWrapper() + + def from_tes_task_to_k8s_job(self, task: TesTask): + """Convert TES task to Kubernetes job.""" + taskmaster_job: V1Job = ( + self.template_supplier.get_taskmaster_template_with_value_from_config() + ) + + if taskmaster_job.metadata is None: + taskmaster_job.metadata = V1ObjectMeta() + + if taskmaster_job.metadata.annotations is None: + taskmaster_job.metadata.annotations = {} + + if taskmaster_job.metadata.labels is None: + taskmaster_job.metadata.labels = {} + + # taskmaster_job.metadata.name = task.name + if task.name: + taskmaster_job.metadata.annotations[self.constants.ann_testask_name_key] = ( + task.name + ) + # taskmaster_job.metadata.labels[self.constants.label_userid_key] = user[ + # "username" + # ] + + # if task.tags and "GROUP_NAME" in task.tags: + # taskmaster_job.metadata.labels[self.constants.label_userid_key] = task[ + # "tags" + # ]["GROUP_NAME"] + # elif user["is_member"]: + # taskmaster_job.metadata.labels[self.constants.label_groupname_key] = user[ + # "any_group" + # ] + + # Convert Pydantic model to dictionary + task_dict = task.dict() + + # Serialize to JSON with indentation + json_input = json.dumps(task_dict, indent=2, default=enum_to_string) + + try: + taskmaster_job.metadata.annotations[self.constants.ann_json_input_key] = ( + json_input + ) + except Exception as ex: + logger.info( + f"Serializing task {taskmaster_job.metadata.name} to JSON failed", ex + ) + + volume = V1Volume( + name="jsoninput", + 
config_map=V1ConfigMapVolumeSource(name=taskmaster_job.metadata.name), + ) + + if taskmaster_job.spec is None: + taskmaster_job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if taskmaster_job.spec.template.spec is None: + taskmaster_job.spec.template.spec = V1PodSpec(containers=[]) + if taskmaster_job.spec.template.spec.volumes is None: + taskmaster_job.spec.template.spec.volumes = [] + + taskmaster_job.spec.template.spec.volumes.append(volume) + return taskmaster_job + + def from_tes_task_to_k8s_config_map( + self, + task: TesTask, + taskmaster_job: V1Job, + # user, + ) -> V1ConfigMap: + """Create a Kubernetes ConfigMap from a TES task.""" + assert taskmaster_job.metadata is not None, ( + "Taskmaster job metadata should have already been set while create" + " taskmaster!" + ) + + taskmaster_config_map = V1ConfigMap( + metadata=V1ObjectMeta(name=taskmaster_job.metadata.name) + ) + + assert ( + taskmaster_config_map.metadata is not None + ), "Taskmaster metadata is should have already been set!" + + if taskmaster_config_map.metadata.labels is None: + taskmaster_config_map.metadata.labels = {} + + if taskmaster_config_map.metadata.annotations is None: + taskmaster_config_map.metadata.annotations = {} + + # FIXME: Figure out what to do if task.name is none. 
+ task_name = task.name or "String" + + taskmaster_config_map.metadata.annotations[ + self.constants.ann_testask_name_key + ] = task_name + + # taskmaster_config_map.metadata.labels[self.constants.label_userid_key] + # = user["username"] + + if task.tags and "GROUP_NAME" in task.tags: + taskmaster_config_map.metadata.labels[ + self.constants.label_groupname_key + ] = task.tags["GROUP_NAME"] + # elif user["is_member"]: + # taskmaster_config_map.metadata.labels[self.constants.label_groupname_key] + # = user["any_group"] + + assert taskmaster_config_map.metadata.name is not None + assert task.resources is not None + + executors_as_jobs = [ + self.from_tes_executor_to_k8s_job( + generated_task_id=taskmaster_config_map.metadata.name, + tes_task_name=task_name, + executor=executor, + executor_index=idx, + resources=task.resources, + # user=user, + ) + for idx, executor in enumerate(task.executors) + ] + + print("nice") + print(task) + print(task.volumes) + print("nice") + + taskmaster_input: dict[str, Any] = { + "inputs": pydantic_model_list_dict(task.inputs) if task.inputs else [], + "outputs": pydantic_model_list_dict(task.outputs) if task.outputs else [], + "volumes": task.volumes or [], + "resources": { + "disk_gb": float(task.resources.disk_gb) + if task.resources.disk_gb + else 10.0 + }, + } + print(taskmaster_input) + taskmaster_input[self.constants.taskmaster_input_exec_key] = [ + exec_job.to_dict() for exec_job in executors_as_jobs + ] + + print(taskmaster_input) + taskmaster_input_as_json = json.loads( + json.dumps(taskmaster_input, default=decimal_to_float) + ) + print(taskmaster_input_as_json) + + try: + with BytesIO() as obj: + with gzip.GzipFile(fileobj=obj, mode="wb") as gzip_file: + json_data = json.dumps(taskmaster_input_as_json) + gzip_file.write(json_data.encode("utf-8")) + taskmaster_config_map.binary_data = { + f"{self.constants.taskmaster_input}.gz": base64.b64encode( + obj.getvalue() + ).decode("utf-8") + } + except Exception as e: + logger.info( + 
( + f"Compression of task {taskmaster_config_map.metadata.name}" + f" JSON configmap failed" + ), + e, + ) + + return taskmaster_config_map + + def from_tes_executor_to_k8s_job( # noqa: PLR0913 + self, + generated_task_id: str, + tes_task_name: Optional[str], + executor: TesExecutor, + executor_index: int, + resources: TesResources, + # user: User + ) -> V1Job: + """Create a Kubernetes job from a TES executor.""" + # Get new template executor Job object + executor_job: V1Job = ( + self.template_supplier.get_executor_template_with_value_from_config() + ) + + # Set executors name based on taskmaster's job name + Job(executor_job).change_job_name(Task(taskmaster_name=generated_task_id).get_executor_name(executor_index)) + + if executor_job.metadata is None: + executor_job.metadata = V1ObjectMeta() + + # Put arbitrary labels and annotations + executor_job.metadata.labels = executor_job.metadata.labels or {} + executor_job.metadata.labels[self.constants.label_testask_id_key] = ( + generated_task_id + ) + executor_job.metadata.labels[self.constants.label_execno_key] = str( + executor_index + ) + # job.metadata.labels[self.constants.label_userid_key] = user.username + + if executor_job.metadata is None: + executor_job.metadata = V1ObjectMeta() + if executor_job.metadata.annotations is None: + executor_job.metadata.annotations = {} + + if tes_task_name: + executor_job.metadata.annotations[self.constants.ann_testask_name_key] = ( + tes_task_name + ) + + if executor_job.spec is None: + executor_job.spec = V1JobSpec(template=V1PodTemplateSpec()) + if executor_job.spec.template.spec is None: + executor_job.spec.template.spec = V1PodSpec(containers=[]) + + container: V1Container = executor_job.spec.template.spec.containers[0] + + # TODO: Not sure what to do with this + # Convert potential TRS URI into docker image + container.image = executor.image + + if not container.command: + container.command = [] + + for command in ExecutorCommandWrapper( + executor + 
).get_commands_with_stream_redirects(): + container.command.append(command) + + if executor.env: + container.env = [ + V1EnvVar(name=key, value=value) for key, value in executor.env.items() + ] + else: + container.env = [] + + container.working_dir = executor.workdir + container.resources = V1ResourceRequirements(requests={}) + + assert container.resources.requests is not None + + if resources.cpu_cores: + container.resources.requests["cpu"] = parse_quantity( + str(resources.cpu_cores) + ) + + if resources.ram_gb: + container.resources.requests["memory"] = parse_quantity( + f"{resources.ram_gb:.6f}Gi" + ) + + # # Workaround + # # Check if volumes is None and set it to an empty list if it is + # if ( + # executor_job.spec + # and executor_job.spec.spec + # and executor_job.spec.spec.volumes is None + # ): + # executor_job.spec.spec.volumes = [] + + return executor_job + + def is_job_in_status(tested_object: V1JobStatus, test_objective: JobStatus) -> bool: + no_of_pods_in_state = None + if test_objective == JobStatus.ACTIVE: + no_of_pods_in_state = tested_object.active + elif test_objective == JobStatus.SUCCEEDED: + no_of_pods_in_state = tested_object.succeeded + elif test_objective == JobStatus.FAILED: + if tested_object.succeeded is not None and tested_object.succeeded > 0: + return False + no_of_pods_in_state = tested_object.failed + + return no_of_pods_in_state is not None and no_of_pods_in_state > 0 + + def extract_state_from_k8s_jobs(self, taskmaster_with_executors: Task) -> TesState: + # taskmaster_job: V1Job = taskmaster_with_executors.get_taskmaster().get_job() + # last_executor: Optional[Job] = taskmaster_with_executors.get_last_executor() + # output_filer: Optional[Job] = taskmaster_with_executors.get_output_filer() + # taskmaster_cancelled = ( + # taskmaster_job.metadata.labels.get(self.constants.label_taskstate_key) + # == self.constants.label_taskstate_value_canc + # ) + # taskmaster_running = self.is_job_in_status( + # 
tested_object=taskmaster_job.status, + # test_objective=JobStatus.ACTIVE + # ) + # print(taskmaster_running) + # taskmaster_completed = self.is_job_in_status( + # taskmaster_job.status, JobStatus.SUCCEEDED + # ) + # print(taskmaster_completed) + # executor_present = last_executor is not None + # last_executor_failed = executor_present and self.is_job_in_status( + # last_executor.get_job().status, JobStatus.FAILED + # ) + # print(last_executor_failed) + # last_executor_completed = executor_present and self.is_job_in_status( + # last_executor.get_job().status, JobStatus.SUCCEEDED + # ) + # print(last_executor_completed) + # output_filer_failed = output_filer is not None and self.is_job_in_status( + # output_filer.get_job().status, JobStatus.FAILED + # ) + # print(output_filer_failed) + # pending = self.k8s_constants.PodPhase.PENDING + # taskmaster_pending = any( + # pod.status.phase == pending + # for pod in taskmaster_with_executors.get_taskmaster().get_pods() + # ) + # last_executor_pending = executor_present and any( + # pod.status.phase == pending for pod in last_executor.get_pods() + # ) + + # if taskmaster_cancelled: + # return TesState.CANCELED + # if taskmaster_completed and output_filer_failed: + # return TesState.SYSTEM_ERROR + # if taskmaster_completed and last_executor_completed: + # return TesState.COMPLETE + # if taskmaster_completed and last_executor_failed: + # return TesState.EXECUTOR_ERROR + # if taskmaster_pending: + # return TesState.QUEUED + # if taskmaster_running and not executor_present: + # return TesState.INITIALIZING + # if last_executor_pending: + # return TesState.QUEUED + # if taskmaster_running: + # return TesState.RUNNING + return TesState.SYSTEM_ERROR + + def extract_executor_log_from_k8s_job_and_pod( + self, executor: Job + ) -> TesExecutorLog: + """Extracts TesExecutorLog from executor job and pod objects. + + Does not contain stdout (which needs access to pod log). 
+ """ + log = TesExecutorLog() + executor_job: V1Job = executor.get_job() + start_time = getattr(executor_job.status, "start_time", None) + log.start_time = time_formatter(start_time) + + end_time = getattr(executor_job.status, "completion_time", None) + log.end_time = time_formatter(end_time) + + log.stdout = "" + + if executor.has_pods(): + first_pod_status = getattr(executor.get_first_pod(), "status", None) + exit_code = None + + if first_pod_status: + container_statuses = getattr(first_pod_status, "container_statuses", []) + if container_statuses: + container_status = container_statuses[0] + container_state = getattr(container_status, "state", None) + if container_state: + terminated = getattr(container_state, "terminated", None) + if terminated: + exit_code = getattr(terminated, "exit_code", None) + + log.exit_code = exit_code + + return log + + def from_k8s_jobs_to_tes_task_minimal( + self, taskmaster_with_executors: Task, is_list: bool + ) -> TesTask: + """Convert Kubernetes jobs to TES task.""" + + executors = taskmaster_with_executors.get_executors() + task = TesTask( + # FIXME: I don't think this is it :'( + executors=[ + TesExecutor( + image=executor.image or None, command=executor.command or None + ) + for executor in executors + ] + ) + metadata: V1ObjectMeta = ( + taskmaster_with_executors.get_taskmaster().get_job().metadata + ) + task.id = metadata.name + task.state = self.extract_state_from_k8s_jobs(taskmaster_with_executors) + if not is_list: + # FIXME: This is a hack, we need to get the real values + log = TesTaskLog(logs=TesExecutorLog(exit_code=0)) + task.logs.append(log) + metadata = taskmaster_with_executors.taskmaster.job["metadata"]["labels"] + log.metadata["USER_ID"] = metadata[self.constants.label_userid_key] + if self.constants.label_groupname_key in metadata: + log.metadata["GROUP_NAME"] = metadata[ + self.constants.label_groupname_key + ] + return task + + def from_k8s_jobs_to_tes_task_basic( + self, taskmaster_with_executors: Task, 
nullify_input_content: bool + ) -> TesTask: + task = TesTask() + taskmaster_job: V1Job = taskmaster_with_executors.taskmaster.job + taskmaster_job_metadata: V1ObjectMeta = taskmaster_job.metadata + input_json = (taskmaster_job_metadata.annotations or {}).get( + self.constants.ann_json_input_key, "" + ) + try: + task = TesTask.parse_raw(input_json) + if nullify_input_content and task.inputs: + for input_item in task.inputs: + input_item["content"] = None + except json.JSONDecodeError as ex: + print( + f"Deserializing task {taskmaster_job_metadata['name']} from JSON failed; {ex}" + ) + + task.id = taskmaster_job_metadata["name"] + task.state = self.extract_state_from_k8s_jobs(taskmaster_with_executors) + task.creation_time = time_formatter( + taskmaster_job_metadata["creation_timestamp"] + ) + log = TesTaskLog() + task.logs.append(log) + log.metadata["USER_ID"] = taskmaster_job_metadata["labels"][ + self.constants.label_userid_key + ] + if self.constants.label_groupname_key in taskmaster_job_metadata["labels"]: + log["GROUP_NAME"] = taskmaster_job_metadata["labels"][ + self.constants.label_groupname_key + ] + log.start_time = ( + time_formatter(taskmaster_job.status.start_time) + if taskmaster_job.status.start_time + else None + ) + log.end_time = ( + time_formatter(taskmaster_job.status.completion_time) + if taskmaster_job.status.completion_time + else None + ) + for executor_job in taskmaster_with_executors.get_executors(): + executor_log = self.extract_executor_log_from_k8s_job_and_pod(executor_job) + task.logs.append(executor_log) + return task diff --git a/tesk/api/kubernetes/convert/data/__init__.py b/tesk/api/kubernetes/convert/data/__init__.py new file mode 100644 index 0000000..080fc38 --- /dev/null +++ b/tesk/api/kubernetes/convert/data/__init__.py @@ -0,0 +1 @@ +"""Data structure module to handle k8s resources of TES.""" diff --git a/tesk/api/kubernetes/convert/data/build_strategy.py b/tesk/api/kubernetes/convert/data/build_strategy.py new file mode 100644 
index 0000000..9bf2460 --- /dev/null +++ b/tesk/api/kubernetes/convert/data/build_strategy.py @@ -0,0 +1,58 @@ +"""Part of the toolset aimed at building Kubernetes object structure of a task or tasks. + +Gradually adding to it objects returned by calls to Kubernetes API (jobs and pods). +Implementing classes are responsible of creating, +storing and maintaining the actual `Task` object or `Task` object's list. +""" + +from abc import ABC, abstractmethod +from typing import List, Optional + +from tesk.api.kubernetes.convert.data.job import Job +from tesk.api.kubernetes.convert.data.task import Task + + +class BuildStrategy(ABC): + """Part of the toolset aimed at building Kubernetes object structure of a task/s. + + Gradually adding to it objects returned by calls to Kubernetes API (jobs and pods). + Implementing classes are responsible of creating, + storing and maintaining the actual `Task` object or `Task` object's list. + """ + + @abstractmethod + def add_taskmaster_job(self, taskmaster_job: Job): + """Add taskmaster job. + + Method should optionally filter and place the passed taskmaster's job object + in the resulting structure. + """ + pass + + @abstractmethod + def add_executor_job(self, executor_job: Job): + """Add executor job. + + Method should optionally filter and place the passed executor's job object + in the resulting structure (and match it to appropriate taskmaster). + """ + pass + + @abstractmethod + def add_output_filer_job(self, filer_job: Job): + """Add output filer job. + + Method should optionally filter and place the passed output filer's job object + in the resulting structure (and match it to appropriate taskmaster). 
+ """ + pass + + @abstractmethod + def get_task(self) -> Optional[Task]: + """Return the single task object.""" + pass + + @abstractmethod + def get_task_list(self) -> List[Task]: + """Return the list of task objects.""" + pass diff --git a/tesk/api/kubernetes/convert/data/job.py b/tesk/api/kubernetes/convert/data/job.py new file mode 100644 index 0000000..73251dd --- /dev/null +++ b/tesk/api/kubernetes/convert/data/job.py @@ -0,0 +1,80 @@ +"""A container for a single Kubernetes job object. + +Can be both a taskmaster and an executor, it list of worker pods (Kubernetes +Pod objects). +""" + +from enum import Enum +from typing import List, Optional + +from kubernetes.client import V1Job, V1ObjectMeta, V1Pod + + +class Job: + """Class to list worker pods (Kubernetes Pod objects).""" + + def __init__(self, job: V1Job): + """Initializes the Job with a Kubernetes job object.""" + self.job: V1Job = job + self.pods: List[V1Pod] = [] + + def get_job(self) -> V1Job: + """Returns the Kubernetes job object.""" + return self.job + + def add_pod(self, pod: V1Pod): + """Adds a single pod to the list.""" + self.pods.append(pod) + + def has_pods(self) -> bool: + """Checks if the job has any pods.""" + return bool(self.pods) + + def get_first_pod(self) -> Optional[V1Pod]: + """Returns arbitrarily chosen pod from the list. + + Currently the first one added or None if the job has no pods. + """ + if not self.has_pods(): + return None + return self.pods[0] + + def get_pods(self) -> List[V1Pod]: + """Returns the list of job pods. + + Returns in the order of addition to the list or an empty list if no pods. + """ + return self.pods + + def change_job_name(self, new_name: str): + """Changes the job name. + + Also the names in its metadata and container specs. 
+ """ + if self.job.metadata is None: + self.job.metadata = V1ObjectMeta(name=new_name) + else: + self.job.metadata.name = new_name + + if ( + self.job is not None + and self.job.spec is not None + and self.job.spec.template is not None + and self.job.spec.template.metadata is not None + ): + self.job.spec.template.metadata.name = new_name + + if self.job.spec.template.spec and self.job.spec.template.spec.containers: + self.job.spec.template.spec.containers[0].name = new_name + + def get_job_name(self) -> Optional[str]: + """Returns the job name.""" + return self.job.metadata.name if self.job.metadata else None + + +class JobStatus(Enum): + """State of job.""" + + ACTIVE = "Active" + SUCCEEDED = "Succeeded" + FAILED = "Failed" diff --git a/tesk/api/kubernetes/convert/data/single_task_strategy.py b/tesk/api/kubernetes/convert/data/single_task_strategy.py new file mode 100644 index 0000000..dd84b3e --- /dev/null +++ b/tesk/api/kubernetes/convert/data/single_task_strategy.py @@ -0,0 +1,56 @@ +"""Tool aimed at building Kubernetes object structure of a single task. + +Job objects passed to its methods must be prefiltered and belong to a single task +(the class does not perform job objects filtering itself). +Pods must be added, when all jobs have already been added. +Thus, correct order of calls: +- Taskmaster TaskBuilder#addJobList(List) or TaskBuilder#addJob(V1Job) +- Executors and outputFiler TaskBuilder#addJobList(List) or TaskBuilder#addJob(V1Job) +- Pods by TaskBuilder#addPodList(List) +""" + +from typing import List, Optional + +from tesk.api.kubernetes.convert.data.build_strategy import BuildStrategy +from tesk.api.kubernetes.convert.data.job import Job +from tesk.api.kubernetes.convert.data.task import Task + + +class SingleTaskStrategy(BuildStrategy): + """Tool aimed at building Kubernetes object structure of a single task. 
+ + Job objects passed to its methods must be prefiltered and belong to a single task + (the class does not perform job objects filtering itself). + Pods must be added, when all jobs have already been added. + Thus, correct order of calls: + - Taskmaster TaskBuilder#addJobList(List) or TaskBuilder#addJob(V1Job) + - Executors and outputFiler TaskBuilder#addJobList(List) or + TaskBuilder#addJob(V1Job) + - Pods by TaskBuilder#addPodList(List) + """ + + def __init__(self): + """Initialise SingleTaskStrategy.""" + self.task = None + + def add_taskmaster_job(self, taskmaster_job: Job): + """Add taskmaster job.""" + self.task = Task(taskmaster_job) + + def add_executor_job(self, executor_job: Job): + """Add executor job.""" + if self.task: + self.task.add_executor(executor_job) + + def add_output_filer_job(self, filer_job: Job): + """Add output filer job.""" + if self.task: + self.task.set_output_filer(filer_job) + + def get_task(self) -> Optional[Task]: + """Get task.""" + return self.task + + def get_task_list(self) -> List[Task]: + """Get single task as list.""" + return [self.task] if self.task else [] diff --git a/tesk/api/kubernetes/convert/data/task.py b/tesk/api/kubernetes/convert/data/task.py new file mode 100644 index 0000000..49fc5de --- /dev/null +++ b/tesk/api/kubernetes/convert/data/task.py @@ -0,0 +1,96 @@ +"""A composite that represents Kubernetes object's graph of a single TES task. + +- Taskmaster job with its pods. +- Executor jobs with its pods. +""" + +import re +from typing import Dict, List, Optional + +from kubernetes.client.models import V1Job, V1ObjectMeta + +from tesk.api.kubernetes.constants import Constants +from tesk.api.kubernetes.convert.data.job import Job + + +class Task: + """Task is a composite. + + It represents Kubernetes object's graph of a single TES task. 
+ """ + + def __init__( + self, taskmaster: Optional[Job] = None, taskmaster_name: Optional[str] = None + ): + """Initialize the Task.""" + if taskmaster: + self.taskmaster = taskmaster + elif taskmaster_name: + job = V1Job(metadata=V1ObjectMeta(name=taskmaster_name)) + self.taskmaster = Job(job) + else: + raise ValueError("Either taskmaster or taskmaster_name must be provided") + + self.executors_by_name: Dict[str, Job] = {} + self.output_filer: Optional[Job] = None + self.constants = Constants() + self.MAX_INT = 2**31 - 1 + + def add_executor(self, executor: Job) -> None: + """Add executor to the task.""" + metadata = executor.get_job().metadata + assert metadata is not None + + name = metadata.name + assert name is not None + + self.executors_by_name.setdefault(name, executor) + + def set_output_filer(self, filer: Job): + """Set output filer for the task.""" + self.output_filer = filer + + def get_taskmaster(self) -> Job: + """Get taskmaster job.""" + return self.taskmaster + + def get_executors(self) -> List[Job]: + """Get executors.""" + return sorted(self.executors_by_name.values(), key=self.extract_executor_number) + + def get_last_executor(self) -> Optional[Job]: + """Get last executor.""" + if not self.executors_by_name: + return None + executors = self.get_executors() + return executors[-1] if executors else None + + def get_output_filer(self) -> Optional[Job]: + """Get output filer.""" + return self.output_filer + + def extract_executor_number(self, executor: Job) -> int: + """Extract executor number from the executor's name.""" + taskmaster_name = self.taskmaster.get_job_name() + assert taskmaster_name is not None + + prefix = taskmaster_name + self.constants.job_name_exec_prefix + exec_name = executor.get_job_name() + + if not exec_name: + return self.MAX_INT + + match = re.match(f"{re.escape(prefix)}(\d+)", exec_name) + if match: + return int(match.group(1)) + + return self.MAX_INT + + def get_executor_name(self, executor_index: int) -> str: + """Get 
executor name based on the taskmaster's job name and executor index.""" + taskmaster_name = self.taskmaster.get_job_name() + return ( + f"{taskmaster_name}" + f"{self.constants.job_name_exec_prefix}" + f"{str(executor_index).zfill(self.constants.job_name_exec_no_length)}" + ) diff --git a/tesk/api/kubernetes/convert/data/task_builder.py b/tesk/api/kubernetes/convert/data/task_builder.py new file mode 100644 index 0000000..0882c18 --- /dev/null +++ b/tesk/api/kubernetes/convert/data/task_builder.py @@ -0,0 +1,72 @@ +"""Module aimed at building Kubernetes object structure of a task or a list of tasks,. + +Gradually adding to it objects returned by calls to Kubernetes API (jobs and pods). +This class takes care of matching jobs with corresponding pods and holds a flat +collection (mapped by name) of resulting Job objects (can be both taskmasters and +executors belonging to the same or different task).Accepts a BuildStrategy BuildStrategy +, which implementing classes are responsible of creating, storing and maintaining the +actual {@link Task} object or {@link Task} object's list by implementing +BuildStrategy#addTaskMasterJob(Job) and BuildStrategy#addExecutorJob(Job). 
+""" + +from kubernetes.client import V1Job, V1Pod + +from tesk.api.kubernetes.constants import Constants +from tesk.api.kubernetes.convert.data.build_strategy import BuildStrategy +from tesk.api.kubernetes.convert.data.job import Job +from tesk.api.kubernetes.convert.data.single_task_strategy import SingleTaskStrategy +from tesk.api.kubernetes.convert.data.task_list_strategy import TaskListStrategy + + +class TaskBuilder: + @staticmethod + def new_single_task(): + return TaskBuilder(SingleTaskStrategy()) + + @staticmethod + def new_task_list(): + return TaskBuilder(TaskListStrategy()) + + def __init__(self, build_strategy: BuildStrategy): + self.build_strategy = build_strategy + self.all_jobs_by_name = {} + self.constants = Constants() + + def add_job(self, job: V1Job): + wrapped_job = Job(job) + job_type = job.metadata.labels.get(self.constants.label_jobtype_key) + if job_type == self.constants.label_jobtype_value_taskm: + self.build_strategy.add_taskmaster_job(wrapped_job) + elif job_type == self.constants.label_jobtype_value_exec: + self.build_strategy.add_executor_job(wrapped_job) + else: + self.build_strategy.add_output_filer_job(wrapped_job) + self.all_jobs_by_name[job.metadata.name] = wrapped_job + return self + + def add_job_list(self, jobs: list[V1Job]): + for job in jobs: + self.add_job(job) + return self + + def add_pod_list(self, pods: list[V1Pod]): + for pod in pods: + self.add_pod(pod) + return self + + def add_pod(self, pod: V1Pod): + for job in self.all_jobs_by_name.values(): + selectors = job.job.spec.selector.match_labels + labels = pod.metadata.labels + if all(item in labels.items() for item in selectors.items()): + job.add_pod(pod) + break + + def get_task(self): + return self.build_strategy.get_task() + + def get_task_list(self): + return self.build_strategy.get_task_list() + + def get_all_jobs_by_name(self): + return self.all_jobs_by_name diff --git a/tesk/api/kubernetes/convert/data/task_list_strategy.py 
"""Module aimed at building Kubernetes object structure of a list of tasks.

Accomplished by passing to it results of Kubernetes batch method calls.
All taskmaster jobs with unique names passed to it will get stored.
Only those executor jobs that match already stored taskmaster jobs will be stored
(filtering done by taskmaster's name and corresponding executor's label).
Pods must be added, when all jobs have already been added.
Thus, correct order of calls:
- Taskmasters by TaskBuilder#addJobList(List)
- Executors and outputFilers by TaskBuilder#addJobList(List)
- Pods by TaskBuilder#addPodList(List)
"""

from typing import Dict, List, Optional

from tesk.api.kubernetes.constants import Constants
from tesk.api.kubernetes.convert.data.build_strategy import BuildStrategy
from tesk.api.kubernetes.convert.data.job import Job
from tesk.api.kubernetes.convert.data.task import Task


class TaskListStrategy(BuildStrategy):
    """Class for building Kubernetes object structure of a list of tasks.

    Accomplished by passing to it results of Kubernetes batch method calls.
    All taskmaster jobs with unique names passed to it will get stored.
    Only those executor jobs that match already stored taskmaster jobs will be stored
    (filtering done by taskmaster's name and corresponding executor's label).
    Pods must be added, when all jobs have already been added.
    Thus, correct order of calls:
    - Taskmasters by TaskBuilder#addJobList(List)
    - Executors and outputFilers by TaskBuilder#addJobList(List)
    - Pods by TaskBuilder#addPodList(List)
    """

    def __init__(self):
        """Initialize TaskListStrategy."""
        # Tasks keyed by their taskmaster job name.
        self.tasks_by_id: Dict[str, Task] = {}
        self.constants = Constants()

    def add_taskmaster_job(self, taskmaster_job: Job):
        """Add a taskmaster job; the first occurrence of a name wins."""
        task_name = taskmaster_job.get_job().metadata.name
        if task_name not in self.tasks_by_id:
            self.tasks_by_id[task_name] = Task(taskmaster_job)

    def add_executor_job(self, executor_job: Job):
        """Add an executor job to its taskmaster's task, if that task is known.

        The owning taskmaster is identified via the task-id label on the
        executor job; unmatched executors are silently dropped.
        """
        taskmaster_name = executor_job.get_job().metadata.labels.get(
            self.constants.label_testask_id_key
        )
        if taskmaster_name in self.tasks_by_id:
            self.tasks_by_id[taskmaster_name].add_executor(executor_job)

    def add_output_filer_job(self, filer_job: Job):
        """Add an output filer job, matched by stripping the filer name suffix.

        Jobs whose name does not contain the filer suffix are ignored.
        """
        output_filer_suffix = filer_job.get_job_name().find(
            self.constants.job_name_filer_suf
        )
        if output_filer_suffix == -1:
            return
        taskmaster_name = filer_job.get_job_name()[:output_filer_suffix]
        if taskmaster_name in self.tasks_by_id:
            self.tasks_by_id[taskmaster_name].set_output_filer(filer_job)

    def get_task(self) -> Optional[Task]:
        """Not supported for this strategy; use get_task_list() instead."""
        raise NotImplementedError("This method is not supported for TaskListStrategy")

    def get_task_list(self) -> List[Task]:
        """Get list of tasks, in insertion order of their taskmasters."""
        return list(self.tasks_by_id.values())
class ExecutorCommandWrapper:
    """Wraps an executor's command with shell stream redirections.

    When any of the executor's stdin/stdout/stderr parameters is set, the
    command is rewritten to run under ``/bin/sh -c`` with the corresponding
    ``<``, ``>`` and ``2>`` redirections appended; otherwise the original
    command list is returned untouched.
    """

    def __init__(self, executor: "TesExecutor"):
        """Store the executor whose command will be wrapped."""
        self.executor = executor

    def get_commands_with_stream_redirects(self) -> "List[str]":
        """Return the executor command, shell-wrapped if any stream is set."""
        executor = self.executor

        # No streams configured: pass the command through unchanged.
        if not (executor.stdin or executor.stdout or executor.stderr):
            return executor.command

        # Build the shell command line: original command followed by each
        # configured redirection operator/target pair, space-joined.
        shell_command = [" ".join(executor.command)]
        for operator, target in (
            ("<", executor.stdin),
            (">", executor.stdout),
            ("2>", executor.stderr),
        ):
            if target:
                shell_command.append(operator)
                shell_command.append(target)

        return ["/bin/sh", "-c", " ".join(shell_command)]
"""Taskmaster environment properties model for the TESK."""

from typing import Dict, Optional

from pydantic import BaseModel, Field

from tesk.constants import TeskConstants


class FtpConfig(BaseModel):
    """Ftp configuration model for the TESK."""

    secretName: Optional[str] = Field(
        default=None, description="Name of the secret with FTP account credentials"
    )
    enabled: bool = Field(
        default=False,
        description="If FTP account enabled (based on non-emptiness of secretName)",
    )


class ExecutorSecret(BaseModel):
    """Executor secret configuration."""

    name: Optional[str] = Field(
        default=None,
        description=(
            "Name of a secret that will be mounted as volume to each executor. The same"
            " name will be used for the secret and the volume"
        ),
    )
    mountPath: Optional[str] = Field(
        default=None,
        alias="mountPath",
        description="The path where the secret will be mounted to executors",
    )
    enabled: bool = Field(
        default=False, description="Indicates whether the secret is enabled"
    )


class TaskmasterEnvProperties(BaseModel):
    """Taskmaster environment properties model for the TESK."""

    imageName: str = Field(
        default=TeskConstants.taskmaster_image_name,
        description="Taskmaster image name",
    )
    imageVersion: str = Field(
        default=TeskConstants.taskmaster_image_version,
        description="Taskmaster image version",
    )
    filerImageName: str = Field(
        default=TeskConstants.filer_image_name, description="Filer image name"
    )
    filerImageVersion: str = Field(
        default=TeskConstants.filer_image_version, description="Filer image version"
    )
    # BUG FIX: was `Field(default=None)` on a non-Optional field, so a config
    # omitting `ftp` produced None and consumers dereferencing `.ftp.enabled`
    # raised AttributeError. Default to a disabled FtpConfig instead.
    ftp: FtpConfig = Field(
        default_factory=FtpConfig, description="Test FTP account settings"
    )
    debug: bool = Field(
        default=False,
        description="If verbose (debug) mode of taskmaster is on (passes additional "
        "flag to taskmaster and sets image pull policy to Always)",
    )
    environment: Optional[Dict[str, str]] = Field(
        default=None,
        description="Environment variables, that will be passed to taskmaster",
    )
    serviceAccountName: str = Field(
        default="default", description="Service Account name for taskmaster"
    )
    executorSecret: Optional[ExecutorSecret] = Field(
        default=None, description="Executor secret configuration"
    )
"""Create template for kubernetes objects."""

import logging
import uuid

from kubernetes.client import (
    V1Container,
    V1EnvVar,
    V1JobSpec,
    V1ObjectMeta,
    V1PodSpec,
    V1PodTemplateSpec,
    V1ResourceRequirements,
    V1SecretVolumeSource,
    V1Volume,
    V1VolumeMount,
)
from kubernetes.client.models import V1Job

from tesk.api.kubernetes.constants import Constants, K8sConstants
from tesk.constants import TeskConstants
from tesk.custom_config import TaskmasterEnvProperties
from tesk.utils import get_taskmaster_env_property, get_taskmaster_template

logger = logging.getLogger(__name__)


class KubernetesTemplateSupplier:
    """Supplies job templates for the taskmaster's and executor's job objects."""

    def __init__(
        self,
        # security_context=None
    ):
        """Load the taskmaster template and environment properties from config."""
        self.taskmaster_template: V1Job = get_taskmaster_template()
        self.taskmaster_env_properties: TaskmasterEnvProperties = (
            get_taskmaster_env_property()
        )
        self.constants = Constants()
        self.k8s_constants = K8sConstants()
        self.tesk_constants = TeskConstants()
        self.namespace = self.tesk_constants.tesk_namespace
        # self.security_context = security_context

    def get_taskmaster_name(self) -> str:
        """Generate a unique name for the taskmaster job."""
        name: str = self.constants.job_name_taskm_prefix + str(uuid.uuid4())
        return name

    def get_taskmaster_template_with_value_from_config(self) -> V1Job:
        """Create a template for the taskmaster job from the configured template.

        Fills in image, args, labels, a unique name and environment variables
        (including backoff limits and optional FTP credentials) on a copy of
        the configured taskmaster template.
        """
        job: V1Job = self.taskmaster_template
        props = self.taskmaster_env_properties

        if job.spec is None:
            job.spec = V1JobSpec(template=V1PodTemplateSpec())
        if job.spec.template.spec is None:
            job.spec.template.spec = V1PodSpec(containers=[])

        job.spec.template.spec.service_account_name = props.serviceAccountName

        container = job.spec.template.spec.containers[0]
        container.image = f"{props.imageName}:{props.imageVersion}"

        # BUG FIX: the template may come with args unset; initialize to a list
        # instead of asserting Iterable (an assert is stripped under -O anyway).
        if container.args is None:
            container.args = []
        container.args.extend(
            [
                "-n",
                self.namespace,
                "-fn",
                props.filerImageName,
                "-fv",
                props.filerImageVersion,
            ]
        )

        if props.debug:
            container.args.append("-d")
            container.image_pull_policy = "Always"

        if job.metadata is None:
            job.metadata = V1ObjectMeta(labels={})
        if job.metadata.labels is None:
            # BUG FIX: labels is a plain dict; it was wrongly initialized
            # to a V1ObjectMeta, which breaks the item assignment below.
            job.metadata.labels = {}
        job.metadata.labels[self.constants.label_jobtype_key] = (
            self.constants.label_jobtype_value_taskm
        )

        taskmaster_name = self.get_taskmaster_name()
        job.metadata.name = taskmaster_name
        container.name = taskmaster_name

        if container.env is None:
            # BUG FIX: env is a list of V1EnvVar; it was wrongly initialized
            # to a single V1EnvVar (and only after an assert that would have
            # already failed on None).
            container.env = []

        # BUG FIX: `environment` is Optional; guard before iterating.
        if props.environment:
            container.env.extend(
                [
                    V1EnvVar(name=key.upper().replace(".", "_"), value=value)
                    for key, value in props.environment.items()
                ]
            )

        # Set backoff env variables for `filer` and `executor`.
        backoff_limits = {
            self.constants.filer_backoff_limit: self.tesk_constants.filer_backoff_limit,
            self.constants.executor_backoff_limit: (
                self.tesk_constants.executor_backoff_limit
            ),
        }
        container.env.extend(
            [V1EnvVar(name=key, value=value) for key, value in backoff_limits.items()]
        )

        ftp_secrets = [
            self.constants.ftp_secret_username_env,
            self.constants.ftp_secret_password_env,
        ]
        # Guard: `ftp` may be absent from the configuration.
        ftp_enabled = bool(props.ftp and props.ftp.enabled)

        # Drop FTP credential env vars when FTP is disabled.
        container.env = [
            env for env in container.env if env.name not in ftp_secrets or ftp_enabled
        ]

        if ftp_enabled:
            # Point the credential env vars at the configured secret.
            for env in container.env:
                if (
                    env.name in ftp_secrets
                    and env.value_from is not None
                    and env.value_from.secret_key_ref is not None
                ):
                    env.value_from.secret_key_ref.name = props.ftp.secretName

        return job

    def get_executor_template_with_value_from_config(self) -> V1Job:
        """Create a template for the executor job.

        Builds a fresh V1Job labelled as an executor, optionally mounting the
        configured executor secret as a read-only volume.
        """
        container = V1Container(
            name=self.constants.label_jobtype_value_exec,
            resources=V1ResourceRequirements(),
        )

        executor_secret = self.taskmaster_env_properties.executorSecret
        if executor_secret is not None:
            container.volume_mounts = [
                V1VolumeMount(
                    read_only=True,
                    name=str(executor_secret.name),
                    mount_path=str(executor_secret.mountPath),
                )
            ]

        pod_spec = V1PodSpec(
            containers=[container],
            restart_policy=self.k8s_constants.job_restart_policy,
        )

        # if self.security_context:
        #     pod_spec.security_context = self.security_context

        job = V1Job(
            api_version=self.k8s_constants.k8s_batch_api_version,
            kind=self.k8s_constants.k8s_batch_api_job_type,
            metadata=V1ObjectMeta(
                labels={
                    self.constants.label_jobtype_key: (
                        self.constants.label_jobtype_value_exec
                    )
                }
            ),
            spec=V1JobSpec(
                template=V1PodTemplateSpec(metadata=V1ObjectMeta(), spec=pod_spec)
            ),
        )

        if executor_secret is not None:
            if job.spec is None:
                job.spec = V1JobSpec(template=V1PodTemplateSpec())
            if job.spec.template.spec is None:
                job.spec.template.spec = V1PodSpec(containers=[])

            job.spec.template.spec.volumes = [
                V1Volume(
                    name=str(executor_secret.name),
                    secret=V1SecretVolumeSource(secret_name=executor_secret.name),
                )
            ]

            # NOTE(review): restart_policy on a *container* is only valid for
            # sidecars in recent Kubernetes; the pod-level policy above already
            # governs restarts — confirm this line is intentional.
            job.spec.template.spec.containers[0].restart_policy = "Never"

        return job
"""Tesk scoped constants."""

import os


class TeskConstants:
    """Tesk's K8s scoped constants.

    Every value can be overridden through the environment variable named in
    its ``os.environ.get`` call; the second argument is the fallback default.
    """

    # Filer image used to stage inputs/outputs.
    filer_image_name: str = os.environ.get(
        "TESK_API_TASKMASTER_FILER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-filer"
    )
    filer_image_version: str = os.environ.get(
        "TESK_API_TASKMASTER_FILER_IMAGE_VERSION", "latest"
    )

    # Taskmaster image that orchestrates a task's executors.
    taskmaster_image_name: str = os.environ.get(
        "TESK_API_TASKMASTER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-taskmaster"
    )
    taskmaster_image_version: str = os.environ.get(
        "TESK_API_TASKMASTER_IMAGE_VERSION", "latest"
    )

    # Kubernetes namespace and service account the taskmaster runs under.
    tesk_namespace: str = os.environ.get("TESK_API_K8S_NAMESPACE", "tesk")
    taskmaster_service_account_name: str = os.environ.get(
        "TESK_API_TASKMASTER_SERVICE_ACCOUNT_NAME", "taskmaster"
    )

    # Job retry (backoff) limits.
    taskmaster_environment_executor_backoff_limit: str = os.environ.get(
        "ENVIRONMENT_EXECUTOR_BACKOFF_LIMIT", "6"
    )
    filer_backoff_limit: str = os.environ.get("FILER_BACKOFF_LIMIT", "2")
    executor_backoff_limit: str = os.environ.get("EXECUTOR_BACKOFF_LIMIT", "2")
    # TODO: purpose of this constant is unclear — clarify or remove.
    tesk_api_taskmaster_environment_filer_backoff_limit: str = os.environ.get(
        "TESK_API_TASKMASTER_ENVIRONMENT_FILER_BACKOFF_LIMIT", "6"
    )
class PydanticK8sPodSpec(BaseModel):
    """Pod specification (pydantic mirror of the k8s client's V1PodSpec)."""

    serviceAccountName: str
    containers: List[PydanticK8sContainer]
    volumes: List[PydanticK8sVolume]
    restartPolicy: str


class PydanticK8sPodMetadata(BaseModel):
    """Pod metadata: only the pod name is modelled."""

    name: str


class PydanticK8sPodTemplate(BaseModel):
    """Pod template configuration: metadata plus pod spec."""

    metadata: PydanticK8sPodMetadata
    spec: PydanticK8sPodSpec


class PydanticK8sJobSpec(BaseModel):
    """Job specification wrapping a single pod template."""

    template: PydanticK8sPodTemplate


class PydanticK8sJobMetadata(BaseModel):
    """Job metadata: name and its label set."""

    name: str
    labels: Dict[str, str]


class PydanticK8sJob(BaseModel):
    """Kubernetes Job configuration (pydantic mirror of V1Job).

    FOCA validates configuration through pydantic models; this mirror is
    converted to a real V1Job by tesk.utils.pydantic_k8s_job_to_v1job.
    """

    apiVersion: str
    kind: str
    metadata: PydanticK8sJobMetadata
    spec: PydanticK8sJobSpec


class CustomConfig(BaseModel):
    """Custom configuration model for the FOCA app."""

    # Define custom configuration fields here
    service_info: Service
    # Pydantic mirror of V1Job, consumed via tesk.utils.get_taskmaster_template.
    taskmaster_template: PydanticK8sJob
    taskmaster_env_properties: TaskmasterEnvProperties
pvc=None): if os.environ.get('EXECUTOR_BACKOFF_LIMIT') is not None: executor['spec'].update( {'backoffLimit': int(os.environ['EXECUTOR_BACKOFF_LIMIT'])} - ) + ) + + if 'restartPolicy' not in spec.keys() and \ + 'restart_policy' in spec.keys(): + spec['restartPolicy'] = spec['restart_policy'] + + for container in spec['containers']: + if 'limits' not in container['resources'].keys(): + container['resources']['limits'] = None + if container['resources']['limits'] is None and \ + ('requests' in container['resources'].keys() and \ + container['resources']['requests'] is not None): + container['resources']['limits'] = container['resources']['requests'] if pvc is not None: mounts = spec['containers'][0].setdefault('volumeMounts', []) diff --git a/tesk/utils.py b/tesk/utils.py index 214ffd6..aca6b39 100644 --- a/tesk/utils.py +++ b/tesk/utils.py @@ -1,7 +1,47 @@ """Utility functions for the TESK package.""" import os +from decimal import Decimal +from enum import Enum from pathlib import Path +from typing import List, Optional, Sequence +import json + +from foca import Foca +from kubernetes.client.models import ( + V1Container, + V1DownwardAPIVolumeFile, + V1DownwardAPIVolumeSource, + V1EnvVar, + V1EnvVarSource, + V1Job, + V1JobSpec, + V1ObjectFieldSelector, + V1ObjectMeta, + V1PodSpec, + V1PodTemplateSpec, + V1SecretKeySelector, + V1Volume, + V1VolumeMount, +) +from pydantic import BaseModel + +from tesk.custom_config import ( + CustomConfig, + PydanticK8sContainer, + PydanticK8sDownwardAPIItem, + PydanticK8sEnvVar, + PydanticK8sEnvVarSource, + PydanticK8sJob, + PydanticK8sJobMetadata, + PydanticK8sJobSpec, + PydanticK8sPodSpec, + PydanticK8sPodTemplate, + PydanticK8sVolume, + PydanticK8sVolumeMount, + TaskmasterEnvProperties, +) +from tesk.exceptions import ConfigNotFoundError def get_config_path() -> Path: @@ -15,3 +55,189 @@ def get_config_path() -> Path: return Path(config_path_env).resolve() else: return (Path(__file__).parents[1] / "deployment" / 
def get_custom_config() -> "CustomConfig":
    """Get the custom configuration.

    Returns:
        The custom configuration.

    Raises:
        ConfigNotFoundError: If the config file has no custom section.
    """
    conf = Foca(config_file=get_config_path()).conf
    try:
        return CustomConfig(**conf.custom)
    except AttributeError:
        raise ConfigNotFoundError(
            "Custom configuration not found in config file."
        ) from None


def get_taskmaster_template() -> "V1Job":
    """Get the taskmaster template from the custom configuration.

    Returns:
        The taskmaster template as a kubernetes-client V1Job.

    Raises:
        ConfigNotFoundError: If the custom config lacks taskmaster_template.
    """
    custom_conf = get_custom_config()
    try:
        return pydantic_k8s_job_to_v1job(custom_conf.taskmaster_template)
    except AttributeError:
        raise ConfigNotFoundError(
            "Custom configuration doesn't seem to have taskmaster_template in config "
            "file."
        ) from None


def get_taskmaster_env_property() -> "TaskmasterEnvProperties":
    """Get the taskmaster env property from the custom configuration.

    Returns:
        The taskmaster env property.

    Raises:
        ConfigNotFoundError: If the custom config lacks taskmaster_env_properties.
    """
    custom_conf = get_custom_config()
    try:
        return custom_conf.taskmaster_env_properties
    except AttributeError:
        raise ConfigNotFoundError(
            "Custom configuration doesn't seem to have taskmaster_env_properties in "
            "config file."
        ) from None


def pydantic_k8s_job_to_v1job(job: "PydanticK8sJob") -> "V1Job":
    """Convert a pydantic job model to a V1Job object.

    FOCA validates config via pydantic models and kubernetes
    client's models aren't written using pydantic.

    Returns:
        The V1Job object.
    """

    def convert_env_var_source(
        env_var_source: "Optional[PydanticK8sEnvVarSource]",
    ) -> "Optional[V1EnvVarSource]":
        # Only secretKeyRef sources are modelled; anything else maps to None.
        if env_var_source is None:
            return None
        if env_var_source.secretKeyRef:
            return V1EnvVarSource(
                secret_key_ref=V1SecretKeySelector(
                    name=env_var_source.secretKeyRef.name,
                    key=env_var_source.secretKeyRef.key,
                    optional=env_var_source.secretKeyRef.optional,
                )
            )
        return None

    def convert_downward_api_item(
        downward_api_item: "PydanticK8sDownwardAPIItem",
    ) -> "V1DownwardAPIVolumeFile":
        field_ref = None
        if downward_api_item.fieldRef:
            # fieldPath is mandatory for a V1ObjectFieldSelector.
            field_path = downward_api_item.fieldRef.get("fieldPath")
            if field_path:
                field_ref = V1ObjectFieldSelector(
                    api_version=downward_api_item.fieldRef.get("apiVersion"),
                    field_path=field_path,
                )

        return V1DownwardAPIVolumeFile(
            path=downward_api_item.path,
            field_ref=field_ref,
        )

    def convert_volume(volume: "PydanticK8sVolume") -> "V1Volume":
        # Only downwardAPI volumes carry extra structure; others are name-only.
        if volume.downwardAPI:
            return V1Volume(
                name=volume.name,
                downward_api=V1DownwardAPIVolumeSource(
                    items=[
                        convert_downward_api_item(item)
                        for item in volume.downwardAPI["items"]
                    ]
                ),
            )
        return V1Volume(name=volume.name)

    def convert_env_var(env_var: "PydanticK8sEnvVar") -> "V1EnvVar":
        return V1EnvVar(
            name=env_var.name, value_from=convert_env_var_source(env_var.valueFrom)
        )

    def convert_volume_mount(
        volume_mount: "PydanticK8sVolumeMount",
    ) -> "V1VolumeMount":
        return V1VolumeMount(
            name=volume_mount.name,
            mount_path=volume_mount.mountPath,
            read_only=volume_mount.readOnly,
        )

    def convert_container(container: "PydanticK8sContainer") -> "V1Container":
        return V1Container(
            name=container.name,
            image=container.image,
            args=container.args,
            env=[convert_env_var(env) for env in container.env],
            volume_mounts=[convert_volume_mount(vm) for vm in container.volumeMounts],
        )

    def convert_pod_spec(pod_spec: "PydanticK8sPodSpec") -> "V1PodSpec":
        return V1PodSpec(
            service_account_name=pod_spec.serviceAccountName,
            containers=[
                convert_container(container) for container in pod_spec.containers
            ],
            volumes=[convert_volume(volume) for volume in pod_spec.volumes],
            restart_policy=pod_spec.restartPolicy,
        )

    def convert_pod_template(
        pod_template: "PydanticK8sPodTemplate",
    ) -> "V1PodTemplateSpec":
        return V1PodTemplateSpec(
            metadata=V1ObjectMeta(name=pod_template.metadata.name),
            spec=convert_pod_spec(pod_template.spec),
        )

    def convert_job_spec(job_spec: "PydanticK8sJobSpec") -> "V1JobSpec":
        return V1JobSpec(template=convert_pod_template(job_spec.template))

    def convert_job_metadata(job_metadata: "PydanticK8sJobMetadata") -> "V1ObjectMeta":
        return V1ObjectMeta(name=job_metadata.name, labels=job_metadata.labels)

    return V1Job(
        api_version=job.apiVersion,
        kind=job.kind,
        metadata=convert_job_metadata(job.metadata),
        spec=convert_job_spec(job.spec),
    )


def pydantic_model_list_dict(model_list: "Sequence[BaseModel]") -> "List[str]":
    """Convert a list of pydantic models to a list of dictionaries."""
    # NOTE(review): .json() is the pydantic v1 API; for v2 this would be
    # model_dump_json() — confirm the pinned pydantic version.
    json_list = []
    for item in model_list:
        json_list.append(json.loads(item.json()))
    return json_list


def time_formatter(time: "Optional[datetime]") -> "Optional[str]":
    """Format a datetime as an RFC 3339 timestamp (UTC 'Z' suffix).

    The parameter is a datetime (or None), not a str: the body calls
    ``strftime``, which str does not provide — the previous ``Optional[str]``
    annotation was wrong.
    """
    rfc_time = time.strftime("%Y-%m-%dT%H:%M:%SZ") if time else None
    return rfc_time


def decimal_to_float(obj):
    """Convert decimal to float.

    Can be used to make Decimal serializable (e.g. as a json ``default=``).

    Raises:
        TypeError: If *obj* is not a Decimal, per the json default protocol.
    """
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError


def enum_to_string(enum):
    """Converts enum to string.

    Can be used to make Enums serializable.

    NOTE(review): unlike decimal_to_float this falls through to an implicit
    None for non-Enum input instead of raising TypeError — confirm whether
    that asymmetry is intentional.
    """
    if isinstance(enum, Enum):
        return str(enum.name)