Skip to content

Commit c4342f7

Browse files
committed
update pipeline_with_importer_workspace test
Signed-off-by: VaniHaripriya <[email protected]>
1 parent 9eaea2c commit c4342f7

File tree

5 files changed

+317
-67
lines changed

5 files changed

+317
-67
lines changed

backend/src/v2/component/launcher_v2_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,10 @@ func Test_getPlaceholders_WorkspaceArtifactPath(t *testing.T) {
193193
if err != nil {
194194
t.Fatalf("getPlaceholders error: %v", err)
195195
}
196-
got := ph["{{$.inputs.artifacts['data'].path}}"]
197-
want := filepath.Join(WorkspaceMountPath, ".artifacts", "sample", "sample.txt")
198-
if got != want {
199-
t.Fatalf("placeholder path mismatch: got=%q want=%q", got, want)
196+
actual := ph["{{$.inputs.artifacts['data'].path}}"]
197+
expected := filepath.Join(WorkspaceMountPath, ".artifacts", "minio", "mlpipeline", "sample", "sample.txt")
198+
if actual != expected {
199+
t.Fatalf("placeholder path mismatch: actual=%q expected=%q", actual, expected)
200200
}
201201
}
202202

sdk/python/kfp/dsl/types/artifact_types.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,21 +96,25 @@ def path(self, path: str) -> None:
9696
def _get_path(self) -> Optional[str]:
9797
local_path = self.uri
9898
if self.uri.startswith(RemotePrefix.GCS.value):
99-
local_path = _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value):]
99+
local_path = _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS
100+
.value):]
100101
elif self.uri.startswith(RemotePrefix.MINIO.value):
101-
local_path = _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO.value):]
102+
local_path = _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.
103+
MINIO.value):]
102104
elif self.uri.startswith(RemotePrefix.S3.value):
103-
local_path = _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value):]
105+
local_path = _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3
106+
.value):]
104107
elif self.uri.startswith(RemotePrefix.OCI.value):
105108
escaped_uri = self.uri[len(RemotePrefix.OCI.value):].replace(
106109
'/', '_')
107110
local_path = _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
108-
111+
109112
# If the artifact is already present in the pipeline workspace, map to the workspace path.
110113
# This is indicated by backend setting metadata['_kfp_workspace'] = True.
111114
if self.metadata.get('_kfp_workspace') is True:
112-
local_path = os.path.join(WORKSPACE_MOUNT_PATH, ".artifacts", local_path.lstrip("/"))
113-
115+
local_path = os.path.join(WORKSPACE_MOUNT_PATH, ".artifacts",
116+
local_path.lstrip("/"))
117+
114118
return local_path
115119

116120
def _set_path(self, path: str) -> None:

test_data/compiled-workflows/pipeline_with_importer_workspace.yaml

Lines changed: 92 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,54 @@ spec:
99
- name: components-comp-importer
1010
value: '{"executorLabel":"exec-importer","inputDefinitions":{"parameters":{"uri":{"parameterType":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}}}'
1111
- name: implementations-comp-importer
12-
value: '{"artifactUri":{"constant":"minio://mlpipeline/sample/sample.txt"},"downloadToWorkspace":true,"metadata":{"source":"sample"},"typeSchema":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}'
13-
- name: components-9ffedf4e2479eb43e7fee57b45f0a09698cb1b0a7313f57371726b58f07afb52
14-
value: '{"executorLabel":"exec-read-imported","inputDefinitions":{"artifacts":{"data":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}'
15-
- name: implementations-9ffedf4e2479eb43e7fee57b45f0a09698cb1b0a7313f57371726b58f07afb52
16-
value: '{"args":["--executor_input","{{$}}","--function_to_execute","read_imported"],"command":["sh","-c","\nif
12+
value: '{"artifactUri":{"constant":"gs://ml-pipeline-playground/shakespeare1.txt"},"downloadToWorkspace":true,"metadata":{"key":"value"},"typeSchema":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}'
13+
- name: components-comp-importer-2
14+
value: '{"executorLabel":"exec-importer-2","inputDefinitions":{"parameters":{"uri":{"parameterType":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}}}'
15+
- name: implementations-comp-importer-2
16+
value: '{"artifactUri":{"runtimeParameter":"uri"},"downloadToWorkspace":true,"metadata":{"source":"directory"},"reimport":true,"typeSchema":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}'
17+
- name: components-fdb727929b2be77348f126a5dee529ea510c18494fff066636b7b47c809c2bd4
18+
value: '{"executorLabel":"exec-read-dir","inputDefinitions":{"artifacts":{"data":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}'
19+
- name: implementations-fdb727929b2be77348f126a5dee529ea510c18494fff066636b7b47c809c2bd4
20+
value: '{"args":["--executor_input","{{$}}","--function_to_execute","read_dir"],"command":["sh","-c","\nif
1721
! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3
1822
-m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
19-
python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.3'' ''--no-deps''
23+
python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps''
2024
''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026
2125
\"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\"
2226
\u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3
2327
-m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport
2428
kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef
25-
read_imported(data: dsl.Input[dsl.Dataset]) -\u003e str:\n with open(data.path,
26-
\"r\") as f:\n content = f.read()\n print(f\"Imported content length:
27-
{len(content)}\")\n return content\n\n"],"image":"python:3.9"}'
29+
read_dir(data: dsl.Input[dsl.Dataset]) -\u003e str:\n \"\"\"Walk the directory
30+
and return a summary of file names.\"\"\"\n import os\n path = data.path\n\n if
31+
not os.path.exists(path):\n print(f\"ERROR: Path does not exist: {path}\")\n return
32+
\"ERROR: Path not found\"\n\n if os.path.isdir(path):\n names =
33+
[]\n for root, _, files in os.walk(path):\n for name in
34+
files:\n names.append(os.path.relpath(os.path.join(root, name),
35+
path))\n names.sort()\n result = \",\".join(names) if names
36+
else \"EMPTY_DIRECTORY\"\n print(f\"Found {len(names)} files: {result}\")\n return
37+
result\n elif os.path.isfile(path):\n print(f\"Path is a single
38+
file: {path}\")\n return os.path.basename(path)\n\n return \"ERROR:
39+
Unknown path type\"\n\n"],"image":"python:3.9"}'
40+
- name: components-78537f2c409754d8fe50b9b2796fa1de4f71bce3a2f68459d83d895bfc7657c5
41+
value: '{"executorLabel":"exec-train","inputDefinitions":{"artifacts":{"dataset":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}},"outputDefinitions":{"parameters":{"message":{"parameterType":"STRING"},"scalar":{"parameterType":"STRING"}}}}'
42+
- name: implementations-78537f2c409754d8fe50b9b2796fa1de4f71bce3a2f68459d83d895bfc7657c5
43+
value: '{"args":["--executor_input","{{$}}","--function_to_execute","train"],"command":["sh","-c","\nif
44+
! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3
45+
-m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
46+
python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps''
47+
''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026
48+
\"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\"
49+
\u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3
50+
-m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport
51+
kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef
52+
train(\n dataset: dsl.Input[dsl.Dataset]\n) -\u003e NamedTuple(''Outputs'',
53+
[\n (''scalar'', str),\n (''message'', str),\n]):\n \"\"\"Dummy Training
54+
step.\"\"\"\n with open(dataset.path) as f:\n data = f.read()\n print(''Dataset:'',
55+
data)\n\n scalar = ''123''\n message = f''My model trained using data:
56+
{data}''\n\n from collections import namedtuple\n output = namedtuple(''Outputs'',
57+
[''scalar'', ''message''])\n return output(scalar, message)\n\n"],"image":"python:3.9"}'
2858
- name: components-root
29-
value: '{"dag":{"outputs":{"parameters":{"Output":{"valueFromParameter":{"outputParameterKey":"Output","producerSubtask":"read-imported"}}}},"tasks":{"importer":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constant":"minio://mlpipeline/sample/sample.txt"}}}},"taskInfo":{"name":"importer"}},"read-imported":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-imported"},"dependentTasks":["importer"],"inputs":{"artifacts":{"data":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer"}}}},"taskInfo":{"name":"read-imported"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}'
59+
value: '{"dag":{"outputs":{"parameters":{"dir_result":{"valueFromParameter":{"outputParameterKey":"Output","producerSubtask":"read-dir"}},"train_result":{"valueFromParameter":{"outputParameterKey":"scalar","producerSubtask":"train"}}}},"tasks":{"importer":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constant":"gs://ml-pipeline-playground/shakespeare1.txt"}}}},"taskInfo":{"name":"importer"}},"importer-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer-2"},"inputs":{"parameters":{"uri":{"componentInputParameter":"dataset_dir"}}},"taskInfo":{"name":"importer-2"}},"read-dir":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-dir"},"dependentTasks":["importer-2"],"inputs":{"artifacts":{"data":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer-2"}}}},"taskInfo":{"name":"read-dir"}},"train":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-train"},"dependentTasks":["importer"],"inputs":{"artifacts":{"dataset":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer"}}}},"taskInfo":{"name":"train"}}}},"inputDefinitions":{"parameters":{"dataset_dir":{"defaultValue":"gs://ml-pipeline-playground","isOptional":true,"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"dir_result":{"parameterType":"STRING"},"train_result":{"parameterType":"STRING"}}}}'
3060
entrypoint: entrypoint
3161
podMetadata:
3262
annotations:
@@ -93,7 +123,7 @@ spec:
93123
- name: importer
94124
- name: parent-dag-id
95125
metadata: {}
96-
name: system-importer
126+
name: system-importer-workspace
97127
outputs: {}
98128
volumes:
99129
- name: kfp-workspace
@@ -271,39 +301,76 @@ spec:
271301
- arguments:
272302
parameters:
273303
- name: task
274-
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constant":"minio://mlpipeline/sample/sample.txt"}}}},"taskInfo":{"name":"importer"}}'
304+
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"runtimeValue":{"constant":"gs://ml-pipeline-playground/shakespeare1.txt"}}}},"taskInfo":{"name":"importer"}}'
275305
- name: component
276306
value: '{{workflow.parameters.components-comp-importer}}'
277307
- name: importer
278308
value: '{{workflow.parameters.implementations-comp-importer}}'
279309
- name: parent-dag-id
280310
value: '{{inputs.parameters.parent-dag-id}}'
281311
name: importer
282-
template: system-importer
312+
template: system-importer-workspace
313+
- arguments:
314+
parameters:
315+
- name: task
316+
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer-2"},"inputs":{"parameters":{"uri":{"componentInputParameter":"dataset_dir"}}},"taskInfo":{"name":"importer-2"}}'
317+
- name: component
318+
value: '{{workflow.parameters.components-comp-importer-2}}'
319+
- name: importer
320+
value: '{{workflow.parameters.implementations-comp-importer-2}}'
321+
- name: parent-dag-id
322+
value: '{{inputs.parameters.parent-dag-id}}'
323+
name: importer-2
324+
template: system-importer-workspace
325+
- arguments:
326+
parameters:
327+
- name: component
328+
value: '{{workflow.parameters.components-fdb727929b2be77348f126a5dee529ea510c18494fff066636b7b47c809c2bd4}}'
329+
- name: task
330+
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-dir"},"dependentTasks":["importer-2"],"inputs":{"artifacts":{"data":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer-2"}}}},"taskInfo":{"name":"read-dir"}}'
331+
- name: container
332+
value: '{{workflow.parameters.implementations-fdb727929b2be77348f126a5dee529ea510c18494fff066636b7b47c809c2bd4}}'
333+
- name: task-name
334+
value: read-dir
335+
- name: parent-dag-id
336+
value: '{{inputs.parameters.parent-dag-id}}'
337+
depends: importer-2.Succeeded
338+
name: read-dir-driver
339+
template: system-container-driver
340+
- arguments:
341+
parameters:
342+
- name: pod-spec-patch
343+
value: '{{tasks.read-dir-driver.outputs.parameters.pod-spec-patch}}'
344+
- default: "false"
345+
name: cached-decision
346+
value: '{{tasks.read-dir-driver.outputs.parameters.cached-decision}}'
347+
depends: read-dir-driver.Succeeded
348+
name: read-dir
349+
template: system-container-executor
283350
- arguments:
284351
parameters:
285352
- name: component
286-
value: '{{workflow.parameters.components-9ffedf4e2479eb43e7fee57b45f0a09698cb1b0a7313f57371726b58f07afb52}}'
353+
value: '{{workflow.parameters.components-78537f2c409754d8fe50b9b2796fa1de4f71bce3a2f68459d83d895bfc7657c5}}'
287354
- name: task
288-
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-imported"},"dependentTasks":["importer"],"inputs":{"artifacts":{"data":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer"}}}},"taskInfo":{"name":"read-imported"}}'
355+
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-train"},"dependentTasks":["importer"],"inputs":{"artifacts":{"dataset":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer"}}}},"taskInfo":{"name":"train"}}'
289356
- name: container
290-
value: '{{workflow.parameters.implementations-9ffedf4e2479eb43e7fee57b45f0a09698cb1b0a7313f57371726b58f07afb52}}'
357+
value: '{{workflow.parameters.implementations-78537f2c409754d8fe50b9b2796fa1de4f71bce3a2f68459d83d895bfc7657c5}}'
291358
- name: task-name
292-
value: read-imported
359+
value: train
293360
- name: parent-dag-id
294361
value: '{{inputs.parameters.parent-dag-id}}'
295362
depends: importer.Succeeded
296-
name: read-imported-driver
363+
name: train-driver
297364
template: system-container-driver
298365
- arguments:
299366
parameters:
300367
- name: pod-spec-patch
301-
value: '{{tasks.read-imported-driver.outputs.parameters.pod-spec-patch}}'
368+
value: '{{tasks.train-driver.outputs.parameters.pod-spec-patch}}'
302369
- default: "false"
303370
name: cached-decision
304-
value: '{{tasks.read-imported-driver.outputs.parameters.cached-decision}}'
305-
depends: read-imported-driver.Succeeded
306-
name: read-imported
371+
value: '{{tasks.train-driver.outputs.parameters.cached-decision}}'
372+
depends: train-driver.Succeeded
373+
name: train
307374
template: system-container-executor
308375
inputs:
309376
parameters:
@@ -395,7 +462,7 @@ spec:
395462
- name: component
396463
value: '{{workflow.parameters.components-root}}'
397464
- name: runtime-config
398-
value: '{}'
465+
value: '{"parameterValues":{"dataset_dir":"gs://ml-pipeline-playground"}}'
399466
- name: driver-type
400467
value: ROOT_DAG
401468
name: root-driver
@@ -418,6 +485,8 @@ spec:
418485
creationTimestamp: null
419486
name: kfp-workspace
420487
spec:
488+
accessModes:
489+
- ReadWriteOnce
421490
resources:
422491
requests:
423492
storage: 1Gi

0 commit comments

Comments
 (0)