diff --git a/raycicmd/rayci_pipeline.go b/raycicmd/rayci_pipeline.go
index dde74f6..f747ba6 100644
--- a/raycicmd/rayci_pipeline.go
+++ b/raycicmd/rayci_pipeline.go
@@ -69,6 +69,6 @@ var (
 
 	wandaStepAllowedKeys = []string{
 		"name", "label", "wanda", "depends_on",
-		"matrix", "env", "tags", "instance_type",
+		"matrix", "env", "tags", "instance_type", "priority",
 	}
 )
diff --git a/raycicmd/wanda.go b/raycicmd/wanda.go
index 7b5b059..ec59176 100644
--- a/raycicmd/wanda.go
+++ b/raycicmd/wanda.go
@@ -3,8 +3,11 @@ package raycicmd
 import (
 	"fmt"
 	"log"
+	"os"
 	"path"
 	"sort"
+
+	"github.com/ray-project/rayci/wanda"
 )
 
 const rawGitHubURL = "https://raw.githubusercontent.com/"
@@ -24,7 +27,10 @@ type wandaStep struct {
 
 	launcherBranch string
 
-	matrix any
+	matrix   any
+	priority *int
+
+	cacheHit bool
 }
 
 func wandaCommands(br string) []string {
@@ -56,6 +62,9 @@ func (s *wandaStep) buildkiteStep() map[string]any {
 	if label == "" {
 		label = "wanda: " + s.name
 	}
+	if s.cacheHit {
+		label = label + " [cache hit]"
+	}
 
 	bkStep := map[string]any{
 		"label": label,
@@ -78,7 +87,10 @@ func (s *wandaStep) buildkiteStep() map[string]any {
 		bkStep["agents"] = newBkAgents(agentQueue)
 	}
 
-	if p := s.ciConfig.BuilderPriority; p != 0 {
+	// Use step-level priority if set, otherwise fall back to config-level priority
+	if s.priority != nil {
+		bkStep["priority"] = *s.priority
+	} else if p := s.ciConfig.BuilderPriority; p != 0 {
 		bkStep["priority"] = p
 	}
 	if s.matrix != nil {
@@ -108,6 +120,71 @@ func (c *wandaConverter) match(step map[string]any) bool {
 	return ok
 }
 
+// predictCacheHit reports whether the wanda build of file is expected to hit
+// the remote build cache. It is best-effort: any failure is logged and
+// reported as a miss.
+func (c *wandaConverter) predictCacheHit(file string, envs map[string]string) bool {
+	// Only predict cache hits if we have the necessary config
+	if c.config.CIWorkRepo == "" {
+		return false
+	}
+
+	// The wanda package expands variables in the spec file from the process
+	// environment, so export the step's envs for the duration of the
+	// prediction. Everything set here is unset again on return; otherwise the
+	// values of one step would leak into the prediction of the next one.
+	var added []string
+	defer func() {
+		for _, k := range added {
+			if err := os.Unsetenv(k); err != nil {
+				log.Printf("unset env %q after cache prediction: %v", k, err)
+			}
+		}
+	}()
+	for k, v := range envs {
+		set, err := setEnvIfNotSet(k, v)
+		if err != nil {
+			log.Printf("set env %q for cache prediction: %v", k, err)
+			return false
+		}
+		if set {
+			added = append(added, k)
+		}
+	}
+
+	forgeConfig := &wanda.ForgeConfig{
+		WorkDir:    ".",
+		WorkRepo:   c.config.CIWorkRepo,
+		NamePrefix: c.config.ForgePrefix,
+		BuildID:    c.info.buildID,
+		Epoch:      wanda.DefaultCacheEpoch(),
+		RayCI:      true,
+		Rebuild:    false,
+	}
+
+	cacheHit, err := wanda.PredictCacheHit(file, forgeConfig)
+	if err != nil {
+		// If prediction fails, log the error but don't fail the build
+		log.Printf("failed to predict cache hit for %s: %v", file, err)
+		return false
+	}
+
+	return cacheHit
+}
+
+// setEnvIfNotSet sets the environment variable key to value unless key is
+// already present in the environment, even with an empty value. It reports
+// whether the variable was set by this call.
+func setEnvIfNotSet(key, value string) (bool, error) {
+	if _, ok := os.LookupEnv(key); ok {
+		return false, nil
+	}
+	if err := os.Setenv(key, value); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
 type envEntry struct {
 	k string
 	v string
@@ -155,6 +232,15 @@ func (c *wandaConverter) convert(id string, step map[string]any) (
 	label, _ := stringInMap(step, "label")
 	instanceType, _ := stringInMap(step, "instance_type")
 
+	var priority *int
+	if p, ok := step["priority"]; ok {
+		pInt, ok := p.(int)
+		if !ok {
+			return nil, fmt.Errorf("priority must be an integer, got %T", p)
+		}
+		priority = &pInt
+	}
+
 	var matrix any
 	if m, ok := step["matrix"]; ok {
 		matrix = m
@@ -183,6 +269,7 @@ func (c *wandaConverter) convert(id string, step map[string]any) (
 		envs:           envs,
 		ciConfig:       c.config,
 		matrix:         matrix,
+		priority:       priority,
 		instanceType:   instanceType,
 		launcherBranch: c.info.launcherBranch,
 	}
@@ -190,5 +277,8 @@ func (c *wandaConverter) convert(id string, step map[string]any) (
 		s.dependsOn = dependsOn
 	}
 
+	// Predict cache hit if possible
+	s.cacheHit = c.predictCacheHit(file, envs)
+
 	return s.buildkiteStep(), nil
 }
diff --git a/raycicmd/wanda_test.go b/raycicmd/wanda_test.go
index 5bfb115..30baad5 100644
--- a/raycicmd/wanda_test.go
+++ b/raycicmd/wanda_test.go
@@ -66,3 +66,26 @@ func TestWandaStep_skip(t *testing.T) {
 		t.Errorf("got agent %v, want nil", bk["agent"])
 	}
 }
+
+func TestWandaStep_priority(t *testing.T) {
+	priority := 5
+	s := &wandaStep{
+		name:     "forge",
+		file:     "ci/forge.wanda.yaml",
+		buildID:  "abc123",
+		priority: &priority,
+
+		envs: map[string]string{"RAYCI_BRANCH": "stable"},
+
+		ciConfig: &config{
+			BuilderQueues:   map[string]string{"builder": "mybuilder"},
+			BuilderPriority: 1, // This should be overridden by step-level priority
+		},
+	}
+
+	bk := s.buildkiteStep()
+
+	if got, ok := bk["priority"].(int); !ok || got != 5 {
+		t.Errorf("got priority %v, want 5", bk["priority"])
+	}
+}
diff --git a/wanda/predict_cache.go b/wanda/predict_cache.go
new file mode 100644
index 0000000..098dc9f
--- /dev/null
+++ b/wanda/predict_cache.go
@@ -0,0 +1,155 @@
+package wanda
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/google/go-containerregistry/pkg/authn"
+	cranename "github.com/google/go-containerregistry/pkg/name"
+	"github.com/google/go-containerregistry/pkg/v1/remote"
+)
+
+// PredictCacheHit predicts if a wanda build will result in a cache hit
+// without actually building the image. It computes the build input digest
+// and checks if the corresponding cache tag exists in the registry.
+//
+// It returns (false, nil) when caching is disabled in the spec, a rebuild is
+// forced, the config is not remote, or the cache tag cannot be fetched from
+// the registry. A non-nil error means the prediction itself could not be
+// carried out.
+func PredictCacheHit(specFile string, config *ForgeConfig) (bool, error) {
+	if config == nil {
+		return false, fmt.Errorf("config is required")
+	}
+
+	spec, err := parseSpecFile(specFile)
+	if err != nil {
+		return false, fmt.Errorf("parse spec file: %w", err)
+	}
+
+	// Expand env variables just like the actual build does
+	spec = spec.expandVar(os.LookupEnv)
+
+	// If caching is disabled, it won't cache hit
+	if spec.DisableCaching {
+		return false, nil
+	}
+
+	// If rebuild is forced, it won't use cache
+	if config.Rebuild {
+		return false, nil
+	}
+
+	// Only predict for remote builds where we can check the registry
+	if !config.isRemote() {
+		return false, nil
+	}
+
+	// Resolve work directory
+	absWorkDir, err := filepath.Abs(filepath.FromSlash(config.WorkDir))
+	if err != nil {
+		return false, fmt.Errorf("abs path for work dir: %w", err)
+	}
+
+	// Prepare the tar stream
+	ts := newTarStream()
+	files, err := listSrcFiles(absWorkDir, spec.Srcs, spec.Dockerfile)
+	if err != nil {
+		return false, fmt.Errorf("list src files: %w", err)
+	}
+	for _, file := range files {
+		ts.addFile(file, nil, filepath.Join(absWorkDir, filepath.FromSlash(file)))
+	}
+
+	// Create build input
+	in := newBuildInput(ts, spec.BuildArgs)
+
+	// Resolve base images
+	froms, err := resolveBases(spec.Froms, config, absWorkDir)
+	if err != nil {
+		return false, fmt.Errorf("resolve bases: %w", err)
+	}
+	in.froms = froms
+
+	// Compute build input core
+	inputCore, err := in.makeCore(spec.Dockerfile)
+	if err != nil {
+		return false, fmt.Errorf("make build input core: %w", err)
+	}
+	inputCore.Epoch = config.Epoch
+
+	// Compute the digest
+	inputDigest, err := inputCore.digest()
+	if err != nil {
+		return false, fmt.Errorf("compute build input digest: %w", err)
+	}
+
+	// Get the cache tag
+	cacheTag := config.cacheTag(inputDigest)
+
+	// Check if the cache tag exists in the registry
+	ct, err := cranename.NewTag(cacheTag)
+	if err != nil {
+		return false, fmt.Errorf("parse cache tag %q: %w", cacheTag, err)
+	}
+
+	remoteOpts := []remote.Option{
+		remote.WithAuthFromKeychain(authn.DefaultKeychain),
+	}
+
+	if _, err := remote.Get(ct, remoteOpts...); err != nil {
+		// Any lookup failure (missing tag, auth, network) counts as a miss.
+		// Log it so that registry problems are not silently hidden.
+		log.Printf("cache image miss for %s: %v", cacheTag, err)
+		return false, nil
+	}
+
+	// Cache hit!
+	return true, nil
+}
+
+// resolveBases resolves the base images in froms for cache prediction and
+// returns them keyed by their name in froms. Local images (prefixed with @)
+// are rejected with an error. workDir is currently unused.
+func resolveBases(froms []string, config *ForgeConfig, workDir string) (map[string]*imageSource, error) {
+	m := make(map[string]*imageSource)
+	namePrefix := config.NamePrefix
+
+	remoteOpts := []remote.Option{
+		remote.WithAuthFromKeychain(authn.DefaultKeychain),
+	}
+
+	for _, from := range froms {
+		// Local images (prefixed with @) cannot be predicted: their digests
+		// are not reliably known without docker. HasPrefix, unlike indexing
+		// from[0], does not panic when from is empty.
+		if strings.HasPrefix(from, "@") {
+			return nil, fmt.Errorf("cannot predict cache for local images: %s", from)
+		}
+
+		// Work namespace images
+		if namePrefix != "" && len(from) > len(namePrefix) && strings.HasPrefix(from, namePrefix) {
+			fromName := strings.TrimPrefix(from, namePrefix)
+			workTag := config.workTag(fromName)
+
+			src, err := resolveRemoteImage(from, workTag, remoteOpts...)
+			if err != nil {
+				return nil, fmt.Errorf("resolve remote work image %s: %w", from, err)
+			}
+			m[from] = src
+			continue
+		}
+
+		// Normal remote images
+		src, err := resolveRemoteImage(from, from, remoteOpts...)
+		if err != nil {
+			return nil, fmt.Errorf("resolve remote image %s: %w", from, err)
+		}
+		m[from] = src
+	}
+
+	return m, nil
+}
diff --git a/wanda/predict_cache_test.go b/wanda/predict_cache_test.go
new file mode 100644
index 0000000..5f95294
--- /dev/null
+++ b/wanda/predict_cache_test.go
@@ -0,0 +1,116 @@
+package wanda
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+func TestPredictCacheHit_DisabledCaching(t *testing.T) {
+	// Create a temporary directory for test files
+	tmpDir := t.TempDir()
+
+	// Create a simple wanda spec with caching disabled
+	specContent := `name: test
+dockerfile: Dockerfile
+disable_caching: true
+`
+	specFile := filepath.Join(tmpDir, "test.wanda.yaml")
+	if err := os.WriteFile(specFile, []byte(specContent), 0644); err != nil {
+		t.Fatalf("failed to write spec file: %v", err)
+	}
+
+	// Create a simple Dockerfile
+	dockerfile := filepath.Join(tmpDir, "Dockerfile")
+	if err := os.WriteFile(dockerfile, []byte("FROM alpine\n"), 0644); err != nil {
+		t.Fatalf("failed to write dockerfile: %v", err)
+	}
+
+	config := &ForgeConfig{
+		WorkDir:  tmpDir,
+		WorkRepo: "test.example.com/repo",
+		Epoch:    "test",
+		RayCI:    true,
+	}
+
+	cacheHit, err := PredictCacheHit(specFile, config)
+	if err != nil {
+		t.Fatalf("PredictCacheHit failed: %v", err)
+	}
+
+	if cacheHit {
+		t.Error("expected cache hit to be false when caching is disabled")
+	}
+}
+
+func TestPredictCacheHit_LocalOnly(t *testing.T) {
+	// Create a temporary directory for test files
+	tmpDir := t.TempDir()
+
+	// Create a simple wanda spec
+	specContent := `name: test
+dockerfile: Dockerfile
+`
+	specFile := filepath.Join(tmpDir, "test.wanda.yaml")
+	if err := os.WriteFile(specFile, []byte(specContent), 0644); err != nil {
+		t.Fatalf("failed to write spec file: %v", err)
+	}
+
+	// Create a simple Dockerfile
+	dockerfile := filepath.Join(tmpDir, "Dockerfile")
+	if err := os.WriteFile(dockerfile, []byte("FROM alpine\n"), 0644); err != nil {
+		t.Fatalf("failed to write dockerfile: %v", err)
+	}
+
+	// Config without WorkRepo (local only)
+	config := &ForgeConfig{
+		WorkDir: tmpDir,
+		Epoch:   "test",
+	}
+
+	cacheHit, err := PredictCacheHit(specFile, config)
+	if err != nil {
+		t.Fatalf("PredictCacheHit failed: %v", err)
+	}
+
+	if cacheHit {
+		t.Error("expected cache hit to be false for local-only builds")
+	}
+}
+
+func TestPredictCacheHit_RebuildForced(t *testing.T) {
+	// Create a temporary directory for test files
+	tmpDir := t.TempDir()
+
+	// Create a simple wanda spec
+	specContent := `name: test
+dockerfile: Dockerfile
+`
+	specFile := filepath.Join(tmpDir, "test.wanda.yaml")
+	if err := os.WriteFile(specFile, []byte(specContent), 0644); err != nil {
+		t.Fatalf("failed to write spec file: %v", err)
+	}
+
+	// Create a simple Dockerfile
+	dockerfile := filepath.Join(tmpDir, "Dockerfile")
+	if err := os.WriteFile(dockerfile, []byte("FROM alpine\n"), 0644); err != nil {
+		t.Fatalf("failed to write dockerfile: %v", err)
+	}
+
+	config := &ForgeConfig{
+		WorkDir:  tmpDir,
+		WorkRepo: "test.example.com/repo",
+		Epoch:    "test",
+		RayCI:    true,
+		Rebuild:  true,
+	}
+
+	cacheHit, err := PredictCacheHit(specFile, config)
+	if err != nil {
+		t.Fatalf("PredictCacheHit failed: %v", err)
+	}
+
+	if cacheHit {
+		t.Error("expected cache hit to be false when rebuild is forced")
+	}
+}