diff --git a/indexer/controller/v2/controller.go b/indexer/controller/v2/controller.go
new file mode 100644
index 000000000..83bf691ed
--- /dev/null
+++ b/indexer/controller/v2/controller.go
@@ -0,0 +1,175 @@
+package controller
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"reflect"
+	"runtime"
+	"strings"
+	"time"
+
+	"github.com/quay/zlog"
+
+	"github.com/quay/claircore"
+	"github.com/quay/claircore/indexer"
+)
+
+type Controller struct {
+	store   indexer.Store // NOTE(hank) This should be [datastore.Indexer].
+	fetcher indexer.FetchArena
+}
+
+func New(ctx context.Context,
+	store indexer.Store,
+	fetcher indexer.FetchArena,
+) (*Controller, error) {
+	c := Controller{
+		store:   store,
+		fetcher: fetcher,
+	}
+	return &c, nil
+}
+
+func (c *Controller) Index(ctx context.Context, m *claircore.Manifest) (*claircore.IndexReport, error) {
+	e, err := c.newExec(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("controller: unable to construct execution context: %w", err)
+	}
+	defer func() {
+		if err := e.Close(); err != nil {
+			zlog.Info(ctx).
+				Err(err).
+				Msg("error closing resources")
+		}
+	}()
+
+Run:
+	for !e.IsTerminal() {
+		// At the start of every step, check if the request's context is valid.
+		// If not, everything should be at a safe point and we can just exit this loop.
+		select {
+		case <-ctx.Done():
+			err = context.Cause(ctx)
+			// Break directly to avoid touching the exec struct.
+			break Run
+		default:
+		}
+
+		// Run this step.
+		// The step is allowed to run while the parent context is valid, plus
+		// a short grace period after the parent context is canceled.
+		func() {
+			ctx := zlog.ContextWithValues(ctx, "step", e.State.String())
+			defer func() {
+				zlog.Debug(ctx).
+					Err(err).
+					Stringer("next", e.State).
+					Msg("step ran")
+			}()
+
+			// Create & cleanup the step context.
+			sctx, cause := context.WithCancelCause(context.WithoutCancel(ctx))
+			stop := context.AfterFunc(ctx, func() { // NB Using the parent context.
+				time.Sleep(30 * time.Second) // BUG(hank) The per-step grace period is not configurable.
+				cause(fmt.Errorf("controller: %w: %w", errGracePeriod, context.Cause(ctx)))
+			})
+			defer func() {
+				// This is complicated because of the desired grace period behavior.
+				usedGrace := !stop()
+				// Capture the cause before the unconditional CancelCause call;
+				// sctx.Err() only ever reports Canceled, never the cause.
+				err := context.Cause(sctx)
+				cause(errStepComplete)
+				zlog.Debug(ctx).
+					Bool("used_grace_period", usedGrace).
+					Bool("timed_out", errors.Is(err, errGracePeriod)).
+					AnErr("cause", err).
+					Msg("ending step context")
+			}()
+
+			e.State, err = e.State(sctx, e, m)
+		}()
+
+		// All errors out of controller steps should either be of type *stepError,
+		// or be accompanied by a terminal stateFn.
+		var serr *stepError
+		switch {
+		case errors.Is(err, nil):
+		case errors.As(err, &serr):
+			panic("TODO: handle stepError")
+		case e.IsTerminal():
+			// "Err" is not a *stepError and was returned with a terminal stateFn.
+			continue
+		default:
+			panic(fmt.Errorf("programmer error: previous step returned (%v, %v)", e.State, err))
+		}
+
+		// TODO(hank) Do the database persistence.
+	}
+	switch {
+	case errors.Is(err, nil):
+	case errors.Is(err, context.Canceled):
+		// Log?
+		return nil, fmt.Errorf("controller: ended early: %w", err)
+	default:
+		return nil, fmt.Errorf("controller: fatal error: %w", err)
+	}
+
+	return e.Result, nil
+}
+
+func (c *Controller) newExec(ctx context.Context) (*exec, error) {
+	e := exec{
+		Store:    c.store,
+		Realizer: c.fetcher.Realizer(ctx).(indexer.DescriptionRealizer),
+
+		Result: &claircore.IndexReport{
+			Packages:      map[string]*claircore.Package{},
+			Environments:  map[string][]*claircore.Environment{},
+			Distributions: map[string]*claircore.Distribution{},
+			Repositories:  map[string]*claircore.Repository{},
+			Files:         map[string]claircore.File{},
+		},
+		State: checkManifest,
+	}
+	return &e, nil
+}
+
+type stateFn func(context.Context, *exec, *claircore.Manifest) (stateFn, error)
+
+func (f stateFn) String() (n string) {
+	if f == nil {
+		return ""
+	}
+	n = runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
+	// The import path ends in "v2", so cut there; cutting on "controller."
+	// never matches and would leave n empty.
+	if _, after, ok := strings.Cut(n, "v2."); ok {
+		n = after
+	}
+	// Method values (e.g. plan.Fetch) carry a "-fm" suffix.
+	n = strings.TrimSuffix(n, "-fm")
+	return n
+}
+
+// Aliases for my sanity
+type detector = indexer.VersionedScanner
+type store = indexer.Store // Should be [datastore.Indexer]
+
+type exec struct {
+	Store     store
+	Detectors []detector
+	Realizer  indexer.DescriptionRealizer
+
+	Defer  []io.Closer
+	Result *claircore.IndexReport
+	State  stateFn
+}
+
+func (e *exec) IsTerminal() bool {
+	return e.State == nil
+}
+
+func (e *exec) Close() error {
+	errs := make([]error, len(e.Defer)+1)
+	for i, c := range e.Defer {
+		errs[i] = c.Close()
+	}
+	errs[len(errs)-1] = e.Realizer.Close()
+	return errors.Join(errs...)
+}
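
The per-step context machinery in Index composes three standard-library pieces: context.WithoutCancel detaches the step from the parent's cancellation, context.WithCancelCause lets the step be ended with a descriptive error, and context.AfterFunc starts the grace-period countdown only once the parent is canceled. A self-contained sketch of the same pattern (the stepContext helper and its names are illustrative, not part of this patch):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errGrace = errors.New("grace period exceeded")

// stepContext derives a context that survives the parent's cancellation by
// "grace". The returned done func ends the step context and reports whether
// the grace period was entered. Mirrors the pattern in Controller.Index.
func stepContext(parent context.Context, grace time.Duration) (context.Context, func() bool) {
	sctx, cause := context.WithCancelCause(context.WithoutCancel(parent))
	stop := context.AfterFunc(parent, func() { // NB Watches the parent context.
		time.Sleep(grace)
		cause(fmt.Errorf("%w: %w", errGrace, context.Cause(parent)))
	})
	done := func() bool {
		usedGrace := !stop()               // Reports false once the AfterFunc has started.
		cause(errors.New("step complete")) // No-op if already canceled.
		return usedGrace
	}
	return sctx, done
}

func main() {
	parent, cancel := context.WithCancel(context.Background())
	sctx, done := stepContext(parent, 50*time.Millisecond)
	cancel()      // The caller goes away mid-step.
	<-sctx.Done() // The step still gets the grace period before this fires.
	fmt.Println("cause:", context.Cause(sctx), "| used grace:", done())
}

Run as-is, the step context outlives the canceled parent by the grace duration and then reports the wrapped grace-period error as its cause, which is exactly what the "timed_out" log field above keys on.
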
diff --git a/indexer/controller/v2/error.go b/indexer/controller/v2/error.go
new file mode 100644
index 000000000..862794d51
--- /dev/null
+++ b/indexer/controller/v2/error.go
@@ -0,0 +1,74 @@
+package controller
+
+import (
+	"errors"
+	"fmt"
+)
+
+// Errors for the [Controller]'s internal run loop.
+var (
+	// errGracePeriod is a signal that the step's grace period started and expired.
+	errGracePeriod = errors.New("grace period exceeded")
+	// errStepComplete is used to stop the per-step context.
+	// It should not escape the step; seeing it outside the run loop means
+	// some context lifetimes are wonky.
+	errStepComplete = errors.New("step complete")
+)
+
+type stepError struct {
+	inner      error
+	durability errorDurability
+}
+
+//go:generate go run golang.org/x/tools/cmd/stringer -type errorDurability -linecomment
+type errorDurability uint
+
+const (
+	// errKindUnspecified is the default; the error says nothing about its durability.
+	errKindUnspecified errorDurability = iota // unspecified
+	// errKindBlob indicates there's some feature of the blob that means this
+	// step will never return a positive result.
+	//
+	// Typically there's something wrong with the blob, like it not actually
+	// being the expected kind of data.
+	errKindBlob // blob
+	// errKindCode indicates there's a bug in the code; retrying after a code
+	// change may yield a different result.
+	errKindCode // code
+	// errKindTransient indicates there was an environmental issue; retrying
+	// may yield a different result.
+	errKindTransient // transient
+)
+
+func (e *stepError) Error() string {
+	if e.durability == errKindUnspecified {
+		return e.inner.Error()
+	}
+	return fmt.Sprintf("%v (durable for: %v)", e.inner, e.durability)
+}
+
+func (e *stepError) Unwrap() error {
+	return e.inner
+}
+
+var errPerLayer = errors.New("per-layer error")
+
+type layerError struct {
+	layer, op string
+	inner     error
+}
+
+// Error implements error.
+func (e *layerError) Error() string {
+	return fmt.Sprintf("%s for layer %s: %v", e.op, e.layer, e.inner)
+}
+
+func (e *layerError) Unwrap() error {
+	return e.inner
+}
+
+// Is reports whether tgt is [errPerLayer].
+func (e *layerError) Is(tgt error) bool {
+	return errors.Is(errPerLayer, tgt)
+}
+
+func newLayerError(which string, op string, err error) error {
+	return &layerError{
+		layer: which,
+		op:    op,
+		inner: err,
+	}
+}
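
The durability taxonomy above only pays off once the run loop's `panic("TODO: handle stepError")` branch dispatches on it. A sketch of what that dispatch might look like, using stand-ins for the unexported types (the retryable helper is hypothetical, not part of this patch):

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the unexported types declared in error.go.
type errorDurability uint

const (
	errKindUnspecified errorDurability = iota
	errKindBlob
	errKindCode
	errKindTransient
)

type stepError struct {
	inner      error
	durability errorDurability
}

func (e *stepError) Error() string { return e.inner.Error() }
func (e *stepError) Unwrap() error { return e.inner }

// retryable shows how the run loop's TODO branch might use the durability:
// only transient errors are worth re-running the step for.
func retryable(err error) bool {
	var serr *stepError
	if errors.As(err, &serr) {
		return serr.durability == errKindTransient
	}
	return false
}

func main() {
	err := fmt.Errorf("fetch step: %w", &stepError{
		inner:      errors.New("connection reset"),
		durability: errKindTransient,
	})
	fmt.Println(retryable(err)) // true, even through the wrapping.
}
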
diff --git a/indexer/controller/v2/errordurability_string.go b/indexer/controller/v2/errordurability_string.go
new file mode 100644
index 000000000..383347afd
--- /dev/null
+++ b/indexer/controller/v2/errordurability_string.go
@@ -0,0 +1,26 @@
+// Code generated by "stringer -type errorDurability -linecomment"; DO NOT EDIT.
+
+package controller
+
+import "strconv"
+
+func _() {
+	// An "invalid array index" compiler error signifies that the constant values have changed.
+	// Re-run the stringer command to generate them again.
+	var x [1]struct{}
+	_ = x[errKindUnspecified-0]
+	_ = x[errKindBlob-1]
+	_ = x[errKindCode-2]
+	_ = x[errKindTransient-3]
+}
+
+const _errorDurability_name = "unspecifiedblobcodetransient"
+
+var _errorDurability_index = [...]uint8{0, 11, 15, 19, 28}
+
+func (i errorDurability) String() string {
+	if i >= errorDurability(len(_errorDurability_index)-1) {
+		return "errorDurability(" + strconv.FormatInt(int64(i), 10) + ")"
+	}
+	return _errorDurability_name[_errorDurability_index[i]:_errorDurability_index[i+1]]
+}
diff --git a/indexer/controller/v2/metrics.go b/indexer/controller/v2/metrics.go
new file mode 100644
index 000000000..0ef4f04ac
--- /dev/null
+++ b/indexer/controller/v2/metrics.go
@@ -0,0 +1,35 @@
+package controller
+
+import (
+	"sync"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+var (
+	meter  = otel.Meter("github.com/quay/claircore/indexer/controller/v2")
+	tracer = otel.Tracer("github.com/quay/claircore/indexer/controller/v2")
+
+	stepCall metric.Int64Counter
+)
+
+var metricInit = sync.OnceValue(func() (err error) {
+	stepCall, err = meter.Int64Counter("step.count",
+		metric.WithUnit("{call}"),
+		metric.WithDescription("tktk"),
+	)
+	if err != nil {
+		return err
+	}
+	return nil
+})
+
+var (
+	stepAttrKey = attribute.Key("step")
+)
+
+func stepAttr(name string) attribute.KeyValue {
+	return stepAttrKey.String(name)
+}
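
One caution on metrics.go: nothing in this patch appears to call metricInit, so stepCall is still the interface zero value when checkManifest runs and the deferred stepCall.Add would panic. Because metricInit is wrapped in sync.OnceValue, it is cheap to call at every entry point; a standalone sketch of that contract (initMetrics is a hypothetical stand-in):

package main

import (
	"fmt"
	"sync"
)

// sync.OnceValue memoizes the function's result: the initialization body
// runs exactly once and every caller observes the same returned error.
var initMetrics = sync.OnceValue(func() error {
	fmt.Println("registering instruments")
	return nil
})

func main() {
	// Hypothetical call sites: invoking this at the top of Controller.Index
	// (or in New) would guarantee the counter is set before any step runs.
	for i := 0; i < 3; i++ {
		if err := initMetrics(); err != nil {
			panic(err)
		}
	}
	// "registering instruments" is printed exactly once.
}
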
diff --git a/indexer/controller/v2/states.go b/indexer/controller/v2/states.go
new file mode 100644
index 000000000..fef001304
--- /dev/null
+++ b/indexer/controller/v2/states.go
@@ -0,0 +1,288 @@
+package controller
+
+import (
+	"container/heap"
+	"context"
+	"errors"
+	"io"
+
+	"github.com/quay/zlog"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/quay/claircore"
+	"github.com/quay/claircore/internal/wart"
+)
+
+var _ = [...]stateFn{
+	checkManifest,
+	indexLayers,
+	coalesce,
+	indexManifest,
+	manifestError,
+	manifestFinished,
+	loadManifest,
+	((*plan)(nil)).Fetch,
+	((*plan)(nil)).Index,
+}
+
+func checkManifest(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	const op = `checkManifest`
+	defer stepCall.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(stepAttr(op))))
+	ok, err := e.Store.ManifestScanned(ctx, m.Hash, e.Detectors)
+	if err != nil {
+		return nil, err
+	}
+
+	if ok {
+		zlog.Info(ctx).Msg("manifest already scanned")
+		return loadManifest, nil
+	}
+
+	// If we haven't seen this manifest: determine which detectors to run,
+	// persist it, and transition to the fetch step.
+	zlog.Info(ctx).Msg("manifest to be scanned")
+
+	// TODO(hank) Should add some API that reports this per-layer and doesn't
+	// need loops like this.
+	descs := wart.LayersToDescriptions(m.Layers)
+	plan := plan{
+		Reqs:  make([]layerRequest, len(descs)),
+		Execs: make([]layerExec, len(descs)),
+		N:     1, // TODO(hank) Make concurrency configurable.
+	}
+	for i, desc := range descs {
+		p := &plan.Reqs[i]
+		p.Desc = desc
+		// The digest only depends on the description, so parse it once
+		// instead of once per detector.
+		d, err := claircore.ParseDigest(desc.Digest)
+		if err != nil {
+			return nil, newLayerError(desc.Digest, `digest parse failure`, err)
+		}
+		for _, det := range e.Detectors {
+			ok, err := e.Store.LayerScanned(ctx, d, det)
+			if err != nil {
+				return nil, newLayerError(desc.Digest, `layer existence lookup`, err)
+			}
+			if ok {
+				continue
+			}
+			p.Detector = append(p.Detector, det)
+		}
+	}
+
+	/*
+		if err := e.Store.PersistManifest(ctx, *m); err != nil {
+			return nil, fmt.Errorf("%s: failed to persist manifest: %w", op, err)
+		}
+	*/
+	return plan.Fetch, nil
+}
+
+type plan struct {
+	N     int
+	Reqs  []layerRequest
+	Execs []layerExec
+}
+
+func (p *plan) Fetch(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	const op = `fetch`
+	h := execOrder(make([]workItem, len(p.Reqs)))
+	for i := range h {
+		h[i] = workItem{
+			Req:   &p.Reqs[i],
+			Exec:  &p.Execs[i],
+			Index: -1,
+		}
+	}
+	heap.Init(&h)
+	zlog.Info(ctx).
+		Interface("plan", h).
+		Msg("planned execution")
+
+	eg, ctx := errgroup.WithContext(ctx)
+	eg.SetLimit(p.N)
+	// Check the length before popping so the final item is processed.
+	for h.Len() > 0 {
+		w := heap.Pop(&h).(workItem)
+		d := w.Req.Desc.Digest
+		ctx := zlog.ContextWithValues(ctx, "layer", d)
+		select {
+		case <-ctx.Done():
+			zlog.Debug(ctx).
+				Msg("context done")
+			w.Exec.Err = newLayerError(d, op, context.Cause(ctx))
+			continue
+		default:
+		}
+		if len(w.Req.Detector) == 0 {
+			zlog.Info(ctx).
+				Msg("no fetch needed; skipping")
+			continue
+		}
+		eg.Go(func() error {
+			zlog.Warn(ctx).
+				Msg("need to fetch; unimplemented")
+			w.Exec.Err = newLayerError(d, op, errors.New("TODO"))
+			return nil
+		})
+	}
+	eg.Wait()
+
+	errs := make([]error, len(p.Execs))
+	for i, exec := range p.Execs {
+		errs[i] = exec.Err
+	}
+	if err := errors.Join(errs...); err != nil {
+		return nil, &stepError{
+			durability: errKindTransient,
+			inner:      err,
+		}
+	}
+
+	return p.Index, nil
+}
+
+// An execOrder is a [heap.Interface] that yields work items in ascending
+// order of the number of detectors that still need to run.
+//
+// This is overkill for the current use cases, but should allow easy
+// modifications to the ordering logic later. If we had a size hint, ordering by
+// detector-bytes seems like a way to reduce latency.
+type execOrder []workItem
+
+type workItem struct {
+	Req   *layerRequest
+	Exec  *layerExec
+	Index int
+}
+
+// Len implements [heap.Interface].
+func (q *execOrder) Len() int {
+	return len(*q)
+}
+
+// Less implements [heap.Interface].
+func (q *execOrder) Less(i int, j int) bool {
+	a, b := (*q)[i].Req, (*q)[j].Req
+	return len(a.Detector) < len(b.Detector)
+}
+
+// Pop implements [heap.Interface].
+func (q *execOrder) Pop() any {
+	s := *q
+	n := len(s)
+	p := s[n-1]
+	s[n-1] = workItem{Index: -1}
+	p.Index = -1
+	*q = s[:n-1]
+	return p
+}
+
+// Push implements [heap.Interface].
+func (q *execOrder) Push(x any) {
+	i := len(*q)
+	p := x.(workItem)
+	p.Index = i
+	*q = append(*q, p)
+}
+
+// Swap implements [heap.Interface].
+func (q *execOrder) Swap(i int, j int) {
+	(*q)[i], (*q)[j] = (*q)[j], (*q)[i]
+	(*q)[i].Index = i
+	(*q)[j].Index = j
+}
+
+var _ heap.Interface = (*execOrder)(nil)
+
+type layerRequest struct {
+	Desc     claircore.LayerDescription
+	Detector []detector
+}
+
+type layerExec struct {
+	Cleanup io.Closer
+	Err     error
+	Layer   claircore.Layer
+}
+
+func (p *plan) Index(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	const op = `index`
+	h := execOrder(make([]workItem, len(p.Reqs)))
+	for i := range h {
+		h[i] = workItem{
+			Req:   &p.Reqs[i],
+			Exec:  &p.Execs[i],
+			Index: -1,
+		}
+	}
+	heap.Init(&h)
+	zlog.Info(ctx).
+		Interface("plan", h).
+		Msg("planned execution")
+
+	eg, ctx := errgroup.WithContext(ctx)
+	eg.SetLimit(p.N)
+	// Check the length before popping so the final item is processed.
+	for h.Len() > 0 {
+		w := heap.Pop(&h).(workItem)
+		d := w.Req.Desc.Digest
+		ctx := zlog.ContextWithValues(ctx, "layer", d)
+		select {
+		case <-ctx.Done():
+			zlog.Debug(ctx).
+				Msg("context done")
+			w.Exec.Err = newLayerError(d, op, context.Cause(ctx))
+			continue
+		default:
+		}
+		if len(w.Req.Detector) == 0 {
+			zlog.Info(ctx).
+				Msg("no index needed; skipping")
+			continue
+		}
+		eg.Go(func() error {
+			zlog.Warn(ctx).
+				Msg("need to index; unimplemented")
+			w.Exec.Err = newLayerError(d, op, errors.New("TODO"))
+			return nil
+		})
+	}
+	eg.Wait()
+
+	errs := make([]error, len(p.Execs))
+	for i, exec := range p.Execs {
+		errs[i] = exec.Err
+	}
+	if err := errors.Join(errs...); err != nil {
+		return nil, &stepError{
+			durability: errKindTransient,
+			inner:      err,
+		}
+	}
+
+	return nil, errors.New("TODO: next step")
+}
+
+func indexLayers(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	panic("unimplemented")
+}
+
+func coalesce(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	panic("unimplemented")
+}
+
+func indexManifest(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	panic("unimplemented")
+}
+
+func manifestError(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	panic("unimplemented")
+}
+
+func manifestFinished(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	panic("unimplemented")
+}
+
+func loadManifest(ctx context.Context, e *exec, m *claircore.Manifest) (stateFn, error) {
+	panic("unimplemented")
+}
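
Two details of the drain loops in Fetch and Index are worth calling out: the heap yields work items with the fewest outstanding detectors first, and the loop checks h.Len() before popping. Putting the Pop in the for-clause instead silently drops the final item, and a one-item heap is never processed at all. A self-contained demonstration of both the ordering and the loop shape (the types here are stand-ins, not the patch's):

package main

import (
	"container/heap"
	"fmt"
)

type item struct {
	name      string
	detectors int
}

type byDetectors []item

func (q byDetectors) Len() int           { return len(q) }
func (q byDetectors) Less(i, j int) bool { return q[i].detectors < q[j].detectors }
func (q byDetectors) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *byDetectors) Push(x any)        { *q = append(*q, x.(item)) }
func (q *byDetectors) Pop() any {
	s := *q
	p := s[len(s)-1]
	*q = s[:len(s)-1]
	return p
}

func main() {
	h := byDetectors{{"a", 3}, {"b", 1}, {"c", 2}}
	heap.Init(&h)
	// Check the length *before* popping so the last item is not lost.
	for h.Len() > 0 {
		fmt.Println(heap.Pop(&h).(item).name) // b, c, a
	}
}
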
diff --git a/libindex/fetcher.go b/libindex/fetcher.go
index d7783d1b6..ff9573ae4 100644
--- a/libindex/fetcher.go
+++ b/libindex/fetcher.go
@@ -445,6 +445,22 @@ func (p *FetchProxy) RealizeDescriptions(ctx context.Context, descs []claircore.
 	return ls, nil
 }
 
+func (p *FetchProxy) RealizeDescription(ctx context.Context, desc claircore.LayerDescription) (l claircore.Layer, cl io.Closer, err error) {
+	ctx = zlog.ContextWithValues(ctx,
+		"component", "libindex/FetchProxy.RealizeDescription")
+	ctx, span := tracer.Start(ctx, "RealizeDescription")
+	defer span.End()
+
+	err = p.a.fetchInto(ctx, &l, &cl, &desc)()
+	if err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, "RealizeDescription errored")
+	} else {
+		span.SetStatus(codes.Ok, "")
+	}
+	return
+}
+
 // Close marks all the files backing any returned [claircore.Layer] as unused.
 //
 // This method may delete the backing files, necessitating them being fetched by
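
Taken together, the intended call path looks something like the following sketch. The store, arena, and manifest are assumed to come from the surrounding libindex plumbing; note that newExec type-asserts the arena's realizer to indexer.DescriptionRealizer, so the arena's Realizer must return one (libindex's FetchProxy does).

package example

import (
	"context"

	"github.com/quay/claircore"
	"github.com/quay/claircore/indexer"
	controller "github.com/quay/claircore/indexer/controller/v2"
)

// index drives the v2 controller for one manifest. The store and arena
// arguments are hypothetical inputs, constructed elsewhere (e.g., by libindex).
func index(ctx context.Context, store indexer.Store, arena indexer.FetchArena, m *claircore.Manifest) (*claircore.IndexReport, error) {
	c, err := controller.New(ctx, store, arena)
	if err != nil {
		return nil, err
	}
	return c.Index(ctx, m)
}
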