diff --git a/api/v1/common_types.go b/api/v1/common_types.go index f4ee1295..ae9a7868 100644 --- a/api/v1/common_types.go +++ b/api/v1/common_types.go @@ -19,6 +19,8 @@ const ( ReconciliationDisabledReason = "ReconciliationDisabled" ReconciliationDisabledMessage = "Reconciliation is disabled" + + HealthCheckExpr = "status.conditions.filter(c, c.type == 'Ready').all(c, c.status == 'True' && c.observedGeneration == metadata.generation)" ) var ( diff --git a/internal/controller/resourceset_controller.go b/internal/controller/resourceset_controller.go index 3dd263a7..412982b6 100644 --- a/internal/controller/resourceset_controller.go +++ b/internal/controller/resourceset_controller.go @@ -14,6 +14,7 @@ import ( "github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine" "github.com/fluxcd/cli-utils/pkg/kstatus/status" "github.com/fluxcd/cli-utils/pkg/object" + "github.com/fluxcd/pkg/apis/kustomize" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/runtime/cel" runtimeClient "github.com/fluxcd/pkg/runtime/client" @@ -314,7 +315,9 @@ func (r *ResourceSetReconciler) checkDependencies(ctx context.Context, } if dep.Ready { - if dep.ReadyExpr != "" { + switch { + // Custom CEL ready expression. + case dep.ReadyExpr != "": isReady, err := exprs[i].EvaluateBoolean(ctx, depObj.UnstructuredContent()) if err != nil { return err @@ -323,7 +326,23 @@ func (r *ResourceSetReconciler) checkDependencies(ctx context.Context, if !isReady { return fmt.Errorf("dependency %s/%s not ready: expression '%s'", dep.APIVersion, ssautil.FmtObjMetadata(depMd), dep.ReadyExpr) } - } else { + // Built-in CEL ready expression for ResourceSet and ResourceSetInputProvider. + case dep.Kind == fluxcdv1.ResourceSetKind || dep.Kind == fluxcdv1.ResourceSetInputProviderKind: + expr, err := cel.NewExpression(fluxcdv1.HealthCheckExpr) + if err != nil { + return err + } + + isReady, err := expr.EvaluateBoolean(ctx, depObj.UnstructuredContent()) + if err != nil { + return err + } + + if !isReady { + return fmt.Errorf("dependency %s/%s not ready", dep.APIVersion, ssautil.FmtObjMetadata(depMd)) + } + // Default status check using kstatus. + default: stat, err := status.Compute(depObj) if err != nil { return fmt.Errorf("dependency %s/%s not ready: %w", dep.APIVersion, ssautil.FmtObjMetadata(depMd), err) @@ -432,15 +451,10 @@ func (r *ResourceSetReconciler) apply(ctx context.Context, } // Configure the Kubernetes client for impersonation. - var impersonatorOpts []runtimeClient.ImpersonatorOption - if r.DefaultServiceAccount != "" || obj.Spec.ServiceAccountName != "" { - impersonatorOpts = append(impersonatorOpts, - runtimeClient.WithServiceAccount(r.DefaultServiceAccount, obj.Spec.ServiceAccountName, obj.GetNamespace())) - } - if r.ClusterReader != nil { - impersonatorOpts = append(impersonatorOpts, runtimeClient.WithPolling(r.ClusterReader)) + impersonation, err := r.makeImpersonator(ctx, obj) + if err != nil { + return "", err } - impersonation := runtimeClient.NewImpersonator(r.Client, impersonatorOpts...) // Create the Kubernetes client that runs under impersonation. kubeClient, statusPoller, err := impersonation.GetClient(ctx) @@ -795,6 +809,45 @@ func (r *ResourceSetReconciler) patch(ctx context.Context, return nil } +// makeImpersonator creates an impersonator for the ResourceSet. +// It configures service account impersonation and custom health check readers. +func (r *ResourceSetReconciler) makeImpersonator(ctx context.Context, obj *fluxcdv1.ResourceSet) (*runtimeClient.Impersonator, error) { + var impersonatorOpts []runtimeClient.ImpersonatorOption + + // Configure service account for impersonation. + if r.DefaultServiceAccount != "" || obj.Spec.ServiceAccountName != "" { + impersonatorOpts = append(impersonatorOpts, + runtimeClient.WithServiceAccount(r.DefaultServiceAccount, obj.Spec.ServiceAccountName, obj.GetNamespace())) + } + + // Configure the kstatus poller with custom health checks for + // Flux Operator owned resources. + if r.ClusterReader != nil { + kinds := []string{fluxcdv1.FluxInstanceKind, fluxcdv1.ResourceSetKind, fluxcdv1.ResourceSetInputProviderKind} + healthChecks := make([]kustomize.CustomHealthCheck, 0, len(kinds)) + for _, kind := range kinds { + healthChecks = append(healthChecks, kustomize.CustomHealthCheck{ + APIVersion: fluxcdv1.GroupVersion.String(), + Kind: kind, + HealthCheckExpressions: kustomize.HealthCheckExpressions{ + Current: fluxcdv1.HealthCheckExpr, + }, + }) + } + + statusReaders, err := cel.PollerWithCustomHealthChecks(ctx, healthChecks) + if err != nil { + return nil, fmt.Errorf("failed to create custom health check readers: %w", err) + } + + impersonatorOpts = append(impersonatorOpts, + runtimeClient.WithPolling(r.ClusterReader, statusReaders...), + ) + } + + return runtimeClient.NewImpersonator(r.Client, impersonatorOpts...), nil +} + func (r *ResourceSetReconciler) recordMetrics(obj *fluxcdv1.ResourceSet) error { if !obj.ObjectMeta.DeletionTimestamp.IsZero() { reporter.DeleteMetricsFor(fluxcdv1.ResourceSetKind, obj.GetName(), obj.GetNamespace()) diff --git a/internal/install/client.go b/internal/install/client.go index ff3320d8..5f7943a5 100644 --- a/internal/install/client.go +++ b/internal/install/client.go @@ -89,7 +89,7 @@ func NewStatusPoller(ctx context.Context, reader client.Reader, mapper meta.REST APIVersion: fluxcdv1.GroupVersion.String(), Kind: kind, HealthCheckExpressions: kustomize.HealthCheckExpressions{ - Current: "status.conditions.filter(e, e.type == 'Ready').all(e, e.status == 'True' && e.observedGeneration == metadata.generation)", + Current: fluxcdv1.HealthCheckExpr, }, }) }