-
Couldn't load subscription status.
- Fork 1.8k
feat(backend/sdk): Add download_to_workspace option to dsl.importer #12353
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat(backend/sdk): Add download_to_workspace option to dsl.importer #12353
Conversation
|
Skipping CI for Draft Pull Request. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by: — The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
be5f8f5 to
9866919
Compare
9866919 to
60212b1
Compare
| return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err) | ||
| } | ||
| defer bucket.Close() | ||
| if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the workspace paths are quite correct. This is more in line with the design of having the same path but just under /kfp-workspace/.artifacts:
diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go
index 7c11f9fbd..78e8a6d03 100644
--- a/backend/src/v2/component/importer_launcher.go
+++ b/backend/src/v2/component/importer_launcher.go
@@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
- "os"
- "path/filepath"
"strings"
"github.com/kubeflow/pipelines/backend/src/common/util"
@@ -310,20 +308,22 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
bucketConfig.SessionInfo = &sess
}
}
+
+ localPath, err := LocalWorkspacePathForURI(artifactUri)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get local path for uri %q: %w", artifactUri, err)
+ }
+
blobKey, err := bucketConfig.KeyFromURI(artifactUri)
if err != nil {
return nil, fmt.Errorf("failed to derive blob key from uri %q while downloading artifact into workspace: %w", artifactUri, err)
}
- workspaceRoot := filepath.Join(WorkspaceMountPath, ".artifacts")
- if err := os.MkdirAll(workspaceRoot, 0755); err != nil {
- return nil, fmt.Errorf("failed to create workspace directory %q: %w", workspaceRoot, err)
- }
bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.launcherV2Options.Namespace, bucketConfig)
if err != nil {
return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
}
defer bucket.Close()
- if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
+ if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
return nil, fmt.Errorf("failed to download artifact to workspace: %w", err)
}
}
diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go
index 95caf375b..ac670ca2d 100644
--- a/backend/src/v2/component/launcher_v2.go
+++ b/backend/src/v2/component/launcher_v2.go
@@ -865,16 +865,13 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
// If the artifact is marked as already in the workspace, map the workspace path.
if inputArtifact.GetMetadata() != nil {
if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
- bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(inputArtifact.Uri)
- if err == nil {
- blobKey, err := bucketConfig.KeyFromURI(inputArtifact.Uri)
- if err == nil {
- localPath := filepath.Join(WorkspaceMountPath, ".artifacts", blobKey)
- key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
- placeholders[key] = localPath
- continue
- }
+ localPath, err := LocalWorkspacePathForURI(inputArtifact.Uri)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get local workspace path for input artifact %q: %w", name, err)
}
+ key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
+ placeholders[key] = localPath
+ continue
}
}
diff --git a/sdk/python/kfp/dsl/types/artifact_types.py b/sdk/python/kfp/dsl/types/artifact_types.py
index ae737b483..39ae42adb 100644
--- a/sdk/python/kfp/dsl/types/artifact_types.py
+++ b/sdk/python/kfp/dsl/types/artifact_types.py
@@ -94,39 +94,26 @@ class Artifact:
self._set_path(path)
def _get_path(self) -> Optional[str]:
- # If the artifact is already present in the pipeline workspace, map to the workspace path.
- # This is indicated by backend setting metadata['_kfp_workspace'] = True.
- if self.metadata.get('_kfp_workspace') is True:
- uri = self.uri or ''
- for prefix in (RemotePrefix.GCS.value, RemotePrefix.MINIO.value,
- RemotePrefix.S3.value):
- if uri.startswith(prefix):
- # Derive the object key relative to the bucket:
- # "<bucket>/<key>" -> blob_key == "<key>"
- without_scheme = uri[len(prefix):]
- parts = without_scheme.split('/', 1)
- blob_key = parts[1] if len(parts) == 2 else ''
- if blob_key:
- return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts',
- blob_key)
-
- return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts')
+ local_path = self.uri
if self.uri.startswith(RemotePrefix.GCS.value):
- return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
- ):]
- if self.uri.startswith(RemotePrefix.MINIO.value):
- return _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO
- .value):]
- if self.uri.startswith(RemotePrefix.S3.value):
- return _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value
- ):]
- if self.uri.startswith(RemotePrefix.OCI.value):
+ local_path = _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value):]
+ elif self.uri.startswith(RemotePrefix.MINIO.value):
+ local_path = _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO.value):]
+ elif self.uri.startswith(RemotePrefix.S3.value):
+ local_path = _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value):]
+ elif self.uri.startswith(RemotePrefix.OCI.value):
escaped_uri = self.uri[len(RemotePrefix.OCI.value):].replace(
'/', '_')
- return _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
+ local_path = _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
+
+ # If the artifact is already present in the pipeline workspace, map to the workspace path.
+ # This is indicated by backend setting metadata['_kfp_workspace'] = True.
+ if self.metadata.get('_kfp_workspace') is True:
+ local_path = os.path.join(WORKSPACE_MOUNT_PATH, ".artifacts", local_path.lstrip("/"))
+
# uri == path for local execution
- return self.uri
+ return local_path
def _set_path(self, path: str) -> None:
self.uri = convert_local_path_to_remote_path(path)
| } | ||
| // Add workspace volume only if the workflow defines a workspace PVC | ||
| hasWorkspacePVC := false | ||
| for _, pvc := range c.wf.Spec.VolumeClaimTemplates { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This always mounts the PVC for all importers if the pipeline uses a workspace. Let's make this more conditional to avoid unnecessary PVC mounting which can cause pod scheduling issues. Something like this:
diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go
index 73e3efc8b..c57193222 100644
--- a/backend/src/v2/compiler/argocompiler/dag.go
+++ b/backend/src/v2/compiler/argocompiler/dag.go
@@ -326,7 +326,8 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
// it's impossible to add a when condition based on driver outputs.
return nil, fmt.Errorf("triggerPolicy.condition on importer task is not supported")
}
- importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID)
+
+ importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID, e.Importer.GetDownloadToWorkspace())
if err != nil {
return nil, err
}
diff --git a/backend/src/v2/compiler/argocompiler/importer.go b/backend/src/v2/compiler/argocompiler/importer.go
index 5c509bf4e..096481ee6 100644
--- a/backend/src/v2/compiler/argocompiler/importer.go
+++ b/backend/src/v2/compiler/argocompiler/importer.go
@@ -32,7 +32,7 @@ func (c *workflowCompiler) Importer(name string, componentSpec *pipelinespec.Com
return c.saveComponentImpl(name, importer)
}
-func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string) (*wfapi.DAGTask, error) {
+func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string, downloadToWorkspace bool) (*wfapi.DAGTask, error) {
componentPlaceholder, err := c.useComponentSpec(task.GetComponentRef().GetName())
if err != nil {
return nil, err
@@ -43,7 +43,7 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
}
return &wfapi.DAGTask{
Name: name,
- Template: c.addImporterTemplate(),
+ Template: c.addImporterTemplate(downloadToWorkspace),
Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{{
Name: paramTask,
Value: wfapi.AnyStringPtr(taskJSON),
@@ -60,11 +60,16 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
}, nil
}
-func (c *workflowCompiler) addImporterTemplate() string {
+func (c *workflowCompiler) addImporterTemplate(downloadToWorkspace bool) string {
name := "system-importer"
+ if downloadToWorkspace {
+ name += "-workspace"
+ }
+
if _, alreadyExists := c.templates[name]; alreadyExists {
return name
}
+
args := []string{
"--executor_type", "importer",
"--task_spec", inputValue(paramTask),
@@ -91,18 +96,10 @@ func (c *workflowCompiler) addImporterTemplate() string {
if value, ok := os.LookupEnv(PublishLogsEnvVar); ok {
args = append(args, "--publish_logs", value)
}
- // Add workspace volume only if the workflow defines a workspace PVC
- hasWorkspacePVC := false
- for _, pvc := range c.wf.Spec.VolumeClaimTemplates {
- if pvc.Name == workspaceVolumeName {
- hasWorkspacePVC = true
- break
- }
- }
var volumeMounts []k8score.VolumeMount
var volumes []k8score.Volume
- if hasWorkspacePVC {
+ if downloadToWorkspace {
volumeMounts = append(volumeMounts, k8score.VolumeMount{
Name: workspaceVolumeName,
MountPath: component.WorkspaceMountPath,
| bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifactUri) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to parse bucket config while downloading artifact into workspace with uri %q: %w", artifactUri, err) | ||
| } | ||
| // Resolve and attach session info from kfp-launcher config for the artifact provider | ||
| if cfg, cfgErr := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); cfgErr != nil { | ||
| glog.Warningf("failed to load launcher config for workspace download: %v", cfgErr) | ||
| } else if cfg != nil { | ||
| if sess, sessErr := cfg.GetStoreSessionInfo(artifactUri); sessErr != nil { | ||
| glog.Warningf("failed to resolve store session info for %q: %v", artifactUri, sessErr) | ||
| } else { | ||
| bucketConfig.SessionInfo = &sess | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's avoid duplicating this code and move the code from the Execute method to a helper function.
| return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err) | ||
| } | ||
| defer bucket.Close() | ||
| if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also log a message indicating the artifact is being downloaded?
| ) | ||
| def pipeline_with_importer_workspace() -> str: | ||
| ds = importer( | ||
| artifact_uri='minio://mlpipeline/sample/sample.txt', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a test case that downloads a directory as well.
c92e63f to
9eaea2c
Compare
e367ac3 to
e034cc7
Compare
Signed-off-by: VaniHaripriya <[email protected]>
Signed-off-by: VaniHaripriya <[email protected]>
Signed-off-by: VaniHaripriya <[email protected]>
Signed-off-by: VaniHaripriya <[email protected]>
9c1df93 to
9f12059
Compare
Description of your changes:
Resolves #12352.
This PR adds download_to_workspace option to dsl.importer to download artifacts into the pipeline workspace and consume them downstream without re-downloading.
Checklist: