Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion raycicmd/rayci_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ var (

wandaStepAllowedKeys = []string{
"name", "label", "wanda", "depends_on",
"matrix", "env", "tags", "instance_type",
"matrix", "env", "tags", "instance_type", "priority",
}
)
68 changes: 66 additions & 2 deletions raycicmd/wanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package raycicmd
import (
"fmt"
"log"
"os"
"path"
"sort"

"github.com/ray-project/rayci/wanda"
)

const rawGitHubURL = "https://raw.githubusercontent.com/"
Expand All @@ -24,7 +27,10 @@ type wandaStep struct {

launcherBranch string

matrix any
matrix any
priority *int

cacheHit bool
}

func wandaCommands(br string) []string {
Expand Down Expand Up @@ -56,6 +62,9 @@ func (s *wandaStep) buildkiteStep() map[string]any {
if label == "" {
label = "wanda: " + s.name
}
if s.cacheHit {
label = label + " [cache hit]"
}

bkStep := map[string]any{
"label": label,
Expand All @@ -78,7 +87,10 @@ func (s *wandaStep) buildkiteStep() map[string]any {
bkStep["agents"] = newBkAgents(agentQueue)
}

if p := s.ciConfig.BuilderPriority; p != 0 {
// Use step-level priority if set, otherwise fall back to config-level priority
if s.priority != nil {
bkStep["priority"] = *s.priority
} else if p := s.ciConfig.BuilderPriority; p != 0 {
bkStep["priority"] = p
}
if s.matrix != nil {
Expand Down Expand Up @@ -108,6 +120,45 @@ func (c *wandaConverter) match(step map[string]any) bool {
return ok
}

func (c *wandaConverter) predictCacheHit(file string, envs map[string]string) bool {
// Only predict cache hits if we have the necessary config
if c.config.CIWorkRepo == "" {
return false
}

// Set environment variables for the prediction
// This allows the wanda package to expand variables in the spec file
for k, v := range envs {
_ = setEnvIfNotSet(k, v)
Copy link

Copilot AI Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignoring the error from setEnvIfNotSet discards potential OS-level failures. Capture and log (or propagate) the error instead of using the blank identifier.

Suggested change
_ = setEnvIfNotSet(k, v)
if err := setEnvIfNotSet(k, v); err != nil {
log.Printf("failed to set environment variable %s=%s: %v", k, v, err)
}

Copilot uses AI. Check for mistakes.
}

forgeConfig := &wanda.ForgeConfig{
WorkDir: ".",
WorkRepo: c.config.CIWorkRepo,
NamePrefix: c.config.ForgePrefix,
BuildID: c.info.buildID,
Epoch: wanda.DefaultCacheEpoch(),
RayCI: true,
Rebuild: false,
}

cacheHit, err := wanda.PredictCacheHit(file, forgeConfig)
if err != nil {
// If prediction fails, log the error but don't fail the build
log.Printf("failed to predict cache hit for %s: %v", file, err)
return false
}

return cacheHit
}
Comment on lines +123 to +153

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Modifying the process-wide environment using os.Setenv is not thread-safe and can lead to race conditions and unpredictable behavior if rayci ever runs converters concurrently. The environment is a global state.

A safer approach is to pass the environment information explicitly without mutating the global state. The wanda.PredictCacheHit function (also introduced in this PR) can be modified to accept a lookup function for environment variables.

Suggested Refactoring:

  1. Export lookupFunc in wanda/spec.go:
    Rename lookupFunc to LookupFunc to make it public.

  2. Update wanda.PredictCacheHit in wanda/predict_cache.go:
    Modify its signature to accept the lookup function.

    // In wanda/predict_cache.go
    // The wanda.LookupFunc type needs to be exported from the wanda package.
    func PredictCacheHit(specFile string, config *ForgeConfig, lookupEnv wanda.LookupFunc) (bool, error) {
        // ...
        spec, err := parseSpecFile(specFile)
        if err != nil {
            return false, fmt.Errorf("parse spec file: %w", err)
        }
    
        if lookupEnv == nil {
            lookupEnv = os.LookupEnv
        }
        // Expand env variables just like the actual build does
        spec = spec.expandVar(lookupEnv)
        // ...
    }
  3. Update predictCacheHit in this file (raycicmd/wanda.go):
    This suggestion replaces the function body to create a lookup function and pass it to wanda.PredictCacheHit, removing the need for setEnvIfNotSet.

func (c *wandaConverter) predictCacheHit(file string, envs map[string]string) bool {
	// Only predict cache hits if we have the necessary config
	if c.config.CIWorkRepo == "" {
		return false
	}

	// Create a lookup function for environment variables to avoid mutating global state.
	// This allows the wanda package to expand variables in the spec file safely.
	lookup := func(key string) (string, bool) {
		if val, ok := envs[key]; ok {
			return val, true
		}
		return os.LookupEnv(key)
	}

	forgeConfig := &wanda.ForgeConfig{
		WorkDir:    ".",
		WorkRepo:   c.config.CIWorkRepo,
		NamePrefix: c.config.ForgePrefix,
		BuildID:    c.info.buildID,
		Epoch:      wanda.DefaultCacheEpoch(),
		RayCI:      true,
		Rebuild:    false,
	}

	// Assumes PredictCacheHit is modified to accept a lookup function.
	cacheHit, err := wanda.PredictCacheHit(file, forgeConfig, lookup)
	if err != nil {
		// If prediction fails, log the error but don't fail the build
		log.Printf("failed to predict cache hit for %s: %v", file, err)
		return false
	}

	return cacheHit
}


func setEnvIfNotSet(key, value string) error {
if os.Getenv(key) == "" {
return os.Setenv(key, value)
}
return nil
}
Comment on lines +155 to +160
Copy link

Copilot AI Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function returns an error that all callers currently ignore. Either handle the error at call sites or document that failures are intentionally suppressed.

Copilot uses AI. Check for mistakes.
Comment on lines +155 to +160

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This function will no longer be needed after refactoring predictCacheHit as suggested in my other comment to avoid using global environment variables.

As a side note, os.Getenv(key) == "" is not a reliable way to check if an environment variable is unset, as it could be explicitly set to an empty string. The correct way is to use _, ok := os.LookupEnv(key).


type envEntry struct {
k string
v string
Expand Down Expand Up @@ -155,6 +206,15 @@ func (c *wandaConverter) convert(id string, step map[string]any) (
label, _ := stringInMap(step, "label")
instanceType, _ := stringInMap(step, "instance_type")

var priority *int
if p, ok := step["priority"]; ok {
pInt, ok := p.(int)
if !ok {
return nil, fmt.Errorf("priority must be an integer, got %T", p)
}
priority = &pInt
}

var matrix any
if m, ok := step["matrix"]; ok {
matrix = m
Expand Down Expand Up @@ -183,12 +243,16 @@ func (c *wandaConverter) convert(id string, step map[string]any) (
envs: envs,
ciConfig: c.config,
matrix: matrix,
priority: priority,
instanceType: instanceType,
launcherBranch: c.info.launcherBranch,
}
if dependsOn, ok := step["depends_on"]; ok {
s.dependsOn = dependsOn
}

// Predict cache hit if possible
s.cacheHit = c.predictCacheHit(file, envs)

return s.buildkiteStep(), nil
}
23 changes: 23 additions & 0 deletions raycicmd/wanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,26 @@ func TestWandaStep_skip(t *testing.T) {
t.Errorf("got agent %v, want nil", bk["agent"])
}
}

func TestWandaStep_priority(t *testing.T) {
priority := 5
s := &wandaStep{
name: "forge",
file: "ci/forge.wanda.yaml",
buildID: "abc123",
priority: &priority,

envs: map[string]string{"RAYCI_BRANCH": "stable"},

ciConfig: &config{
BuilderQueues: map[string]string{"builder": "mybuilder"},
BuilderPriority: 1, // This should be overridden by step-level priority
},
}

bk := s.buildkiteStep()

if got := bk["priority"].(int); got != 5 {
t.Errorf("got priority %d, want 5", got)
}
}
146 changes: 146 additions & 0 deletions wanda/predict_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package wanda

import (
"fmt"
"log"
"os"
"path/filepath"

cranename "github.com/google/go-containerregistry/pkg/name"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/v1/remote"
)

// PredictCacheHit predicts if a wanda build will result in a cache hit
// without actually building the image. It computes the build input digest
// and checks if the corresponding cache tag exists in the registry.
func PredictCacheHit(specFile string, config *ForgeConfig) (bool, error) {
if config == nil {
return false, fmt.Errorf("config is required")
}

spec, err := parseSpecFile(specFile)
if err != nil {
return false, fmt.Errorf("parse spec file: %w", err)
}

// Expand env variables just like the actual build does
spec = spec.expandVar(os.LookupEnv)

// If caching is disabled, it won't cache hit
if spec.DisableCaching {
return false, nil
}

// If rebuild is forced, it won't use cache
if config.Rebuild {
return false, nil
}

// Only predict for remote builds where we can check the registry
if !config.isRemote() {
return false, nil
}

// Resolve work directory
absWorkDir, err := filepath.Abs(filepath.FromSlash(config.WorkDir))
if err != nil {
return false, fmt.Errorf("abs path for work dir: %w", err)
}

// Prepare the tar stream
ts := newTarStream()
files, err := listSrcFiles(absWorkDir, spec.Srcs, spec.Dockerfile)
if err != nil {
return false, fmt.Errorf("list src files: %w", err)
}
for _, file := range files {
ts.addFile(file, nil, filepath.Join(absWorkDir, filepath.FromSlash(file)))
}

// Create build input
in := newBuildInput(ts, spec.BuildArgs)

// Resolve base images
froms, err := resolveBases(spec.Froms, config, absWorkDir)
if err != nil {
return false, fmt.Errorf("resolve bases: %w", err)
}
in.froms = froms

// Compute build input core
inputCore, err := in.makeCore(spec.Dockerfile)
if err != nil {
return false, fmt.Errorf("make build input core: %w", err)
}
inputCore.Epoch = config.Epoch

// Compute the digest
inputDigest, err := inputCore.digest()
if err != nil {
return false, fmt.Errorf("compute build input digest: %w", err)
}

// Get the cache tag
cacheTag := config.cacheTag(inputDigest)

// Check if the cache tag exists in the registry
ct, err := cranename.NewTag(cacheTag)
if err != nil {
return false, fmt.Errorf("parse cache tag %q: %w", cacheTag, err)
}

remoteOpts := []remote.Option{
remote.WithAuthFromKeychain(authn.DefaultKeychain),
}

_, err = remote.Get(ct, remoteOpts...)
if err != nil {
// Cache miss or error checking
return false, nil
}
Comment on lines +97 to +101

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic swallows all errors from remote.Get, treating them as a cache miss. This can hide underlying issues like authentication failures or network problems, making them hard to debug. It would be better to distinguish between a 'not found' error (which is a cache miss) and other unexpected errors.

You can achieve this by inspecting the error type and status code. You will need to add the following imports:

import (
    "errors"
    "net/http"

    "github.com/google/go-containerregistry/pkg/v1/remote/transport"
)
	_, err = remote.Get(ct, remoteOpts...)
	if err != nil {
		var terr *transport.Error
		if errors.As(err, &terr) && terr.StatusCode == http.StatusNotFound {
			// This is a cache miss, not an error.
			return false, nil
		}

		// Any other error is unexpected and should be returned to be logged by the caller.
		return false, fmt.Errorf("failed to check remote cache tag %q: %w", cacheTag, err)
	}


// Cache hit!
return true, nil
}

// resolveBases resolves base images for cache prediction
func resolveBases(froms []string, config *ForgeConfig, workDir string) (map[string]*imageSource, error) {
Copy link

Copilot AI Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The workDir parameter is never used in this function. Remove it if unnecessary, or incorporate it if it was intended for resolving relative references.

Suggested change
func resolveBases(froms []string, config *ForgeConfig, workDir string) (map[string]*imageSource, error) {
func resolveBases(froms []string, config *ForgeConfig) (map[string]*imageSource, error) {

Copilot uses AI. Check for mistakes.
m := make(map[string]*imageSource)
namePrefix := config.NamePrefix

remoteOpts := []remote.Option{
remote.WithAuthFromKeychain(authn.DefaultKeychain),
}

for _, from := range froms {
// Local images (prefixed with @) - skip for prediction
// We can't reliably predict local image digests without docker
if from[0] == '@' {
Copy link

Copilot AI Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indexing from[0] without checking length will panic if an empty string is present in froms. Add a length guard (e.g., if len(from) > 0 && from[0] == '@').

Suggested change
if from[0] == '@' {
if len(from) > 0 && from[0] == '@' {

Copilot uses AI. Check for mistakes.
log.Printf("skipping local image %s for cache prediction", from)
return nil, fmt.Errorf("cannot predict cache for local images: %s", from)
Copy link

Copilot AI Oct 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment says 'skip for prediction' but the code returns an error, aborting prediction. Either update the comment to reflect the behavior or change the code to actually skip (e.g., continue) without returning an error.

Suggested change
return nil, fmt.Errorf("cannot predict cache for local images: %s", from)
continue

Copilot uses AI. Check for mistakes.
}

// Work namespace images
if namePrefix != "" && len(from) > len(namePrefix) && from[:len(namePrefix)] == namePrefix {
fromName := from[len(namePrefix):]
workTag := config.workTag(fromName)

src, err := resolveRemoteImage(from, workTag, remoteOpts...)
if err != nil {
return nil, fmt.Errorf("resolve remote work image %s: %w", from, err)
}
m[from] = src
continue
}

// Normal remote images
src, err := resolveRemoteImage(from, from, remoteOpts...)
if err != nil {
return nil, fmt.Errorf("resolve remote image %s: %w", from, err)
}
m[from] = src
}

return m, nil
}
Loading