Skip to content

Commit e8f1e0e

Browse files
committed
feat(backend):implement file streaming to reduce memory usage
Signed-off-by: ddalvi <[email protected]>
1 parent 68c1dd7 commit e8f1e0e

File tree

6 files changed

+128
-176
lines changed

6 files changed

+128
-176
lines changed

backend/api/v1beta1/run.proto

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,6 @@ service RunService {
116116
};
117117
}
118118

119-
// Finds a run's artifact data.
120-
rpc ReadArtifactV1(ReadArtifactRequest) returns (ReadArtifactResponse) {
121-
option (google.api.http) = {
122-
get: "/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read"
123-
};
124-
}
125119

126120
// Terminates an active run.
127121
rpc TerminateRunV1(TerminateRunRequest) returns (google.protobuf.Empty) {

backend/src/apiserver/main.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,57 @@ func startHttpProxy(resourceManager *resource.ResourceManager, usePipelinesKuber
324324
runLogServer := server.NewRunLogServer(resourceManager)
325325
topMux.HandleFunc("/apis/v1alpha1/runs/{run_id}/nodes/{node_id}/log", runLogServer.ReadRunLogV1)
326326

327+
topMux.HandleFunc("/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", func(w http.ResponseWriter, r *http.Request) {
328+
vars := mux.Vars(r)
329+
330+
runId := vars["run_id"]
331+
nodeId := vars["node_id"]
332+
artifactName := vars["artifact_name"]
333+
334+
if runId == "" || nodeId == "" || artifactName == "" {
335+
http.Error(w, "Missing required parameters", http.StatusBadRequest)
336+
return
337+
}
338+
339+
run, err := resourceManager.GetRun(runId)
340+
if err != nil {
341+
http.Error(w, "Run not found", http.StatusNotFound)
342+
return
343+
}
344+
345+
if run.WorkflowRuntimeManifest == "" {
346+
http.Error(w, "V2 IR spec not supported", http.StatusBadRequest)
347+
return
348+
}
349+
350+
execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(run.WorkflowRuntimeManifest))
351+
if err != nil {
352+
http.Error(w, "Failed to parse workflow", http.StatusInternalServerError)
353+
return
354+
}
355+
356+
artifactPath := execSpec.ExecutionStatus().FindObjectStoreArtifactKeyOrEmpty(nodeId, artifactName)
357+
if artifactPath == "" {
358+
http.Error(w, "Artifact not found", http.StatusNotFound)
359+
return
360+
}
361+
362+
reader, err := resourceManager.GetObjectStore().GetFileReader(r.Context(), artifactPath)
363+
if err != nil {
364+
http.Error(w, "File not found", http.StatusNotFound)
365+
return
366+
}
367+
defer reader.Close()
368+
369+
w.Header().Set("Content-Type", "application/octet-stream")
370+
w.Header().Set("Cache-Control", "no-cache, private")
371+
372+
if _, err := io.Copy(w, reader); err != nil {
373+
glog.Errorf("Failed to stream artifact: %v", err)
374+
http.Error(w, "Internal server error", http.StatusInternalServerError)
375+
}
376+
})
377+
327378
topMux.PathPrefix("/apis/").Handler(runtimeMux)
328379

329380
// Register a handler for Prometheus to poll.

backend/src/apiserver/resource/resource_manager.go

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
package resource
1616

1717
import (
18+
"bufio"
1819
"context"
1920
"encoding/json"
21+
"errors"
2022
"fmt"
2123
"io"
2224
"net"
@@ -38,7 +40,7 @@ import (
3840
exec "github.com/kubeflow/pipelines/backend/src/common"
3941
"github.com/kubeflow/pipelines/backend/src/common/util"
4042
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
41-
"github.com/pkg/errors"
43+
pkgerrors "github.com/pkg/errors"
4244
"github.com/prometheus/client_golang/prometheus"
4345
"github.com/prometheus/client_golang/prometheus/promauto"
4446
"google.golang.org/grpc/codes"
@@ -129,6 +131,7 @@ type ResourceManager struct {
129131
}
130132

131133
func NewResourceManager(clientManager ClientManagerInterface, options *ResourceManagerOptions) *ResourceManager {
134+
k8sCoreClient := clientManager.KubernetesCoreClient()
132135
return &ResourceManager{
133136
experimentStore: clientManager.ExperimentStore(),
134137
pipelineStore: clientManager.PipelineStore(),
@@ -141,7 +144,7 @@ func NewResourceManager(clientManager ClientManagerInterface, options *ResourceM
141144
objectStore: clientManager.ObjectStore(),
142145
execClient: clientManager.ExecClient(),
143146
swfClient: clientManager.SwfClient(),
144-
k8sCoreClient: clientManager.KubernetesCoreClient(),
147+
k8sCoreClient: k8sCoreClient,
145148
subjectAccessReviewClient: clientManager.SubjectAccessReviewClient(),
146149
tokenReviewClient: clientManager.TokenReviewClient(),
147150
logArchive: clientManager.LogArchive(),
@@ -666,7 +669,7 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
666669
if err != nil {
667670
if apierrors.IsConflict(errors.Unwrap(err)) {
668671
continue
669-
} else if util.IsNotFound(errors.Cause(err)) {
672+
} else if util.IsNotFound(pkgerrors.Cause(err)) {
670673
break
671674
}
672675
return failedToReconcileSwfCrsError(err)
@@ -1543,13 +1546,13 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
15431546
} else {
15441547
// Try reading object store from pipeline_spec_uri
15451548
// nolint:staticcheck // [ST1003] Field name matches upstream legacy naming
1546-
template, errUri := r.objectStore.GetFile(context.TODO(), string(pipelineVersion.PipelineSpecURI))
1549+
template, errUri := r.readFileStreaming(context.TODO(), string(pipelineVersion.PipelineSpecURI))
15471550
if errUri != nil {
15481551
// Try reading object store from pipeline_version_id
1549-
template, errUUID := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
1552+
template, errUUID := r.readFileStreaming(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
15501553
if errUUID != nil {
15511554
// Try reading object store from pipeline_id
1552-
template, errPipelineId := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
1555+
template, errPipelineId := r.readFileStreaming(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
15531556
if errPipelineId != nil {
15541557
return nil, "", util.Wrap(
15551558
util.Wrap(
@@ -1567,6 +1570,36 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
15671570
}
15681571
}
15691572

1573+
func (r *ResourceManager) readFileStreaming(ctx context.Context, filePath string) ([]byte, error) {
1574+
reader, err := r.objectStore.GetFileReader(ctx, filePath)
1575+
if err != nil {
1576+
return nil, err
1577+
}
1578+
defer reader.Close()
1579+
1580+
return r.loadFileWithSizeLimit(reader, common.MaxFileLength)
1581+
}
1582+
1583+
func (r *ResourceManager) loadFileWithSizeLimit(fileReader io.Reader, maxFileLength int) ([]byte, error) {
1584+
reader := bufio.NewReaderSize(fileReader, maxFileLength)
1585+
var fileContent []byte
1586+
for {
1587+
currentRead := make([]byte, bufio.MaxScanTokenSize)
1588+
size, err := reader.Read(currentRead)
1589+
fileContent = append(fileContent, currentRead[:size]...)
1590+
if err == io.EOF {
1591+
break
1592+
}
1593+
if err != nil {
1594+
return nil, util.NewInternalServerError(err, "Error reading file from object store")
1595+
}
1596+
}
1597+
if len(fileContent) > maxFileLength {
1598+
return nil, util.NewInternalServerError(nil, "File size too large. Maximum supported size: %v", maxFileLength)
1599+
}
1600+
return fileContent, nil
1601+
}
1602+
15701603
// Creates the default experiment entry.
15711604
func (r *ResourceManager) CreateDefaultExperiment(namespace string) (string, error) {
15721605
// First check that we don't already have a default experiment ID in the DB.
@@ -1617,28 +1650,8 @@ func (r *ResourceManager) ReportMetric(metric *model.RunMetric) error {
16171650
return nil
16181651
}
16191652

1620-
// ReadArtifact parses run's workflow to find artifact file path and reads the content of the file
1621-
// from object store.
1622-
func (r *ResourceManager) ReadArtifact(runID string, nodeID string, artifactName string) ([]byte, error) {
1623-
run, err := r.runStore.GetRun(runID)
1624-
if err != nil {
1625-
return nil, err
1626-
}
1627-
if run.WorkflowRuntimeManifest == "" {
1628-
return nil, util.NewInvalidInputError("read artifact from run with v2 IR spec is not supported")
1629-
}
1630-
execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(run.WorkflowRuntimeManifest))
1631-
if err != nil {
1632-
// This should never happen.
1633-
return nil, util.NewInternalServerError(
1634-
err, "failed to unmarshal workflow '%s'", run.WorkflowRuntimeManifest)
1635-
}
1636-
artifactPath := execSpec.ExecutionStatus().FindObjectStoreArtifactKeyOrEmpty(nodeID, artifactName)
1637-
if artifactPath == "" {
1638-
return nil, util.NewResourceNotFoundError(
1639-
"artifact", common.CreateArtifactPath(runID, nodeID, artifactName))
1640-
}
1641-
return r.objectStore.GetFile(context.TODO(), artifactPath)
1653+
func (r *ResourceManager) GetObjectStore() storage.ObjectStoreInterface {
1654+
return r.objectStore
16421655
}
16431656

16441657
// Fetches the default experiment id.

backend/src/apiserver/resource/resource_manager_test.go

Lines changed: 5 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"encoding/json"
2020
"fmt"
21+
"io"
2122
"strings"
2223
"testing"
2324
"time"
@@ -67,6 +68,10 @@ func (m *FakeBadObjectStore) GetFile(ctx context.Context, filePath string) ([]by
6768
return []byte(""), nil
6869
}
6970

71+
func (m *FakeBadObjectStore) GetFileReader(ctx context.Context, filePath string) (io.ReadCloser, error) {
72+
return nil, util.NewInternalServerError(errors.New("Error"), "bad object store")
73+
}
74+
7075
func (m *FakeBadObjectStore) AddAsYamlFile(ctx context.Context, o interface{}, filePath string) error {
7176
return util.NewInternalServerError(errors.New("Error"), "bad object store")
7277
}
@@ -3301,104 +3306,6 @@ func TestReportScheduledWorkflowResource_Error(t *testing.T) {
33013306
assert.Contains(t, err.(*util.UserError).String(), "database is closed")
33023307
}
33033308

3304-
func TestReadArtifact_Succeed(t *testing.T) {
3305-
store, manager, job := initWithJob(t)
3306-
defer store.Close()
3307-
3308-
expectedContent := "test"
3309-
filePath := "test/file.txt"
3310-
store.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)
3311-
3312-
// Create a scheduled run
3313-
// job, _ := manager.CreateJob(model.Job{
3314-
// Name: "pp1",
3315-
// PipelineId: p.UUID,
3316-
// Enabled: true,
3317-
// })
3318-
workflow := util.NewWorkflow(&v1alpha1.Workflow{
3319-
TypeMeta: v1.TypeMeta{
3320-
APIVersion: "argoproj.io/v1alpha1",
3321-
Kind: "Workflow",
3322-
},
3323-
ObjectMeta: v1.ObjectMeta{
3324-
Name: "MY_NAME",
3325-
Namespace: "MY_NAMESPACE",
3326-
UID: "run-1",
3327-
Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"},
3328-
CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
3329-
OwnerReferences: []v1.OwnerReference{{
3330-
APIVersion: "kubeflow.org/v1beta1",
3331-
Kind: "ScheduledWorkflow",
3332-
Name: "SCHEDULE_NAME",
3333-
UID: types.UID(job.UUID),
3334-
}},
3335-
},
3336-
Status: v1alpha1.WorkflowStatus{
3337-
Nodes: map[string]v1alpha1.NodeStatus{
3338-
"node-1": {
3339-
Outputs: &v1alpha1.Outputs{
3340-
Artifacts: []v1alpha1.Artifact{
3341-
{
3342-
Name: "artifact-1",
3343-
ArtifactLocation: v1alpha1.ArtifactLocation{
3344-
S3: &v1alpha1.S3Artifact{
3345-
Key: filePath,
3346-
},
3347-
},
3348-
},
3349-
},
3350-
},
3351-
},
3352-
},
3353-
},
3354-
})
3355-
_, err := manager.ReportWorkflowResource(context.Background(), workflow)
3356-
assert.Nil(t, err)
3357-
3358-
artifactContent, err := manager.ReadArtifact("run-1", "node-1", "artifact-1")
3359-
assert.Nil(t, err)
3360-
assert.Equal(t, expectedContent, string(artifactContent))
3361-
}
3362-
3363-
func TestReadArtifact_WorkflowNoStatus_NotFound(t *testing.T) {
3364-
store, manager, job := initWithJob(t)
3365-
defer store.Close()
3366-
// report workflow
3367-
workflow := util.NewWorkflow(&v1alpha1.Workflow{
3368-
TypeMeta: v1.TypeMeta{
3369-
APIVersion: "argoproj.io/v1alpha1",
3370-
Kind: "Workflow",
3371-
},
3372-
ObjectMeta: v1.ObjectMeta{
3373-
Name: "MY_NAME",
3374-
Namespace: "MY_NAMESPACE",
3375-
UID: "run-1",
3376-
Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"},
3377-
CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
3378-
OwnerReferences: []v1.OwnerReference{{
3379-
APIVersion: "kubeflow.org/v1beta1",
3380-
Kind: "ScheduledWorkflow",
3381-
Name: "SCHEDULE_NAME",
3382-
UID: types.UID(job.UUID),
3383-
}},
3384-
},
3385-
})
3386-
_, err := manager.ReportWorkflowResource(context.Background(), workflow)
3387-
assert.Nil(t, err)
3388-
3389-
_, err = manager.ReadArtifact("run-1", "node-1", "artifact-1")
3390-
assert.True(t, util.IsUserErrorCodeMatch(err, codes.NotFound))
3391-
}
3392-
3393-
func TestReadArtifact_NoRun_NotFound(t *testing.T) {
3394-
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
3395-
defer store.Close()
3396-
manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})
3397-
3398-
_, err := manager.ReadArtifact("run-1", "node-1", "artifact-1")
3399-
assert.True(t, util.IsUserErrorCodeMatch(err, codes.NotFound))
3400-
}
3401-
34023309
const (
34033310
v2compatPipeline = `
34043311
apiVersion: argoproj.io/v1alpha1

backend/src/apiserver/server/run_server.go

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -460,28 +460,6 @@ func (s *RunServerV1) ReportRunMetricsV1(ctx context.Context, request *apiv1beta
460460
return &apiv1beta1.ReportRunMetricsResponse{Results: apiResults}, nil
461461
}
462462

463-
// Reads an artifact.
464-
// Supports v1beta1 behavior.
465-
func (s *RunServerV1) ReadArtifactV1(ctx context.Context, request *apiv1beta1.ReadArtifactRequest) (*apiv1beta1.ReadArtifactResponse, error) {
466-
if s.options.CollectMetrics {
467-
readArtifactRequests.Inc()
468-
}
469-
470-
err := s.canAccessRun(ctx, request.RunId, &authorizationv1.ResourceAttributes{Verb: common.RbacResourceVerbReadArtifact})
471-
if err != nil {
472-
return nil, util.Wrap(err, "Failed to authorize the request")
473-
}
474-
475-
content, err := s.resourceManager.ReadArtifact(
476-
request.GetRunId(), request.GetNodeId(), request.GetArtifactName())
477-
if err != nil {
478-
return nil, util.Wrapf(err, "failed to read artifact '%+v'", request)
479-
}
480-
return &apiv1beta1.ReadArtifactResponse{
481-
Data: content,
482-
}, nil
483-
}
484-
485463
// Terminates a run.
486464
// Applies common logic on v1beta1 and v2beta1 API.
487465
func (s *BaseRunServer) terminateRun(ctx context.Context, runId string) error {
@@ -626,28 +604,6 @@ func (s *RunServer) DeleteRun(ctx context.Context, request *apiv2beta1.DeleteRun
626604
return &emptypb.Empty{}, nil
627605
}
628606

629-
// Reads an artifact.
630-
// Supports v2beta1 behavior.
631-
func (s *RunServer) ReadArtifact(ctx context.Context, request *apiv2beta1.ReadArtifactRequest) (*apiv2beta1.ReadArtifactResponse, error) {
632-
if s.options.CollectMetrics {
633-
readArtifactRequests.Inc()
634-
}
635-
636-
err := s.canAccessRun(ctx, request.GetRunId(), &authorizationv1.ResourceAttributes{Verb: common.RbacResourceVerbReadArtifact})
637-
if err != nil {
638-
return nil, util.Wrap(err, "Failed to authorize the request")
639-
}
640-
641-
content, err := s.resourceManager.ReadArtifact(
642-
request.GetRunId(), request.GetNodeId(), request.GetArtifactName())
643-
if err != nil {
644-
return nil, util.Wrapf(err, "failed to read artifact '%+v'", request)
645-
}
646-
return &apiv2beta1.ReadArtifactResponse{
647-
Data: content,
648-
}, nil
649-
}
650-
651607
// Terminates a run.
652608
// Supports v2beta1 behavior.
653609
func (s *RunServer) TerminateRun(ctx context.Context, request *apiv2beta1.TerminateRunRequest) (*emptypb.Empty, error) {

0 commit comments

Comments
 (0)