Skip to content
101 changes: 86 additions & 15 deletions sky/provision/kubernetes/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
POLL_INTERVAL = 2
_TIMEOUT_FOR_POD_TERMINATION = 60  # 1 minute
_MAX_RETRIES = 3
_MAX_MISSING_PODS_RETRIES = 5
_NUM_THREADS = subprocess_utils.get_parallel_threads('kubernetes')

# Pattern to extract SSH user from command output, handling MOTD contamination
Expand Down Expand Up @@ -526,28 +527,64 @@ def _check_init_containers(pod):
'Failed to create init container for pod '
f'{pod.metadata.name}. Error details: {msg}.')

missing_pods_retry = 0
while True:
# Get all pods in a single API call
cluster_name = new_nodes[0].metadata.labels[
cluster_name_on_cloud = new_nodes[0].metadata.labels[
k8s_constants.TAG_SKYPILOT_CLUSTER_NAME]
all_pods = kubernetes.core_api(context).list_namespaced_pod(
namespace,
label_selector=
f'{k8s_constants.TAG_SKYPILOT_CLUSTER_NAME}={cluster_name}').items
f'{k8s_constants.TAG_SKYPILOT_CLUSTER_NAME}={cluster_name_on_cloud}'
).items
cluster_name = (kubernetes_utils.get_cluster_name_from_cloud_name(
cluster_name_on_cloud))

# Get the set of found pod names and check if we have all expected pods
found_pod_names = {pod.metadata.name for pod in all_pods}
missing_pods = expected_pod_names - found_pod_names
if missing_pods:
missing_pod_names = expected_pod_names - found_pod_names
if missing_pod_names:
# In _wait_for_pods_to_schedule, we already wait for all pods to go
# from pending to scheduled. So if a pod is missing here, it means
# something unusual must have happened, and so should be treated as
# an exception.
# It is also only in _wait_for_pods_to_schedule that
# provision_timeout is used.
# TODO(kevin): Should we take provision_timeout into account here,
# instead of hardcoding the number of retries?
if missing_pods_retry >= _MAX_MISSING_PODS_RETRIES:
for pod_name in missing_pod_names:
reason = _get_pod_missing_reason(context, namespace,
cluster_name, pod_name)
logger.warning(f'Pod {pod_name} missing: {reason}')
raise config_lib.KubernetesError(
f'Failed to get all pods after {missing_pods_retry} '
f'retries. Some pods may have been terminated or failed '
f'unexpectedly. Run `sky logs --provision {cluster_name}` '
'for more details.')
logger.info('Retrying running pods check: '
f'Missing pods: {missing_pods}')
f'Missing pods: {missing_pod_names}')
time.sleep(0.5)
missing_pods_retry += 1
continue

all_pods_running = True
for pod in all_pods:
if pod.metadata.name not in expected_pod_names:
continue

# Check if pod is terminated/preempted/failed.
if (pod.metadata.deletion_timestamp is not None or
pod.status.phase == 'Failed'):
# Get the reason and write to cluster events before
# the pod gets completely deleted from the API.
reason = _get_pod_termination_reason(pod, cluster_name)
logger.warning(f'Pod {pod.metadata.name} terminated: {reason}')
raise config_lib.KubernetesError(
f'Pod {pod.metadata.name} has terminated or failed '
f'unexpectedly. Run `sky logs --provision {cluster_name}` '
'for more details.')

# Continue if pod and all the containers within the
# pod are successfully created and running.
if pod.status.phase == 'Running' and all(
Expand Down Expand Up @@ -1428,9 +1465,45 @@ def get_cluster_info(


def _get_pod_termination_reason(pod: Any, cluster_name: str) -> str:
"""Get pod termination reason and write to cluster events."""
reasons = []
"""Get pod termination reason and write to cluster events.

Checks both pod conditions (for preemption/disruption) and
container statuses (for exit codes/errors).
"""
latest_timestamp = pod.status.start_time or datetime.datetime.min
ready_state = 'Unknown'
termination_reason = 'Terminated unexpectedly'
container_reasons = []

# Check pod status conditions for high level overview.
# No need to sort, as each condition.type will only appear once.
for condition in pod.status.conditions:
reason = condition.reason or 'Unknown reason'
message = condition.message or ''

# Get last known readiness state.
if condition.type == 'Ready':
ready_state = f'{reason} ({message})' if message else reason
# Kueue preemption, as defined in:
# https://pkg.go.dev/sigs.k8s.io/kueue/pkg/controller/jobs/pod#pkg-constants
elif condition.type == 'TerminationTarget':
termination_reason = f'Preempted by Kueue: {reason}'
if message:
termination_reason += f' ({message})'
# Generic disruption.
elif condition.type == 'DisruptionTarget':
termination_reason = f'Disrupted: {reason}'
if message:
termination_reason += f' ({message})'

if condition.last_transition_time is not None:
latest_timestamp = max(latest_timestamp,
condition.last_transition_time)

pod_reason = (f'{termination_reason}.\n'
f'Last known state: {ready_state}.')

# Check container statuses for exit codes/errors
if pod.status and pod.status.container_statuses:
for container_status in pod.status.container_statuses:
terminated = container_status.state.terminated
Expand All @@ -1445,18 +1518,15 @@ def _get_pod_termination_reason(pod: Any, cluster_name: str) -> str:
if reason is None:
# just in-case reason is None, have default for debugging
reason = f'exit({exit_code})'
reasons.append(reason)
if terminated.finished_at > latest_timestamp:
latest_timestamp = terminated.finished_at
container_reasons.append(reason)
latest_timestamp = max(latest_timestamp, terminated.finished_at)

# TODO (kyuds): later, if needed, query `last_state` too.

if not reasons:
return ''

# Normally we will have a single container per pod for skypilot
# but doing this just in-case there are multiple containers.
pod_reason = ' | '.join(reasons)
if container_reasons:
pod_reason += f'\nContainer errors: {" | ".join(container_reasons)}'

global_user_state.add_cluster_event(
cluster_name,
Expand Down Expand Up @@ -1658,9 +1728,10 @@ def query_instances(
Optional[str]]] = {}
for pod in pods:
phase = pod.status.phase
is_terminating = pod.metadata.deletion_timestamp is not None
pod_status = status_map[phase]
reason = None
if phase in ('Failed', 'Unknown'):
if phase in ('Failed', 'Unknown') or is_terminating:
reason = _get_pod_termination_reason(pod, cluster_name)
logger.debug(f'Pod Status ({phase}) Reason(s): {reason}')
if non_terminated_only and pod_status is None:
Expand Down
13 changes: 10 additions & 3 deletions sky/provision/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2886,6 +2886,15 @@ def get_head_pod_name(cluster_name_on_cloud: str):
return f'{cluster_name_on_cloud}-head'


def get_cluster_name_from_cloud_name(cluster_name_on_cloud: str) -> str:
    """Recover the user-facing cluster name from cluster_name_on_cloud.

    cluster_name_on_cloud has the format: {cluster_name}-{user_hash}
    (e.g., 'mycluster-2ea4').
    """
    # Split on the last '-' to drop the user hash; if there is no '-',
    # fall back to the name as-is (matches rsplit('-', 1)[0] behavior).
    name, sep, _user_hash = cluster_name_on_cloud.rpartition('-')
    return name if sep else cluster_name_on_cloud


def get_custom_config_k8s_contexts() -> List[str]:
"""Returns the list of context names from the config"""
contexts = skypilot_config.get_effective_region_config(
Expand Down Expand Up @@ -3440,9 +3449,7 @@ def process_skypilot_pods(

for pod in pods:
cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster')
cluster_name = cluster_name_on_cloud.rsplit(
'-', 1
)[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4)
cluster_name = get_cluster_name_from_cloud_name(cluster_name_on_cloud)
if cluster_name_on_cloud not in clusters:
# Parse the start time for the cluster
start_time = pod.status.start_time
Expand Down
21 changes: 21 additions & 0 deletions tests/smoke_tests/test_cluster_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,27 @@ def test_kubernetes_custom_image(image_id):
smoke_tests_utils.run_one_test(test)


@pytest.mark.kubernetes
def test_kubernetes_pod_failure_detection():
    """Verify pod failures are detected and surfaced in provision logs.

    busybox has no bash, so launching with it guarantees the pod fails.
    """
    name = smoke_tests_utils.get_cluster_name()
    # Launch is expected to fail; `|| true` keeps the step green so the
    # follow-up validation step runs.
    launch_cmd = (
        f'sky launch -c {name} {smoke_tests_utils.LOW_RESOURCE_ARG} -y '
        '--image-id docker:busybox:latest --infra kubernetes echo hi || true')
    # Check that the provision logs contain the expected error message.
    validate_cmd = (
        f's=$(sky logs --provision {name}) && '
        'echo "==Validating error message==" && echo "$s" && '
        'echo "$s" | grep -A 2 "Pod.*terminated:.*" | '
        'grep -A 2 "PodFailed" | grep "StartError"')
    test = smoke_tests_utils.Test(
        'test-kubernetes-pod-failure-detection',
        [launch_cmd, validate_cmd],
        f'sky down -y {name}',
        timeout=600,  # 10 minutes
    )
    smoke_tests_utils.run_one_test(test)


@pytest.mark.azure
def test_azure_start_stop_two_nodes():
name = smoke_tests_utils.get_cluster_name()
Expand Down
93 changes: 92 additions & 1 deletion tests/unit_tests/kubernetes/test_provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def mock_warning(msg, *args, **kwargs):

def test_out_of_gpus_and_node_selector_failed(monkeypatch):
"""Test to check if the error message is correct when there is GPU resource

shortage and node selector failed to match.
"""

Expand Down Expand Up @@ -401,3 +401,94 @@ def test_insufficient_resources_msg(monkeypatch):
to_provision, requested_resources, insufficient_resources) ==
f'Failed to acquire resources (CPUs, Memory) in context {region} for {requested_resources_str}. '
)


def test_pod_termination_reason_start_error(monkeypatch):
    """Test _get_pod_termination_reason with StartError (like busybox).

    Pod is in Failed state with container terminated due to StartError.
    """
    import datetime

    ts = datetime.datetime(2025, 1, 1, 0, 0, 0)

    # Ready condition showing PodFailed.
    condition = mock.MagicMock()
    condition.type = 'Ready'
    condition.reason = 'PodFailed'
    condition.message = ''
    condition.last_transition_time = ts

    # Container terminated with a StartError (exit code 128).
    container = mock.MagicMock()
    container.name = 'ray-node'
    container.state.terminated = mock.MagicMock()
    container.state.terminated.exit_code = 128
    container.state.terminated.reason = 'StartError'
    container.state.terminated.finished_at = ts

    pod = mock.MagicMock()
    pod.metadata.name = 'test-pod'
    pod.status.start_time = ts
    pod.status.conditions = [condition]
    pod.status.container_statuses = [container]

    # Prevent writing cluster events to real global state.
    monkeypatch.setattr('sky.provision.kubernetes.instance.global_user_state',
                        mock.MagicMock())

    result = instance._get_pod_termination_reason(pod, 'test-cluster')

    assert result == ('Terminated unexpectedly.\n'
                      'Last known state: PodFailed.\n'
                      'Container errors: StartError')


def test_pod_termination_reason_kueue_preemption(monkeypatch):
    """Test _get_pod_termination_reason with Kueue preemption.

    Pod is being terminated by Kueue due to PodsReady timeout.
    Includes both the TerminationTarget condition (preemption) and
    Ready condition (container status), as seen in real API responses.
    """
    import datetime

    ts = datetime.datetime(2025, 1, 1, 0, 0, 0)

    # Ready condition: containers never became ready.
    ready = mock.MagicMock()
    ready.type = 'Ready'
    ready.reason = 'ContainersNotReady'
    ready.message = 'containers with unready status: [ray-node]'
    ready.last_transition_time = ts

    # Taken from an actual Pod that got preempted by Kueue.
    target = mock.MagicMock()
    target.type = 'TerminationTarget'
    target.reason = 'WorkloadEvictedDueToPodsReadyTimeout'
    target.message = 'Exceeded the PodsReady timeout default/test-pod'
    target.last_transition_time = ts

    # Container still creating (not terminated).
    container = mock.MagicMock()
    container.state.terminated = None

    pod = mock.MagicMock()
    pod.metadata.name = 'test-pod'
    pod.status.start_time = ts
    pod.status.conditions = [ready, target]
    pod.status.container_statuses = [container]

    # Prevent writing cluster events to real global state.
    monkeypatch.setattr('sky.provision.kubernetes.instance.global_user_state',
                        mock.MagicMock())

    result = instance._get_pod_termination_reason(pod, 'test-cluster')

    assert result == (
        'Preempted by Kueue: WorkloadEvictedDueToPodsReadyTimeout '
        '(Exceeded the PodsReady timeout default/test-pod).\n'
        'Last known state: ContainersNotReady (containers with unready status: [ray-node]).'
    )