diff --git a/nemo_run/core/execution/kubeflow.py b/nemo_run/core/execution/kubeflow.py index 525a7c0d..11f4fe3c 100644 --- a/nemo_run/core/execution/kubeflow.py +++ b/nemo_run/core/execution/kubeflow.py @@ -100,6 +100,11 @@ class KubeflowExecutor(Executor): pod_labels: dict[str, Any] = field(default_factory=dict) tolerations: list[dict[str, Any]] = field(default_factory=list) affinity: dict[str, Any] = field(default_factory=dict) + # Shell commands run once per pod in launch.sh, BEFORE the training command + # (e.g. installing a missing dependency into the container venv). They run + # before torchrun spawns the per-GPU ranks, so each executes exactly once + # per node (not once per rank) and under errexit (a failure aborts the pod). + setup_commands: list[str] = field(default_factory=list) # env_list accepts full env var dicts (e.g. valueFrom/secretKeyRef). # Simple key=value pairs should use the inherited env_vars dict instead. env_list: list[dict[str, Any]] = field(default_factory=list) @@ -930,6 +935,7 @@ def materialize_launch_script(self, cmd: list[str], max_retries: int = 0) -> Non env_vars=env_var_lines, max_retries=max_retries, code_dir=self.code_dir, + setup_commands=self.setup_commands, ) os.makedirs(self.job_dir, exist_ok=True) launch_script_path = os.path.join(self.job_dir, "launch.sh") @@ -1055,6 +1061,28 @@ def pull_results(self, job_name: str, dest_dir: Optional[str] = None) -> None: self.copy_from_workspace(self.code_dir, local_path, label=job_name) + def cleanup(self, handle: str) -> None: + """Mirror run outputs from the workdir PVC back to ``job_dir``, then tear down. + + On Kubernetes the training pods write every recipe output — including the + PyTorch profiler chrome trace and CUDA memory snapshot, which land under + ``/nemo_run`` (the PVC ``code_dir``) — to the shared volume. The launcher + that collects artifacts and parses logs only sees the local ``job_dir``, + so without a copy-back those outputs are stranded on the PVC. Reuse the + existing data-mover (:meth:`pull_results`) to bring them back before + teardown. Best-effort: a failed pull must never break cleanup. + + Args: + handle: The app handle ``______``. + """ + if self.workdir_pvc: + job_name = handle.split("___")[-1] if handle else "" + try: + self.pull_results(job_name) + except Exception as e: + logger.warning("pull_results during cleanup of '%s' failed: %s", handle, e) + super().cleanup(handle) + def _lookup_job_dir(self, job_name: str) -> str: """Look up the job_dir saved by the scheduler for *job_name*.""" try: diff --git a/nemo_run/core/execution/templates/kubeflow.sh.j2 b/nemo_run/core/execution/templates/kubeflow.sh.j2 index 24529d7c..248da146 100644 --- a/nemo_run/core/execution/templates/kubeflow.sh.j2 +++ b/nemo_run/core/execution/templates/kubeflow.sh.j2 @@ -12,6 +12,12 @@ export TORCHX_MAX_RETRIES={{max_retries}} # Symlink /nemo_run → code_dir so pod-side paths like /nemo_run/scripts/... resolve correctly. ln -sfn {{code_dir}} /nemo_run +{% if setup_commands %} +echo "Running setup commands..." +{%- for setup_command in setup_commands %} +{{setup_command}} +{%- endfor %} +{%- endif %} echo "Starting training command..." set +e diff --git a/test/core/execution/test_kubeflow.py b/test/core/execution/test_kubeflow.py index 8babcf57..42011d11 100644 --- a/test/core/execution/test_kubeflow.py +++ b/test/core/execution/test_kubeflow.py @@ -503,6 +503,19 @@ def test_pull_results_noop_without_workdir_pvc(self, mock_k8s_clients): e.pull_results("test-job") mock_core.create_namespaced_pod.assert_not_called() + def test_cleanup_pulls_results_when_workdir_pvc(self, workdir_executor): + # cleanup() mirrors PVC outputs (incl. profiler traces) back to job_dir + # via pull_results, keying off the ______ handle. + with patch.object(workdir_executor, "pull_results") as mock_pull: + workdir_executor.cleanup("exp-id___trainer___test-job") + mock_pull.assert_called_once_with("test-job") + + def test_cleanup_noop_without_workdir_pvc(self): + e = KubeflowExecutor(image="test:latest") + with patch.object(e, "pull_results") as mock_pull: + e.cleanup("exp-id___trainer___test-job") + mock_pull.assert_not_called() + def test_data_mover_pod_inherits_tolerations_affinity_pull_secrets( self, mock_k8s_clients, tmp_path ):