Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add k8s client wrapper and constants #201

Merged
merged 21 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,092 changes: 1,091 additions & 1,001 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ boto3 = "1.34.104"
foca = "^0.13.0"
kubernetes = "^29.0.0"
python = "^3.11"
requests = ">=2.20.0"
requests = ">=2.32.0"
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
urllib3 = "^2.2.2"
werkzeug = "==2.2.3"
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved

[tool.poetry.group.docs.dependencies]
added-value = "^0.24.0"
Expand Down
1 change: 1 addition & 0 deletions tesk/api/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Kubernetes API module for TESK."""
205 changes: 205 additions & 0 deletions tesk/api/kubernetes/constants.py
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
JaeAeich marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""Constants for Kubernetes API."""

from enum import Enum
from typing import Set

from pydantic import BaseModel, Field


class JobConstants(BaseModel):
"""Constants related to job and tasks."""

taskmaster_input: str = Field(
default="JSON_INPUT",
description="ENV var that serves as taskmaster script input (JSON format)",
)
taskmaster_input_exec_key: str = Field(
default="executors",
description="Key in JSON taskmaster input, which holds list of executors",
)
volume_name: str = Field(default="PVC", description="Volume name")
job_create_attempts_no: int = Field(
default=5,
description="Number of attempts of job creation in case of name collision",
)
job_name_taskm_prefix: str = Field(
default="task-",
description="Constant prefix of taskmaster's job name (== TES task ID)",
)
job_name_exec_prefix: str = Field(
default="-ex-",
description="Part of executor's job name, that follows taskmaster's name",
)
job_name_taskm_rand_part_length: int = Field(
default=4,
description=(
"No of bytes of random part of task master's name (which end up "
"encoded to hex)"
),
)
job_name_exec_no_length: int = Field(
default=2,
description="No of digits reserved for executor number in executor's job name."
" Ends up padded with '0' for numbers < 10",
)
job_name_filer_suf: str = Field(
default="-outputs-filer", description="Output filer name suffix"
)
resource_disk_default: float = Field(
default=0.1, description="Default resource disk value"
)
completed_states: Set[str] = Field(
default={"CANCELED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR"},
description="TES task states, indicating task is not running and cannot be "
"cancelled",
)
executor_backoff_limit: str = Field(
default="EXECUTOR_BACKOFF_LIMIT",
description="Set a number of retries of a job execution.",
)
filer_backoff_limit: str = Field(
default="FILER_BACKOFF_LIMIT",
description="Set a number of retries of a filer job execution.",
)


class AnnotationConstants(BaseModel):
"""Constants related to Kubernetes annotations."""

ann_testask_name_key: str = Field(
default="tes-task-name",
description=(
"Key of the annotation, that stores name of TES task in both taskmaster's "
"job and executor's jobs"
),
)
ann_json_input_key: str = Field(
default="json-input",
description="Key of the annotation, that stores whole input TES task serialized"
" to JSON",
)


class LabelConstants(BaseModel):
"""Constants related to Kubernetes labels."""

label_testask_id_key: str = Field(
default="taskmaster-name",
description="Key of the label, that stores taskmaster's name (==TES task "
"generated ID) in executor jobs",
)
label_jobtype_key: str = Field(
default="job-type",
description="Key of the label, that stores type of a job (taskmaster or "
"executor)",
)
label_jobtype_value_taskm: str = Field(
default="taskmaster",
description="Value of the label with taskmaster's job type",
)
label_jobtype_value_exec: str = Field(
default="executor", description="Value of the label with executor's job type"
)
label_execno_key: str = Field(
default="executor-no",
description="Key of the label, that holds executor number for executor jobs",
)
label_taskstate_key: str = Field(
default="task-status",
description="Key of the label, that holds executor's state",
)
label_taskstate_value_canc: str = Field(
default="Cancelled",
description="Value of the label, that holds executor's Cancelled state",
)
label_userid_key: str = Field(
default="creator-user-id", description="Key of the label, that holds user id"
)
label_groupname_key: str = Field(
default="creator-group-name",
description="Key of the label, that holds user's group name",
)


class PathValidationConstants(BaseModel):
"""Constants related to path validation."""

absolute_path_regexp: str = Field(
default="^\\/.*", description="Pattern to validate paths"
)
absolute_path_message: str = Field(
default="must be an absolute path",
description="Message for absolute path validation (to avoid "
"message.properties)",
)


class FTPConstants(BaseModel):
"""Constants related to FTP configuration."""

ftp_secret_username_env: str = Field(
default="TESK_FTP_USERNAME",
description="Name of taskmaster's ENV variable with username of FTP account "
"used for storage",
)
ftp_secret_password_env: str = Field(
default="TESK_FTP_PASSWORD",
description="Name of taskmaster's ENV variable with password of FTP account "
"used for storage",
)


class PatchConstants(BaseModel):
"""Constants related to patch operations."""

cancel_patch: str = Field(
default='{"metadata":{"labels":{"task-status":"Cancelled"}}}',
description="Patch object passed to job API, when cancelling task",
)


class K8sConstants(BaseModel):
"""Constants related to Kubernetes."""

k8s_batch_api_version: str = Field(
default="batch/v1", description="Kubernetes Batch API version"
)
k8s_batch_api_job_type: str = Field(
default="Job", description="Kubernetes Job object type"
)
job_restart_policy: str = Field(
default="Never", description="Kubernetes Job restart policy"
)
resource_cpu_key: str = Field(
default="cpu", description="Executor CPU resource label"
)
resource_mem_key: str = Field(
default="memory", description="Executor memory resource label"
)
resource_mem_unit: str = Field(
default="Gi", description="Executor memory resource unit"
)
resource_mem_one_gb: int = Field(
default=1073741824, description="One Gibibyte (Gi) in bytes"
)

class PodPhase(Enum):
"""Pod state."""

PENDING = "Pending"

def get_code(self) -> str:
"""Return the pod state."""
return self.value


class TeskK8sConstants(BaseModel):
"""All the constants related to k8s and job creation."""

job_constants: JobConstants = JobConstants()
annotation_constants: AnnotationConstants = AnnotationConstants()
label_constants: LabelConstants = LabelConstants()
path_validation_constants: PathValidationConstants = PathValidationConstants()
ftp_constants: FTPConstants = FTPConstants()
patch_constants: PatchConstants = PatchConstants()
k8s_constants: K8sConstants = K8sConstants()
Loading
Loading