Skip to content

Commit

Permalink
Handle spot instance eviction in Kubernetes Infrastructure Block (#10426)
Browse files Browse the repository at this point in the history
  • Loading branch information
zangell44 authored Aug 17, 2023
1 parent 8ec9174 commit c0a4795
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 2 deletions.
19 changes: 17 additions & 2 deletions src/prefect/infrastructure/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,9 +707,9 @@ def _watch_job(self, job_name: str) -> int:
watch.stop()
break

with self.get_client() as client:
with self.get_client() as core_client:
# Get all pods for the job
pods = client.list_namespaced_pod(
pods = core_client.list_namespaced_pod(
namespace=self.namespace, label_selector=f"job-name={job_name}"
)
# Get the status for only the most recently used pod
Expand All @@ -726,6 +726,21 @@ def _watch_job(self, job_name: str) -> int:
self.logger.error(f"Job {job_name!r}: No pods found for job.")
return -1

# In some cases, such as spot instance evictions, the pod will be forcibly
# terminated and not report a status correctly.
elif (
first_container_status.state is None
or first_container_status.state.terminated is None
or first_container_status.state.terminated.exit_code is None
):
self.logger.error(
f"Could not determine exit code for {job_name!r}."
"Exit code will be reported as -1."
"First container status info did not report an exit code."
f"First container info: {first_container_status}."
)
return -1

return first_container_status.state.terminated.exit_code

def _create_job(self, job_manifest: KubernetesManifest) -> "V1Job":
Expand Down
37 changes: 37 additions & 0 deletions tests/infrastructure/test_kubernetes_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,43 @@ def mock_stream(*args, **kwargs):
assert result.status_code == -1


async def test_watch_handles_pod_without_exit_code(
    mock_k8s_client,
    mock_watch,
    mock_k8s_batch_client,
):
    """Check that a run reports status code -1 when the pod's first
    container status carries no ``terminated`` state (and so no exit code).
    """
    # Start from a job that has not completed.
    mock_k8s_batch_client.read_namespaced_job.return_value.status.completion_time = None

    # A container status can exist without an exit code when the container
    # was forcefully terminated; model that by clearing `terminated`.
    container_status = MagicMock(spec=kubernetes.client.V1ContainerStatus)
    container_status.state.terminated = None

    job_pod = MagicMock(spec=kubernetes.client.V1Pod)
    job_pod.status.phase = "Running"
    job_pod.status.container_statuses = [container_status]
    mock_k8s_client.list_namespaced_pod.return_value.items = [job_pod]

    def fake_stream(*args, **kwargs):
        watched_func = kwargs["func"]

        if watched_func == mock_k8s_client.list_namespaced_pod:
            yield {"object": job_pod}

        if watched_func == mock_k8s_batch_client.list_namespaced_job:
            job = MagicMock(spec=kubernetes.client.V1Job)
            job.status.completion_time = None
            job.spec.backoff_limit = 6

            # Emit the same job object repeatedly with a growing failure
            # count (0..7), after which the stream ends.
            for failed_count in range(8):
                job.status.failed = failed_count
                yield {"object": job, "type": "ADDED"}

    mock_watch.stream.side_effect = fake_stream

    infrastructure = KubernetesJob(command=["echo", "hello"])
    result = await infrastructure.run()

    assert result.status_code == -1


class TestCustomizingBaseJob:
"""Tests scenarios where a user is providing a customized base Job template"""

Expand Down

0 comments on commit c0a4795

Please sign in to comment.