Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create TES task as k8s job #200

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,32 @@ After the last executor, the `filer` is called once more to process the outputs
and push them to remote locations from the PVC. The PVC is the scrubbed, deleted
and the taskmaster ends, completing the task.

┌─────────────────────────────────────────────────────────┐
│ Kubernetes │
│ │
│ ┌────────────────────────────┐ ┌───────────────────┐ │
│ │ Secret: ftp-secret │ │ ConfigMap/PVC │ │
│ │ - username │ │ - JSON_INPUT.gz │ │
│ │ - password │ │ │ │
│ └──────────▲─────────────────┘ └───────▲───────────┘ │
│ │ | │
│ │ | │
│ │ | │
│ ┌─────────┴────────────────────────────┴────────────┐ │
│ │ Job: taskmaster │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Pod: taskmaster │ │ │
│ │ │ - Container: taskmaster │ │ │
│ │ │ - Env: TESK_FTP_USERNAME │ │ │
│ │ │ - Env: TESK_FTP_PASSWORD │ │ │
│ │ │ - Args: -f /jsoninput/JSON_INPUT.gz │ │ │
│ │ │ - Mounts: /podinfo │ │ │
│ │ │ /jsoninput │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘

Comment on lines +58 to +83
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was just trying to understand k8s flow, please ignore, maybe will remove later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine for me, but the rendered version does not look good to me:

Screenshot from 2024-08-05 10-55-34

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle it's nice to have such a schema in the docs.

I am, however, wondering about the FTP secret. Maybe this will become clearer to me after looking at the code below, but seeing it represented like this in the doc, I have the concern that FTP is somehow treated special, when, ideally, storage solutions should all be treated in an abstract manner, like previously discussed: abstract storage handler and individual implementations for different storage/file transfer solutions. And that should probably extend to managing secrets/credentials as well, no?

## Requirements

- A working [Kubernetes](https://kubernetes.io/) cluster version 1.9 and later.
Expand Down
1 change: 1 addition & 0 deletions compressed_data
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"inputs": ["{\"name\": null, \"description\": null, \"url\": \"s3://my-object-store/file1\", \"path\": \"/data/file1\", \"type\": \"FILE\", \"content\": null, \"streamable\": null}"], "outputs": ["{\"name\": null, \"description\": null, \"url\": \"s3://my-object-store/outfile-1\", \"path\": \"/data/outfile\", \"path_prefix\": null, \"type\": \"FILE\"}"], "volumes": ["/vol/A/"], "resources": {"disk_gb": 40.0}, "executors": [{"api_version": "batch/v1", "kind": "Job", "metadata": {"annotations": {"tes-task-name": "string"}, "creation_timestamp": null, "deletion_grace_period_seconds": null, "deletion_timestamp": null, "finalizers": null, "generate_name": null, "generation": null, "labels": {"job-type": "executor", "taskmaster-name": "task-fef96900-6da7-4672-a1ec-14bf3d720c8f", "executor-no": "0"}, "managed_fields": null, "name": "newname", "namespace": null, "owner_references": null, "resource_version": null, "self_link": null, "uid": null}, "spec": {"active_deadline_seconds": null, "backoff_limit": null, "backoff_limit_per_index": null, "completion_mode": null, "completions": null, "manual_selector": null, "max_failed_indexes": null, "parallelism": null, "pod_failure_policy": null, "pod_replacement_policy": null, "selector": null, "suspend": null, "template": {"metadata": {"annotations": null, "creation_timestamp": null, "deletion_grace_period_seconds": null, "deletion_timestamp": null, "finalizers": null, "generate_name": null, "generation": null, "labels": null, "managed_fields": null, "name": "newname", "namespace": null, "owner_references": null, "resource_version": null, "self_link": null, "uid": null}, "spec": {"active_deadline_seconds": null, "affinity": null, "automount_service_account_token": null, "containers": [{"args": null, "command": ["/bin/sh", "-c", " < /data/file1 > /tmp/stdout.log 2> /tmp/stderr.log"], "env": [{"name": "BLASTDB", "value": "/data/GRC38", "value_from": null}, {"name": "HMMERDB", "value": "/data/hmmer", "value_from": null}], "env_from": null, "image": null, "image_pull_policy": null, "lifecycle": null, "liveness_probe": null, "name": "newname", "ports": null, "readiness_probe": null, "resize_policy": null, "resources": {"claims": null, "limits": null, "requests": {"cpu": 4.0, "memory": 8589934592.0}}, "restart_policy": null, "security_context": null, "startup_probe": null, "stdin": null, "stdin_once": null, "termination_message_path": null, "termination_message_policy": null, "tty": null, "volume_devices": null, "volume_mounts": null, "working_dir": "/data/"}], "dns_config": null, "dns_policy": null, "enable_service_links": null, "ephemeral_containers": null, "host_aliases": null, "host_ipc": null, "host_network": null, "host_pid": null, "host_users": null, "hostname": null, "image_pull_secrets": null, "init_containers": null, "node_name": null, "node_selector": null, "os": null, "overhead": null, "preemption_policy": null, "priority": null, "priority_class_name": null, "readiness_gates": null, "resource_claims": null, "restart_policy": "Never", "runtime_class_name": null, "scheduler_name": null, "scheduling_gates": null, "security_context": null, "service_account": null, "service_account_name": null, "set_hostname_as_fqdn": null, "share_process_namespace": null, "subdomain": null, "termination_grace_period_seconds": null, "tolerations": null, "topology_spread_constraints": null, "volumes": null}}, "ttl_seconds_after_finished": null}, "status": null}]}
2 changes: 1 addition & 1 deletion deployment/charts/tesk/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ host_name: ""
#

# 'openstack' or 's3'
storage: none
storage: s3
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: turn back to none.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏽

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be described in more detail somewhere. Also, if none is an allowed value, this should be listed, and not just openstack and s3.


# Configurable storage class.
storageClass:
Expand Down
73 changes: 73 additions & 0 deletions deployment/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,79 @@ custom:
tesResources_backend_parameters:
- VmSize
- ParamToRecogniseDataComingFromConfig
# Taskmaster configuration
taskmaster_template:
apiVersion: batch/v1
kind: Job
metadata:
name: taskmaster
labels:
app: taskmaster
spec:
template:
metadata:
name: taskmaster
spec:
serviceAccount: taskmaster
serviceAccountName: taskmaster
containers:
- name: taskmaster
image: docker.io/lvarin/tesk-core-taskmaster:testing
args:
- -f
- /jsoninput/JSON_INPUT.gz
env:
- name: TESK_FTP_USERNAME
valueFrom:
secretKeyRef:
name: ftp-secret
key: username
optional: true
- name: TESK_FTP_PASSWORD
valueFrom:
secretKeyRef:
name: ftp-secret
key: password
optional: true
volumeMounts:
- name: podinfo
mountPath: /podinfo
readOnly: true
- name: jsoninput
mountPath: /jsoninput
readOnly: true
volumes:
- name: podinfo
downwardAPI:
items:
- path: labels
fieldRef:
fieldPath: metadata.labels
restartPolicy: Never
# Taskmaster environment properties
taskmaster_env_properties:
# Taskmaster image name
imageName: docker.io/lvarin/tesk-core-taskmaster
# Taskmaster image version
imageVersion: testing
# Filer image name
filerImageName: docker.io/elixircloud/tesk-core-filer
# Filer image version
filerImageVersion: v0.10.2
# Test FTP account settings
ftp:
# Name of the secret with FTP account credentials
secretName: account-secret
# If FTP account enabled (based on non-emptiness of secretName)
enabled: true
# If verbose (debug) mode of taskmaster is on (passes additional flag to taskmaster and sets image pull policy to Always)
debug: false
# Environment variables, that will be passed to taskmaster
environment:
key: value
# Service Account name for taskmaster
serviceAccountName: taskmaster


# Logging configuration
# Cf. https://foca.readthedocs.io/en/latest/modules/foca.models.html#foca.models.config.LogConfig
Expand Down
1 change: 1 addition & 0 deletions encoded_data.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
H4sIAJUer2YC/9VWS3PbNhD+KxqeQ1N+xLYync4kbfqapIe0tzqDAYElhQivAKAsx+P/3gVJieDD7qWXXDTCt+A+v93FYya0bYLP3qz+yR7vMk0V3OFBN1K+Wt1lHDxzwgZhdAo3TsbjXeYv3xSFeshN+QVYyH0wDopKSDi/y+JFS8O2u1lwGmgqCg8WOtEvv39432HM6AA6pKZ8cEAVLeXJr6fs86tVZprwvzuOOqOD+TPO9+JBSKyDShxSA9OoOmf3RjYKOmcLPBRvixZ34E3jWCt5zLjwO1KX+P9qfbZ+QjkcgDXoWvvlY0atIHtwHoNCICtpYNtif47+ZDuhecT+MGU8Kgg0+tzqpVqbQGMqOjsBfB6o3+UxafEjzLHQdRYtMsx2vEmCQIcDVfYUHOZUQiurHWVALDhhOPGAVeN+4dqCikpoKsU3cMn9GjQ4GoD07ozhLtYek7QE2QXxxZR5THb0/5imGHkMTFEfwJ3Ca2OtoNpcb9br/JrTm/zq+uYip+fA8vOrsrrkNxdrdltlScpzbeK36zYrimpaAyeVAJmGerSg4b7920PeYn6GW+YeIyFIFXCgu2L3kmP9k6r2Eg+yIlLo3QA1gh87IF6wwLrqsiD2QDhQjvdhXo+Ssp2pojYlwjNwLCZBCsFhuMCMsn0lleGwJEisYIoaKtG6xH7CUiSCA6ko9g3vDKTxW+qolCCFVwmIpIofNA45ZqRgD2OZAysxvwoHxUw+N+8bzJTmAxAAvUe6tcl7oU+OwX4fDTFU4fslKq0wGyIk5aTYico0WGgPbi/QPmWsPQezA51SUgeKSk+T0tV+RFhMDO/Gbyl04bcxAzmLv6vVD6tkOa1+XBVB2cIHjvP+TJp6dTFA4FyE2uENet8ZOyb33Ye3f/3987uodE9l02Kd5l8//XR5e8JJ5YwaMjQo+O3jx/efFhVslQK3rKB3JcUQEQp5MDkSi4dZy0hRAXtgElJoj1zzHvebKeFFFlnjwognlIulT5FAyPKZ8fECZJIK5VM/cDSNtH9tsIH6y7aJm/Js3a47ZVxUe/v6drO5vHq9ucAF+tQZCNQtDQrWOOQaaZ8ch2QutvcbOw0Aiy/05EiMTtsHd47Cdu4mJuagTTk+E/7jxsSzkDZA93DAnonk9zO87Y0EvjcOHwI14cIN3MlainDtY6yVqJPBhNjUPOj41jr1W+zsxADYLQ5eHNpk1HG9dGt8IDjOqIcpKiybIBpCdHeCWsEnSONnNsZzMeE2VtVBmpA4TxZd1bjRJvO1heYLxCRfGRx/W6R4spAc4D5pCzrbVbgN3GicHRGCRMcWGZsfWqfG2b80e6ftMeN29ifsuzHhkBe4bBYNebYF3kic9It45M/EgxeaZTyWnxVMTUEgx0IS6kn1lae9taVx9TvDoHd+sqd8U3KjaNqPaWe9vI2Dkf0aHYGYRFM/EG9jHWKg+ChGC2HWdEegHS8hyKMFQit0gsQV5jGR6QbER0Vz+uzz07/JtQSDdQ0AAA==
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ requires = ["poetry-core"]

[tool.bandit]
skips = [
"B101", # Use of asserts is discouraged as it is removed during byte generation, we don't need that!
"B108", # Insecure usage of temp file/directory, false positive.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But are you sure that this will always be false positives?
If not, better to disable per line/block/module

"B321", # FTP-related functions are being called.
"B402", # A FTP-related module is being imported.
"B108" # Insecure usage of temp file/directory, false positive.
"B402" # A FTP-related module is being imported.
]

[tool.poetry]
Expand Down
34 changes: 26 additions & 8 deletions tesk/api/ga4gh/tes/controllers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
"""Controllers for GA4GH TES API endpoints."""

import logging

# from connexion import request # type: ignore
import json
from foca.utils.logging import log_traffic # type: ignore

from tesk.api.ga4gh.tes.models import TesTask
from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo
from tesk.api.ga4gh.tes.task.create_task import CreateTesTask
from tesk.exceptions import BadRequest, InternalServerError
from tesk.api.ga4gh.tes.task.get_task import GetTesTask
from tesk.api.ga4gh.tes.models import TaskView
from tesk.utils import enum_to_string

# Get logger instance
logger = logging.getLogger(__name__)
Expand All @@ -26,34 +31,47 @@ def CancelTask(id, *args, **kwargs) -> dict: # type: ignore

# POST /tasks
@log_traffic
def CreateTask(*args, **kwargs) -> dict: # type: ignore
def CreateTask(**kwargs) -> dict:
"""Create task.

Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
pass
try:
request_body = kwargs.get("body")
tes_task = TesTask(**request_body)
response = CreateTesTask(tes_task).response()
return response
except Exception as e:
raise InternalServerError from e


# GET /tasks/service-info
@log_traffic
def GetServiceInfo() -> dict: # type: ignore
def GetServiceInfo() -> dict:
"""Get service info."""
service_info = ServiceInfo()
return service_info.response()


# GET /tasks
@log_traffic
def ListTasks(*args, **kwargs) -> dict: # type: ignore
def ListTasks(*args, **kwargs):
"""List all available tasks.

Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
pass
get_tes_task = GetTesTask()
view = TaskView(kwargs.get("view"))
name_prefix = kwargs.get("name_prefix")
page_size = kwargs.get("page_size")
page_token = kwargs.get("page_token")
response = get_tes_task.list_tasks(
view=view, name_prefix=name_prefix, page_size=page_size, page_token=page_token
)
return json.loads(response.json())


# GET /tasks
Expand Down
10 changes: 9 additions & 1 deletion tesk/api/ga4gh/tes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class TesResources(BaseModel):
example={"VmSize": "Standard_D64_v3"},
)
backend_parameters_strict: Optional[bool] = Field(
False,
default=False,
description="If set to true, backends should fail the task if any "
"backend_parameters\nkey/values are unsupported, otherwise, backends should "
"attempt to run the task",
Expand Down Expand Up @@ -568,3 +568,11 @@ class TesListTasksResponse(BaseModel):
description="Token used to return the next page of results. This value can be "
"used\nin the `page_token` field of the next ListTasks request.",
)


class TaskView(str, Enum):
"""View of task for API response."""

MINIMAL = "MINIMAL"
BASIC = "BASIC"
FULL = "FULL"
1 change: 1 addition & 0 deletions tesk/api/ga4gh/tes/task/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Task API controller logic."""
106 changes: 106 additions & 0 deletions tesk/api/ga4gh/tes/task/create_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""TESK API module for creating a task."""

import logging

from pydantic import BaseModel

from tesk.api.ga4gh.tes.models import TesCreateTaskResponse, TesResources, TesTask
from tesk.api.ga4gh.tes.task.task_request import TesTaskRequest
from tesk.exceptions import KubernetesError
import os
from datetime import datetime
logger = logging.getLogger(__name__)


class CreateTesTask(TesTaskRequest):
"""Create TES task."""

# TODO: Add user to the class when auth implemented in FOCA
def __init__(
self,
task: TesTask,
# user: User
):
"""Initialize the CreateTask class.

Args:
task: TES task to create.
"""
super().__init__()
self.task = task
# self.user = user

def create_task(self) -> TesCreateTaskResponse:
"""Create TES task."""
attempts_no = 0
while attempts_no < self.constants.job_create_attempts_no:
try:
attempts_no += 1
resources = self.task.resources
limits = self.kubernetes_client_wrapper.list_limits()
print("limits", limits)
minimum_ram_gb = self.kubernetes_client_wrapper.minimum_ram_gb()

if not self.task.resources:
self.task.resources = TesResources(cpu_cores=int(minimum_ram_gb))
if resources and resources.ram_gb and resources.ram_gb < minimum_ram_gb:
self.task.resources.ram_gb = minimum_ram_gb

taskmaster_job = self.tes_kubernetes_converter.from_tes_task_to_k8s_job(
self.task,
# self.user
)

taskmaster_config_map = (
self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map(
self.task,
taskmaster_job,
# self.user
)
)

configmap = self.kubernetes_client_wrapper.create_config_map(taskmaster_config_map)
created_job = self.kubernetes_client_wrapper.create_job(taskmaster_job)

os.makedirs("/tmp/tesk", exist_ok=True)
output_log_path = f"/tmp/tesk/output-at-{datetime.now().strftime('%H:%M:%S')}.log"
with open(output_log_path, "w") as f:
f.write("*********************************\n")
f.write(f"Manifest consumed to create taskmaster job as {type(taskmaster_job)}\n")
f.write(str(taskmaster_job) + "\n")
f.write("*********************************\n")

f.write("*********************************\n")
f.write(f"Manifest consumed to create config map as {type(taskmaster_config_map)}\n")
f.write(str(taskmaster_config_map) + "\n")
f.write("*********************************\n")

# Create ConfigMap and Job
f.write("*********************************\n")
f.write(f"Created ConfigMap as {type(configmap)}\n")
f.write(str(configmap) + "\n")
f.write("*********************************\n")

f.write("*********************************\n")
f.write(f"Created taskmaster job as {type(created_job)}\n")
f.write(str(created_job) + "\n")
f.write("*********************************\n")
assert created_job.metadata is not None
assert created_job.metadata.name is not None

return TesCreateTaskResponse(id=created_job.metadata.name)

except KubernetesError as e:
if (
not e.is_object_name_duplicated()
or attempts_no >= self.constants.job_create_attempts_no
):
raise e

except Exception as exc:
logging.error("ERROR: In createTask", exc_info=True)
raise exc
return {} # Dummy return to silence mypy

def handle_request(self) -> BaseModel:
return self.create_task()
Loading
Loading