Skip to content

Commit 829bd3b

Browse files
committed
issue-12432 - cleanups
Signed-off-by: Helber Belmiro <[email protected]>
1 parent 6ff9a0a commit 829bd3b

File tree

6 files changed

+68
-55
lines changed

6 files changed

+68
-55
lines changed

backend/src/agent/persistence/client/pipeline_client.go

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"strings"
2424
"time"
2525

26+
"github.com/golang/glog"
2627
"google.golang.org/grpc/metadata"
2728

2829
api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
@@ -39,7 +40,7 @@ const (
3940
type PipelineClientInterface interface {
4041
ReportWorkflow(workflow util.ExecutionSpec) error
4142
ReportScheduledWorkflow(swf *util.ScheduledWorkflow) error
42-
ReadArtifactForMetrics(request *util.ArtifactRequest) (*util.ArtifactResponse, error)
43+
ReadArtifact(request *util.ReadArtifactRequest) (*util.ReadArtifactResponse, error)
4344
ReportRunMetrics(request *api.ReportRunMetricsRequest) (*api.ReportRunMetricsResponse, error)
4445
}
4546

@@ -122,7 +123,10 @@ func (p *PipelineClient) ReportWorkflow(workflow util.ExecutionSpec) error {
122123
workflow.ToStringForStore())
123124
} else if statusCode.Code() == codes.Unauthenticated && strings.Contains(err.Error(), "service account token has expired") {
124125
// If unauthenticated because SA token is expired, re-read/refresh the token and try again
125-
p.tokenRefresher.RefreshToken()
126+
if refreshErr := p.tokenRefresher.RefreshToken(); refreshErr != nil {
127+
return util.NewCustomError(refreshErr, util.CUSTOM_CODE_PERMANENT,
128+
"Failed to refresh token: %v", refreshErr.Error())
129+
}
126130
return util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT,
127131
"Error while reporting workflow resource (code: %v, message: %v): %v, %+v",
128132
statusCode.Code(),
@@ -167,7 +171,10 @@ func (p *PipelineClient) ReportScheduledWorkflow(swf *util.ScheduledWorkflow) er
167171
swf.ScheduledWorkflow)
168172
} else if statusCode.Code() == codes.Unauthenticated && strings.Contains(err.Error(), "service account token has expired") {
169173
// If unauthenticated because SA token is expired, re-read/refresh the token and try again
170-
p.tokenRefresher.RefreshToken()
174+
if refreshErr := p.tokenRefresher.RefreshToken(); refreshErr != nil {
175+
return util.NewCustomError(refreshErr, util.CUSTOM_CODE_PERMANENT,
176+
"Failed to refresh token: %v", refreshErr.Error())
177+
}
171178
return util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT,
172179
"Error while reporting workflow resource (code: %v, message: %v): %v, %+v",
173180
statusCode.Code(),
@@ -187,15 +194,17 @@ func (p *PipelineClient) ReportScheduledWorkflow(swf *util.ScheduledWorkflow) er
187194
return nil
188195
}
189196

190-
// ReadArtifactForMetrics reads artifact content using the new util.ArtifactRequest/Response types.
191-
// This method is used by the metrics collection system.
192-
func (p *PipelineClient) ReadArtifactForMetrics(request *util.ArtifactRequest) (*util.ArtifactResponse, error) {
193-
// Construct the HTTP streaming endpoint URL
194-
// Format: /apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:stream
197+
// ReadArtifact reads artifact content using HTTP streaming.
198+
//
199+
// Error Handling:
200+
// - Returns nil for artifacts that don't exist (HTTP 404)
201+
// - Returns CUSTOM_CODE_PERMANENT for client errors (400, 403) and unexpected failures
202+
// - Returns CUSTOM_CODE_TRANSIENT for retryable errors (401, 500, network issues)
203+
// - Automatically refreshes tokens on expiry; callers should retry transient errors
204+
func (p *PipelineClient) ReadArtifact(request *util.ReadArtifactRequest) (*util.ReadArtifactResponse, error) {
195205
url := fmt.Sprintf("%s/apis/v1beta1/runs/%s/nodes/%s/artifacts/%s:stream",
196206
p.httpBaseURL, request.RunID, request.NodeID, request.ArtifactName)
197207

198-
// Create HTTP request with timeout
199208
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
200209
defer cancel()
201210

@@ -205,61 +214,65 @@ func (p *PipelineClient) ReadArtifactForMetrics(request *util.ArtifactRequest) (
205214
"Failed to create HTTP request: %v", err.Error())
206215
}
207216

208-
// Add authorization header
209217
req.Header.Set("Authorization", "Bearer "+p.tokenRefresher.GetToken())
210218

211-
// Make the HTTP request
212219
resp, err := p.httpClient.Do(req)
213220
if err != nil {
214221
if strings.Contains(err.Error(), "service account token has expired") {
215-
// If unauthenticated because SA token is expired, re-read/refresh the token and try again
216-
p.tokenRefresher.RefreshToken()
222+
// If unauthenticated because SA token is expired, refresh the token and the caller should retry
223+
if refreshErr := p.tokenRefresher.RefreshToken(); refreshErr != nil {
224+
return nil, util.NewCustomError(refreshErr, util.CUSTOM_CODE_PERMANENT,
225+
"Failed to refresh token: %v", refreshErr.Error())
226+
}
217227
return nil, util.NewCustomError(err, util.CUSTOM_CODE_TRANSIENT,
218228
"Error while reading artifact due to token expiry: %v", err.Error())
219229
}
220230
return nil, util.NewCustomError(err, util.CUSTOM_CODE_PERMANENT,
221231
"Failed to make HTTP request: %v", err.Error())
222232
}
223-
defer resp.Body.Close()
233+
defer func() {
234+
if closeErr := resp.Body.Close(); closeErr != nil {
235+
glog.Warningf("Failed to close response body: %v", closeErr)
236+
}
237+
}()
224238

225-
// Handle HTTP status codes
226239
switch resp.StatusCode {
227240
case http.StatusOK:
228-
// Success case - read the artifact data
229241
data, err := io.ReadAll(resp.Body)
230242
if err != nil {
231243
return nil, util.NewCustomError(err, util.CUSTOM_CODE_PERMANENT,
232244
"Failed to read artifact data: %v", err.Error())
233245
}
234-
return &util.ArtifactResponse{Data: data}, nil
246+
return &util.ReadArtifactResponse{Data: data}, nil
235247

236248
case http.StatusNotFound:
237-
// Artifact not found - return nil as per original behavior
238249
return nil, nil
239250

240251
case http.StatusUnauthorized:
241252
// Unauthorized - refresh token and return transient error
242-
p.tokenRefresher.RefreshToken()
253+
if refreshErr := p.tokenRefresher.RefreshToken(); refreshErr != nil {
254+
if closeErr := resp.Body.Close(); closeErr != nil {
255+
glog.Warningf("Failed to close response body: %v", closeErr)
256+
}
257+
return nil, util.NewCustomError(refreshErr, util.CUSTOM_CODE_PERMANENT,
258+
"Failed to refresh token: %v", refreshErr.Error())
259+
}
243260
return nil, util.NewCustomError(fmt.Errorf("HTTP 401"), util.CUSTOM_CODE_TRANSIENT,
244261
"Failed to read artifact, unauthorized (token may have expired)")
245262

246263
case http.StatusForbidden:
247-
// Forbidden - return permanent error
248264
return nil, util.NewCustomError(fmt.Errorf("HTTP 403"), util.CUSTOM_CODE_PERMANENT,
249265
"Failed to read artifact, forbidden")
250266

251267
case http.StatusBadRequest:
252-
// Bad request - return permanent error
253268
return nil, util.NewCustomError(fmt.Errorf("HTTP 400"), util.CUSTOM_CODE_PERMANENT,
254269
"Failed to read artifact, bad request")
255270

256271
case http.StatusInternalServerError:
257-
// Internal server error - return transient error
258272
return nil, util.NewCustomError(fmt.Errorf("HTTP 500"), util.CUSTOM_CODE_TRANSIENT,
259273
"Failed to read artifact, internal server error")
260274

261275
default:
262-
// Other status codes - return permanent error
263276
return nil, util.NewCustomError(fmt.Errorf("HTTP %d", resp.StatusCode), util.CUSTOM_CODE_PERMANENT,
264277
"Failed to read artifact, HTTP status: %d", resp.StatusCode)
265278
}

backend/src/agent/persistence/client/pipeline_client_fake.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ type PipelineClientFake struct {
2424
workflows map[string]util.ExecutionSpec
2525
scheduledWorkflows map[string]*util.ScheduledWorkflow
2626
err error
27-
artifacts map[string]*util.ArtifactResponse
28-
readArtifactRequest *util.ArtifactRequest
27+
artifacts map[string]*util.ReadArtifactResponse
28+
readArtifactRequest *util.ReadArtifactRequest
2929
reportedMetricsRequest *api.ReportRunMetricsRequest
3030
reportMetricsResponseStub *api.ReportRunMetricsResponse
3131
reportMetricsErrorStub error
@@ -36,7 +36,7 @@ func NewPipelineClientFake() *PipelineClientFake {
3636
workflows: make(map[string]util.ExecutionSpec),
3737
scheduledWorkflows: make(map[string]*util.ScheduledWorkflow),
3838
err: nil,
39-
artifacts: make(map[string]*util.ArtifactResponse),
39+
artifacts: make(map[string]*util.ReadArtifactResponse),
4040
reportMetricsResponseStub: &api.ReportRunMetricsResponse{},
4141
}
4242
}
@@ -57,7 +57,7 @@ func (p *PipelineClientFake) ReportScheduledWorkflow(swf *util.ScheduledWorkflow
5757
return nil
5858
}
5959

60-
func (p *PipelineClientFake) ReadArtifactForMetrics(request *util.ArtifactRequest) (*util.ArtifactResponse, error) {
60+
func (p *PipelineClientFake) ReadArtifact(request *util.ReadArtifactRequest) (*util.ReadArtifactResponse, error) {
6161
if p.err != nil {
6262
return nil, p.err
6363
}
@@ -82,11 +82,11 @@ func (p *PipelineClientFake) GetScheduledWorkflow(namespace string, name string)
8282
return p.scheduledWorkflows[getKey(namespace, name)]
8383
}
8484

85-
func (p *PipelineClientFake) StubArtifact(request *util.ArtifactRequest, response *util.ArtifactResponse) {
85+
func (p *PipelineClientFake) StubArtifact(request *util.ReadArtifactRequest, response *util.ReadArtifactResponse) {
8686
p.artifacts[request.String()] = response
8787
}
8888

89-
func (p *PipelineClientFake) GetReadArtifactRequest() *util.ArtifactRequest {
89+
func (p *PipelineClientFake) GetReadArtifactRequest() *util.ReadArtifactRequest {
9090
return p.readArtifactRequest
9191
}
9292

backend/src/agent/persistence/worker/metrics_reporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (r MetricsReporter) ReportMetrics(workflow util.ExecutionSpec) error {
5252
// Skip reporting if the workflow doesn't have the run id label
5353
return nil
5454
}
55-
runMetrics, partialFailures := workflow.ExecutionStatus().CollectionMetrics(r.pipelineClient.ReadArtifactForMetrics)
55+
runMetrics, partialFailures := workflow.ExecutionStatus().CollectionMetrics(r.pipelineClient.ReadArtifact)
5656
if len(runMetrics) == 0 {
5757
return aggregateErrors(partialFailures)
5858
}

backend/src/agent/persistence/worker/metrics_reporter_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,12 @@ func TestReportMetrics_Succeed(t *testing.T) {
170170
metricsJSON := `{"metrics": [{"name": "accuracy", "numberValue": 0.77}, {"name": "logloss", "numberValue": 1.2}]}`
171171
artifactData, _ := util.ArchiveTgz(map[string]string{"file": metricsJSON})
172172
pipelineFake.StubArtifact(
173-
&util.ArtifactRequest{
173+
&util.ReadArtifactRequest{
174174
RunID: "run-1",
175175
NodeID: "node-1",
176176
ArtifactName: "mlpipeline-metrics",
177177
},
178-
&util.ArtifactResponse{
178+
&util.ReadArtifactResponse{
179179
Data: []byte(artifactData),
180180
})
181181
pipelineFake.StubReportRunMetrics(&api.ReportRunMetricsResponse{
@@ -233,12 +233,12 @@ func TestReportMetrics_EmptyArchive_Fail(t *testing.T) {
233233
})
234234
artifactData, _ := util.ArchiveTgz(map[string]string{})
235235
pipelineFake.StubArtifact(
236-
&util.ArtifactRequest{
236+
&util.ReadArtifactRequest{
237237
RunID: "run-1",
238238
NodeID: "node-1",
239239
ArtifactName: "mlpipeline-metrics",
240240
},
241-
&util.ArtifactResponse{
241+
&util.ReadArtifactResponse{
242242
Data: []byte(artifactData),
243243
})
244244

@@ -277,12 +277,12 @@ func TestReportMetrics_MultipleFilesInArchive_Fail(t *testing.T) {
277277
invalidMetricsJSON := `invalid JSON`
278278
artifactData, _ := util.ArchiveTgz(map[string]string{"file1": validMetricsJSON, "file2": invalidMetricsJSON})
279279
pipelineFake.StubArtifact(
280-
&util.ArtifactRequest{
280+
&util.ReadArtifactRequest{
281281
RunID: "run-1",
282282
NodeID: "node-1",
283283
ArtifactName: "mlpipeline-metrics",
284284
},
285-
&util.ArtifactResponse{
285+
&util.ReadArtifactResponse{
286286
Data: []byte(artifactData),
287287
})
288288

@@ -320,12 +320,12 @@ func TestReportMetrics_InvalidMetricsJSON_Fail(t *testing.T) {
320320
metricsJSON := `invalid JSON`
321321
artifactData, _ := util.ArchiveTgz(map[string]string{"file": metricsJSON})
322322
pipelineFake.StubArtifact(
323-
&util.ArtifactRequest{
323+
&util.ReadArtifactRequest{
324324
RunID: "run-1",
325325
NodeID: "node-1",
326326
ArtifactName: "mlpipeline-metrics",
327327
},
328-
&util.ArtifactResponse{
328+
&util.ReadArtifactResponse{
329329
Data: []byte(artifactData),
330330
})
331331

@@ -374,21 +374,21 @@ func TestReportMetrics_InvalidMetricsJSON_PartialFail(t *testing.T) {
374374
invalidArtifactData, _ := util.ArchiveTgz(map[string]string{"file": invalidMetricsJSON})
375375
// Stub two artifacts, node-1 is invalid, node-2 is valid.
376376
pipelineFake.StubArtifact(
377-
&util.ArtifactRequest{
377+
&util.ReadArtifactRequest{
378378
RunID: "run-1",
379379
NodeID: "node-1",
380380
ArtifactName: "mlpipeline-metrics",
381381
},
382-
&util.ArtifactResponse{
382+
&util.ReadArtifactResponse{
383383
Data: []byte(invalidArtifactData),
384384
})
385385
pipelineFake.StubArtifact(
386-
&util.ArtifactRequest{
386+
&util.ReadArtifactRequest{
387387
RunID: "run-1",
388388
NodeID: "node-2",
389389
ArtifactName: "mlpipeline-metrics",
390390
},
391-
&util.ArtifactResponse{
391+
&util.ReadArtifactResponse{
392392
Data: []byte(validArtifactData),
393393
})
394394

@@ -444,12 +444,12 @@ func TestReportMetrics_CorruptedArchiveFile_Fail(t *testing.T) {
444444
},
445445
})
446446
pipelineFake.StubArtifact(
447-
&util.ArtifactRequest{
447+
&util.ReadArtifactRequest{
448448
RunID: "run-1",
449449
NodeID: "node-1",
450450
ArtifactName: "mlpipeline-metrics",
451451
},
452-
&util.ArtifactResponse{
452+
&util.ReadArtifactResponse{
453453
Data: []byte("invalid tgz content"),
454454
})
455455

@@ -488,12 +488,12 @@ func TestReportMetrics_MultiplMetricErrors_TransientErrowWin(t *testing.T) {
488488
`{"metrics": [{"name": "accuracy", "numberValue": 0.77}, {"name": "log loss", "numberValue": 1.2}, {"name": "accuracy", "numberValue": 1.2}]}`
489489
artifactData, _ := util.ArchiveTgz(map[string]string{"file": metricsJSON})
490490
pipelineFake.StubArtifact(
491-
&util.ArtifactRequest{
491+
&util.ReadArtifactRequest{
492492
RunID: "run-1",
493493
NodeID: "node-1",
494494
ArtifactName: "mlpipeline-metrics",
495495
},
496-
&util.ArtifactResponse{
496+
&util.ReadArtifactResponse{
497497
Data: []byte(artifactData),
498498
})
499499
pipelineFake.StubReportRunMetrics(&api.ReportRunMetricsResponse{
@@ -551,12 +551,12 @@ func TestReportMetrics_Unauthorized(t *testing.T) {
551551
metricsJSON := `{"metrics": [{"name": "accuracy", "numberValue": 0.77}, {"name": "logloss", "numberValue": 1.2}]}`
552552
artifactData, _ := util.ArchiveTgz(map[string]string{"file": metricsJSON})
553553
pipelineFake.StubArtifact(
554-
&util.ArtifactRequest{
554+
&util.ReadArtifactRequest{
555555
RunID: "run-1",
556556
NodeID: "node-1",
557557
ArtifactName: "mlpipeline-metrics",
558558
},
559-
&util.ArtifactResponse{
559+
&util.ReadArtifactResponse{
560560
Data: []byte(artifactData),
561561
})
562562
pipelineFake.StubReportRunMetrics(&api.ReportRunMetricsResponse{

backend/src/common/util/execution_status.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,24 @@ type NodeStatus struct {
3131
Children []string
3232
}
3333

34-
// ArtifactRequest is a simple artifact request struct to replace the removed protobuf types
35-
type ArtifactRequest struct {
34+
// ReadArtifactRequest is a simple artifact request struct to replace the removed protobuf types
35+
type ReadArtifactRequest struct {
3636
RunID string
3737
NodeID string
3838
ArtifactName string
3939
}
4040

4141
// String returns a string representation for use as a map key
42-
func (r *ArtifactRequest) String() string {
42+
func (r *ReadArtifactRequest) String() string {
4343
return r.RunID + "/" + r.NodeID + "/" + r.ArtifactName
4444
}
4545

46-
// ArtifactResponse is a simple artifact response struct to replace the removed protobuf types
47-
type ArtifactResponse struct {
46+
// ReadArtifactResponse is a simple artifact response struct to replace the removed protobuf types
47+
type ReadArtifactResponse struct {
4848
Data []byte
4949
}
5050

51-
type RetrieveArtifact func(request *ArtifactRequest) (*ArtifactResponse, error)
51+
type RetrieveArtifact func(request *ReadArtifactRequest) (*ReadArtifactResponse, error)
5252

5353
// Abstract interface to encapsulate the resources of the execution runtime specifically
5454
// for status information. This interface is mainly to access the status related information

backend/src/common/util/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ func readNodeMetricsJSONOrEmpty(runID string, nodeStatus *workflowapi.NodeStatus
579579
return "", nil // No metrics artifact, skip the reporting
580580
}
581581

582-
artifactRequest := &ArtifactRequest{
582+
artifactRequest := &ReadArtifactRequest{
583583
RunID: runID,
584584
NodeID: nodeStatus.ID,
585585
ArtifactName: metricsArtifactName,

0 commit comments

Comments
 (0)