diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py
index cedc738644f..b6d24d47598 100644
--- a/sky/provision/kubernetes/instance.py
+++ b/sky/provision/kubernetes/instance.py
@@ -33,6 +33,7 @@
 POLL_INTERVAL = 2
 _TIMEOUT_FOR_POD_TERMINATION = 60  # 1 minutes
 _MAX_RETRIES = 3
+_MAX_MISSING_PODS_RETRIES = 5
 _NUM_THREADS = subprocess_utils.get_parallel_threads('kubernetes')
 
 # Pattern to extract SSH user from command output, handling MOTD contamination
@@ -489,17 +490,17 @@ def _evaluate_timeout() -> bool:
 
 
 @timeline.event
-def _wait_for_pods_to_run(namespace, context, new_nodes):
+def _wait_for_pods_to_run(namespace, context, cluster_name, new_pods):
     """Wait for pods and their containers to be ready.
 
     Pods may be pulling images or may be in the process of container
     creation.
     """
-    if not new_nodes:
+    if not new_pods:
         return
 
     # Create a set of pod names we're waiting for
-    expected_pod_names = {node.metadata.name for node in new_nodes}
+    expected_pod_names = {pod.metadata.name for pod in new_pods}
 
     def _check_init_containers(pod):
         # Check if any of the init containers failed
@@ -526,28 +527,62 @@ def _check_init_containers(pod):
                     'Failed to create init container for pod '
                     f'{pod.metadata.name}. Error details: {msg}.')
 
+    missing_pods_retry = 0
     while True:
         # Get all pods in a single API call
-        cluster_name = new_nodes[0].metadata.labels[
+        cluster_name_on_cloud = new_pods[0].metadata.labels[
             k8s_constants.TAG_SKYPILOT_CLUSTER_NAME]
         all_pods = kubernetes.core_api(context).list_namespaced_pod(
             namespace,
             label_selector=
-            f'{k8s_constants.TAG_SKYPILOT_CLUSTER_NAME}={cluster_name}').items
+            f'{k8s_constants.TAG_SKYPILOT_CLUSTER_NAME}={cluster_name_on_cloud}'
+        ).items
 
         # Get the set of found pod names and check if we have all expected pods
         found_pod_names = {pod.metadata.name for pod in all_pods}
-        missing_pods = expected_pod_names - found_pod_names
-        if missing_pods:
+        missing_pod_names = expected_pod_names - found_pod_names
+        if missing_pod_names:
+            # In _wait_for_pods_to_schedule, we already wait for all pods to go
+            # from pending to scheduled. So if a pod is missing here, something
+            # unusual must have happened, and it should be treated as an
+            # exception.
+            # It is also only in _wait_for_pods_to_schedule that
+            # provision_timeout is used.
+            # TODO(kevin): Should we take provision_timeout into account here,
+            # instead of hardcoding the number of retries?
+            if missing_pods_retry >= _MAX_MISSING_PODS_RETRIES:
+                for pod_name in missing_pod_names:
+                    reason = _get_pod_missing_reason(context, namespace,
+                                                     cluster_name, pod_name)
+                    logger.warning(f'Pod {pod_name} missing: {reason}')
+                raise config_lib.KubernetesError(
+                    f'Failed to get all pods after {missing_pods_retry} '
+                    f'retries. Some pods may have been terminated or failed '
+                    f'unexpectedly. Run `sky logs --provision {cluster_name}` '
+                    'for more details.')
             logger.info('Retrying running pods check: '
-                        f'Missing pods: {missing_pods}')
+                        f'Missing pods: {missing_pod_names}')
             time.sleep(0.5)
+            missing_pods_retry += 1
             continue
 
         all_pods_running = True
         for pod in all_pods:
             if pod.metadata.name not in expected_pod_names:
                 continue
+
+            # Check if pod is terminated/preempted/failed.
+            if (pod.metadata.deletion_timestamp is not None or
+                    pod.status.phase == 'Failed'):
+                # Get the reason and write to cluster events before
+                # the pod gets completely deleted from the API.
+                reason = _get_pod_termination_reason(pod, cluster_name)
+                logger.warning(f'Pod {pod.metadata.name} terminated: {reason}')
+                raise config_lib.KubernetesError(
+                    f'Pod {pod.metadata.name} has terminated or failed '
+                    f'unexpectedly. Run `sky logs --provision {cluster_name}` '
+                    'for more details.')
+
             # Continue if pod and all the containers within the
             # pod are successfully created and running.
             if pod.status.phase == 'Running' and all(
@@ -1169,7 +1204,7 @@ def _create_resource_thread(i: int):
     # fail early if there is an error
     logger.debug(f'run_instances: waiting for pods to be running (pulling '
                  f'images): {[pod.metadata.name for pod in pods]}')
-    _wait_for_pods_to_run(namespace, context, pods)
+    _wait_for_pods_to_run(namespace, context, cluster_name, pods)
     logger.debug(f'run_instances: all pods are scheduled and running: '
                  f'{[pod.metadata.name for pod in pods]}')
 
@@ -1428,9 +1463,45 @@ def get_cluster_info(
 
 
 def _get_pod_termination_reason(pod: Any, cluster_name: str) -> str:
-    """Get pod termination reason and write to cluster events."""
-    reasons = []
+    """Get pod termination reason and write to cluster events.
+
+    Checks both pod conditions (for preemption/disruption) and
+    container statuses (for exit codes/errors).
+    """
     latest_timestamp = pod.status.start_time or datetime.datetime.min
+    ready_state = 'Unknown'
+    termination_reason = 'Terminated unexpectedly'
+    container_reasons = []
+
+    # Check pod status conditions for a high-level overview.
+    # No need to sort, as each condition.type will only appear once.
+    for condition in (pod.status.conditions or []):
+        reason = condition.reason or 'Unknown reason'
+        message = condition.message or ''
+
+        # Get last known readiness state.
+        if condition.type == 'Ready':
+            ready_state = f'{reason} ({message})' if message else reason
+        # Kueue preemption, as defined in:
+        # https://pkg.go.dev/sigs.k8s.io/kueue/pkg/controller/jobs/pod#pkg-constants
+        elif condition.type == 'TerminationTarget':
+            termination_reason = f'Preempted by Kueue: {reason}'
+            if message:
+                termination_reason += f' ({message})'
+        # Generic disruption.
+        elif condition.type == 'DisruptionTarget':
+            termination_reason = f'Disrupted: {reason}'
+            if message:
+                termination_reason += f' ({message})'
+
+        if condition.last_transition_time is not None:
+            latest_timestamp = max(latest_timestamp,
+                                   condition.last_transition_time)
+
+    pod_reason = (f'{termination_reason}.\n'
+                  f'Last known state: {ready_state}.')
+
+    # Check container statuses for exit codes/errors.
     if pod.status and pod.status.container_statuses:
         for container_status in pod.status.container_statuses:
             terminated = container_status.state.terminated
@@ -1445,18 +1516,15 @@ def _get_pod_termination_reason(pod: Any, cluster_name: str) -> str:
                 if reason is None:
                     # just in-case reason is None, have default for debugging
                     reason = f'exit({exit_code})'
-                reasons.append(reason)
-                if terminated.finished_at > latest_timestamp:
-                    latest_timestamp = terminated.finished_at
+                container_reasons.append(reason)
+                latest_timestamp = max(latest_timestamp, terminated.finished_at)
 
     # TODO (kyuds): later, if needed, query `last_state` too.
 
-    if not reasons:
-        return ''
-
     # Normally we will have a single container per pod for skypilot
     # but doing this just in-case there are multiple containers.
-    pod_reason = ' | '.join(reasons)
+    if container_reasons:
+        pod_reason += f'\nContainer errors: {" | ".join(container_reasons)}'
 
     global_user_state.add_cluster_event(
         cluster_name,
@@ -1658,9 +1726,10 @@ def query_instances(
                               Optional[str]]] = {}
     for pod in pods:
         phase = pod.status.phase
+        is_terminating = pod.metadata.deletion_timestamp is not None
         pod_status = status_map[phase]
         reason = None
-        if phase in ('Failed', 'Unknown'):
+        if phase in ('Failed', 'Unknown') or is_terminating:
             reason = _get_pod_termination_reason(pod, cluster_name)
             logger.debug(f'Pod Status ({phase}) Reason(s): {reason}')
         if non_terminated_only and pod_status is None:
diff --git a/tests/smoke_tests/test_cluster_job.py b/tests/smoke_tests/test_cluster_job.py
index 262d8843ee8..a919b54363d 100644
--- a/tests/smoke_tests/test_cluster_job.py
+++ b/tests/smoke_tests/test_cluster_job.py
@@ -1732,6 +1732,27 @@ def test_kubernetes_custom_image(image_id):
     smoke_tests_utils.run_one_test(test)
 
 
+@pytest.mark.kubernetes
+def test_kubernetes_pod_failure_detection():
+    """Test that we detect pod failures and log useful details.
+
+    We use the busybox image because it doesn't have bash,
+    so we know the pod must fail.
+    """
+    name = smoke_tests_utils.get_cluster_name()
+    test = smoke_tests_utils.Test(
+        'test-kubernetes-pod-failure-detection',
+        [
+            f'sky launch -c {name} {smoke_tests_utils.LOW_RESOURCE_ARG} -y --image-id docker:busybox:latest --infra kubernetes echo hi || true',
+            # Check that the provision logs contain the expected error message.
+            f's=$(sky logs --provision {name}) && echo "==Validating error message==" && echo "$s" && echo "$s" | grep -A 2 "Pod.*terminated:.*" | grep -A 2 "PodFailed" | grep "StartError"',
+        ],
+        f'sky down -y {name}',
+        timeout=10 * 60,
+    )
+    smoke_tests_utils.run_one_test(test)
+
+
 @pytest.mark.azure
 def test_azure_start_stop_two_nodes():
     name = smoke_tests_utils.get_cluster_name()
diff --git a/tests/unit_tests/kubernetes/test_provision.py b/tests/unit_tests/kubernetes/test_provision.py
index 6beacbb36f5..a3c050b8758 100644
--- a/tests/unit_tests/kubernetes/test_provision.py
+++ b/tests/unit_tests/kubernetes/test_provision.py
@@ -207,7 +207,7 @@ def mock_warning(msg, *args, **kwargs):
 
 def test_out_of_gpus_and_node_selector_failed(monkeypatch):
     """Test to check if the error message is correct when there is GPU resource
-    
+
     shortage and node selector failed to match.
     """
 
@@ -401,3 +401,94 @@ def test_insufficient_resources_msg(monkeypatch):
         to_provision, requested_resources, insufficient_resources) ==
         f'Failed to acquire resources (CPUs, Memory) in context {region} for {requested_resources_str}. '
     )
+
+
+def test_pod_termination_reason_start_error(monkeypatch):
+    """Test _get_pod_termination_reason with StartError (like busybox).
+
+    Pod is in Failed state with container terminated due to StartError.
+    """
+    import datetime
+
+    now = datetime.datetime(2025, 1, 1, 0, 0, 0)
+
+    pod = mock.MagicMock()
+    pod.metadata.name = 'test-pod'
+    pod.status.start_time = now
+
+    # Ready condition showing PodFailed
+    ready_condition = mock.MagicMock()
+    ready_condition.type = 'Ready'
+    ready_condition.reason = 'PodFailed'
+    ready_condition.message = ''
+    ready_condition.last_transition_time = now
+
+    pod.status.conditions = [ready_condition]
+
+    # Container with StartError
+    container_status = mock.MagicMock()
+    container_status.name = 'ray-node'
+    container_status.state.terminated = mock.MagicMock()
+    container_status.state.terminated.exit_code = 128
+    container_status.state.terminated.reason = 'StartError'
+    container_status.state.terminated.finished_at = now
+
+    pod.status.container_statuses = [container_status]
+
+    monkeypatch.setattr('sky.provision.kubernetes.instance.global_user_state',
+                        mock.MagicMock())
+
+    reason = instance._get_pod_termination_reason(pod, 'test-cluster')
+
+    expected = ('Terminated unexpectedly.\n'
+                'Last known state: PodFailed.\n'
+                'Container errors: StartError')
+    assert reason == expected
+
+
+def test_pod_termination_reason_kueue_preemption(monkeypatch):
+    """Test _get_pod_termination_reason with Kueue preemption.
+
+    Pod is being terminated by Kueue due to PodsReady timeout.
+    Includes both the TerminationTarget condition (preemption) and
+    Ready condition (container status), as seen in real API responses.
+    """
+    import datetime
+
+    now = datetime.datetime(2025, 1, 1, 0, 0, 0)
+
+    pod = mock.MagicMock()
+    pod.metadata.name = 'test-pod'
+    pod.status.start_time = now
+
+    ready_condition = mock.MagicMock()
+    ready_condition.type = 'Ready'
+    ready_condition.reason = 'ContainersNotReady'
+    ready_condition.message = 'containers with unready status: [ray-node]'
+    ready_condition.last_transition_time = now
+
+    # Taken from an actual Pod that got preempted by Kueue.
+    termination_condition = mock.MagicMock()
+    termination_condition.type = 'TerminationTarget'
+    termination_condition.reason = 'WorkloadEvictedDueToPodsReadyTimeout'
+    termination_condition.message = 'Exceeded the PodsReady timeout default/test-pod'
+    termination_condition.last_transition_time = now
+
+    pod.status.conditions = [ready_condition, termination_condition]
+
+    # Container still creating (not terminated)
+    container_status = mock.MagicMock()
+    container_status.state.terminated = None
+    pod.status.container_statuses = [container_status]
+
+    monkeypatch.setattr('sky.provision.kubernetes.instance.global_user_state',
+                        mock.MagicMock())
+
+    reason = instance._get_pod_termination_reason(pod, 'test-cluster')
+
+    expected = (
+        'Preempted by Kueue: WorkloadEvictedDueToPodsReadyTimeout '
+        '(Exceeded the PodsReady timeout default/test-pod).\n'
+        'Last known state: ContainersNotReady (containers with unready status: [ray-node]).'
+    )
+    assert reason == expected
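
The new unit tests exercise the StartError and Kueue TerminationTarget paths, but the generic DisruptionTarget branch of _get_pod_termination_reason is not covered. Below is a minimal sketch of a third test in the same style; it assumes the same module-level imports (mock, instance) as tests/unit_tests/kubernetes/test_provision.py, and the DeletionByTaintManager reason/message values are illustrative rather than taken from a real API response.

def test_pod_termination_reason_node_disruption(monkeypatch):
    """Sketch: _get_pod_termination_reason with a generic DisruptionTarget.

    The pod is being deleted because its node is drained; the condition
    values below are illustrative.
    """
    import datetime

    now = datetime.datetime(2025, 1, 1, 0, 0, 0)

    pod = mock.MagicMock()
    pod.metadata.name = 'test-pod'
    pod.status.start_time = now

    ready_condition = mock.MagicMock()
    ready_condition.type = 'Ready'
    ready_condition.reason = 'ContainersNotReady'
    ready_condition.message = 'containers with unready status: [ray-node]'
    ready_condition.last_transition_time = now

    # Generic (non-Kueue) disruption, e.g. a node drain handled by the
    # taint manager. Reason/message are illustrative values.
    disruption_condition = mock.MagicMock()
    disruption_condition.type = 'DisruptionTarget'
    disruption_condition.reason = 'DeletionByTaintManager'
    disruption_condition.message = 'Taint manager: deleting due to NoExecute taint'
    disruption_condition.last_transition_time = now

    pod.status.conditions = [ready_condition, disruption_condition]

    # Container not terminated, so no container errors are appended.
    container_status = mock.MagicMock()
    container_status.state.terminated = None
    pod.status.container_statuses = [container_status]

    monkeypatch.setattr('sky.provision.kubernetes.instance.global_user_state',
                        mock.MagicMock())

    reason = instance._get_pod_termination_reason(pod, 'test-cluster')

    expected = (
        'Disrupted: DeletionByTaintManager '
        '(Taint manager: deleting due to NoExecute taint).\n'
        'Last known state: ContainersNotReady (containers with unready status: [ray-node]).'
    )
    assert reason == expected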