Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const (

ReconciliationDisabledReason = "ReconciliationDisabled"
ReconciliationDisabledMessage = "Reconciliation is disabled"

HealthCheckExpr = "status.conditions.filter(c, c.type == 'Ready').all(c, c.status == 'True' && c.observedGeneration == metadata.generation)"
)

var (
Expand Down
73 changes: 63 additions & 10 deletions internal/controller/resourceset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
"github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/cli-utils/pkg/object"
"github.com/fluxcd/pkg/apis/kustomize"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/cel"
runtimeClient "github.com/fluxcd/pkg/runtime/client"
Expand Down Expand Up @@ -314,7 +315,9 @@ func (r *ResourceSetReconciler) checkDependencies(ctx context.Context,
}

if dep.Ready {
if dep.ReadyExpr != "" {
switch {
// Custom CEL ready expression.
case dep.ReadyExpr != "":
isReady, err := exprs[i].EvaluateBoolean(ctx, depObj.UnstructuredContent())
if err != nil {
return err
Expand All @@ -323,7 +326,23 @@ func (r *ResourceSetReconciler) checkDependencies(ctx context.Context,
if !isReady {
return fmt.Errorf("dependency %s/%s not ready: expression '%s'", dep.APIVersion, ssautil.FmtObjMetadata(depMd), dep.ReadyExpr)
}
} else {
// Built-in CEL ready expression for ResourceSet and ResourceSetInputProvider.
case dep.Kind == fluxcdv1.ResourceSetKind || dep.Kind == fluxcdv1.ResourceSetInputProviderKind:
expr, err := cel.NewExpression(fluxcdv1.HealthCheckExpr)
if err != nil {
return err
}

isReady, err := expr.EvaluateBoolean(ctx, depObj.UnstructuredContent())
if err != nil {
return err
}

if !isReady {
return fmt.Errorf("dependency %s/%s not ready", dep.APIVersion, ssautil.FmtObjMetadata(depMd))
}
// Default status check using kstatus.
default:
stat, err := status.Compute(depObj)
if err != nil {
return fmt.Errorf("dependency %s/%s not ready: %w", dep.APIVersion, ssautil.FmtObjMetadata(depMd), err)
Expand Down Expand Up @@ -432,15 +451,10 @@ func (r *ResourceSetReconciler) apply(ctx context.Context,
}

// Configure the Kubernetes client for impersonation.
var impersonatorOpts []runtimeClient.ImpersonatorOption
if r.DefaultServiceAccount != "" || obj.Spec.ServiceAccountName != "" {
impersonatorOpts = append(impersonatorOpts,
runtimeClient.WithServiceAccount(r.DefaultServiceAccount, obj.Spec.ServiceAccountName, obj.GetNamespace()))
}
if r.ClusterReader != nil {
impersonatorOpts = append(impersonatorOpts, runtimeClient.WithPolling(r.ClusterReader))
impersonation, err := r.makeImpersonator(ctx, obj)
if err != nil {
return "", err
}
impersonation := runtimeClient.NewImpersonator(r.Client, impersonatorOpts...)

// Create the Kubernetes client that runs under impersonation.
kubeClient, statusPoller, err := impersonation.GetClient(ctx)
Expand Down Expand Up @@ -795,6 +809,45 @@ func (r *ResourceSetReconciler) patch(ctx context.Context,
return nil
}

// makeImpersonator creates an impersonator for the ResourceSet.
// It configures service account impersonation and custom health check readers.
func (r *ResourceSetReconciler) makeImpersonator(ctx context.Context, obj *fluxcdv1.ResourceSet) (*runtimeClient.Impersonator, error) {
var impersonatorOpts []runtimeClient.ImpersonatorOption

// Configure service account for impersonation.
if r.DefaultServiceAccount != "" || obj.Spec.ServiceAccountName != "" {
impersonatorOpts = append(impersonatorOpts,
runtimeClient.WithServiceAccount(r.DefaultServiceAccount, obj.Spec.ServiceAccountName, obj.GetNamespace()))
}

// Configure the kstatus poller with custom health checks for
// Flux Operator owned resources.
if r.ClusterReader != nil {
kinds := []string{fluxcdv1.FluxInstanceKind, fluxcdv1.ResourceSetKind, fluxcdv1.ResourceSetInputProviderKind}
healthChecks := make([]kustomize.CustomHealthCheck, 0, len(kinds))
for _, kind := range kinds {
healthChecks = append(healthChecks, kustomize.CustomHealthCheck{
APIVersion: fluxcdv1.GroupVersion.String(),
Kind: kind,
HealthCheckExpressions: kustomize.HealthCheckExpressions{
Current: fluxcdv1.HealthCheckExpr,
},
})
}

statusReaders, err := cel.PollerWithCustomHealthChecks(ctx, healthChecks)
if err != nil {
return nil, fmt.Errorf("failed to create custom health check readers: %w", err)
}

impersonatorOpts = append(impersonatorOpts,
runtimeClient.WithPolling(r.ClusterReader, statusReaders...),
)
}

return runtimeClient.NewImpersonator(r.Client, impersonatorOpts...), nil
}

func (r *ResourceSetReconciler) recordMetrics(obj *fluxcdv1.ResourceSet) error {
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
reporter.DeleteMetricsFor(fluxcdv1.ResourceSetKind, obj.GetName(), obj.GetNamespace())
Expand Down
2 changes: 1 addition & 1 deletion internal/install/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewStatusPoller(ctx context.Context, reader client.Reader, mapper meta.REST
APIVersion: fluxcdv1.GroupVersion.String(),
Kind: kind,
HealthCheckExpressions: kustomize.HealthCheckExpressions{
Current: "status.conditions.filter(e, e.type == 'Ready').all(e, e.status == 'True' && e.observedGeneration == metadata.generation)",
Current: fluxcdv1.HealthCheckExpr,
},
})
}
Expand Down
Loading