Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion kwok/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,13 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont

subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
instanceProfileProvider := instanceprofile.NewDefaultProvider(iam.NewFromConfig(cfg), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval), cache.New(awscache.ProtectedProfilesTTL, awscache.DefaultCleanupInterval), cfg.Region)
instanceProfileProvider := instanceprofile.NewDefaultProvider(
iam.NewFromConfig(cfg),
cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval),
cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
cache.New(awscache.ProtectedProfilesTTL, awscache.DefaultCleanupInterval),
cfg.Region,
)
pricingProvider := pricing.NewDefaultProvider(
pricing.NewAPI(cfg),
ec2api,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func (c *Controller) Reconcile(ctx context.Context) (reconciler.Result, error) {
if err := c.cleanupInactiveProfiles(ctx); err != nil {
return reconciler.Result{}, err
}
// Requeue after 30 minutes
return reconciler.Result{RequeueAfter: 30 * time.Minute}, nil
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/controllers/nodeclass/garbagecollection/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,6 @@ var _ = Describe("Instance Profile GarbageCollection", func() {
// Run GC with no profiles to clean up
result, err := gcController.Reconcile(ctx)
Expect(err).To(BeNil())

// Verify requeue time is 30 minutes
Expect(result.RequeueAfter).To(Equal(30 * time.Minute))
})

Expand Down
6 changes: 5 additions & 1 deletion pkg/controllers/nodeclass/instanceprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (ip *InstanceProfile) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeC
profile, err := ip.instanceProfileProvider.Get(ctx, nodeClass.Status.InstanceProfile)
if err != nil {
if !awserrors.IsNotFound(err) {
return reconcile.Result{}, fmt.Errorf("getting instance profile %s: %w", nodeClass.Status.InstanceProfile, err)
return reconcile.Result{}, fmt.Errorf("getting instance profile %s, %w", nodeClass.Status.InstanceProfile, err)
}
} else if len(profile.Roles) > 0 {
currentRole = lo.FromPtr(profile.Roles[0].RoleName)
Expand All @@ -86,6 +86,10 @@ func (ip *InstanceProfile) Reconcile(ctx context.Context, nodeClass *v1.EC2NodeC
nodeClass.InstanceProfileTags(options.FromContext(ctx).ClusterName, ip.region),
string(nodeClass.UID),
); err != nil {
// If we failed Create, we may have successfully created the instance profile but failed to either attach the new
// role or remove the existing role. To prevent runaway instance profile creation, we'll attempt to delete the
// profile. We'll fail open here and rely on the garbage collector as a backstop.
_ = ip.instanceProfileProvider.Delete(ctx, newProfileName)
return reconcile.Result{}, fmt.Errorf("creating instance profile, %w", err)
}
ip.recreationCache.SetDefault(generateCacheKey(nodeClass), newProfileName)
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/nodeclass/instanceprofile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,4 +537,15 @@ var _ = Describe("NodeClass InstanceProfile Status Controller", func() {
Expect(profile.Roles).To(HaveLen(1))
Expect(*profile.Roles[0].RoleName).To(Equal("role-A"))
})

It("should attempt to delete the instance profile if there was a creation failure", func() {
awsEnv.IAMAPI.AddRoleToInstanceProfileBehavior.Error.Set(fmt.Errorf("failed to attach role"))
nodeClass.Spec.Role = "role-A"
ExpectApplied(ctx, env.Client, nodeClass)
_ = ExpectObjectReconcileFailed(ctx, env.Client, controller, nodeClass)

nodeClass = ExpectExists(ctx, env.Client, nodeClass)
Expect(awsEnv.IAMAPI.InstanceProfiles).To(HaveLen(0))
Expect(awsEnv.IAMAPI.DeleteInstanceProfileBehavior.CalledWithInput.Len()).To(Equal(1))
})
})
17 changes: 13 additions & 4 deletions pkg/fake/iamapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type IAMAPI struct {
IAMAPIBehavior

InstanceProfiles map[string]*iamtypes.InstanceProfile
Roles map[string]*iamtypes.Role

// TODO (jmdeal@): Update remaining tests to pass role validation
EnableRoleValidation bool
Roles map[string]*iamtypes.Role
}

func NewIAMAPI() *IAMAPI {
Expand All @@ -71,6 +74,7 @@ func (s *IAMAPI) Reset() {
s.ListInstanceProfilesBehavior.Reset()
s.GetRoleBehavior.Reset()
s.InstanceProfiles = map[string]*iamtypes.InstanceProfile{}
s.EnableRoleValidation = false
s.Roles = map[string]*iamtypes.Role{}
}

Expand Down Expand Up @@ -162,6 +166,12 @@ func (s *IAMAPI) AddRoleToInstanceProfile(_ context.Context, input *iam.AddRoleT
s.Lock()
defer s.Unlock()

if _, ok := s.Roles[aws.ToString(input.RoleName)]; !ok && s.EnableRoleValidation {
return nil, &smithy.GenericAPIError{
Code: "NoSuchEntity",
Message: fmt.Sprintf("The role with name %s cannot be found", aws.ToString(input.RoleName)),
}
}
if i, ok := s.InstanceProfiles[aws.ToString(input.InstanceProfileName)]; ok {
if len(i.Roles) > 0 {
return nil, &smithy.GenericAPIError{
Expand All @@ -174,9 +184,8 @@ func (s *IAMAPI) AddRoleToInstanceProfile(_ context.Context, input *iam.AddRoleT
return nil, nil
}
return nil, &smithy.GenericAPIError{
Code: "NoSuchEntity",
Message: fmt.Sprintf("Instance Profile %s cannot be found",
aws.ToString(input.InstanceProfileName)),
Code: "NoSuchEntity",
Message: fmt.Sprintf("Instance Profile %s cannot be found", aws.ToString(input.InstanceProfileName)),
}
})
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont

subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
instanceProfileProvider := instanceprofile.NewDefaultProvider(iam.NewFromConfig(cfg), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval), cache.New(awscache.ProtectedProfilesTTL, awscache.DefaultCleanupInterval), cfg.Region)
// The first argument to cache.New is the entry TTL and the second is the cleanup interval. The role
// cache uses DefaultTTL for its entries, matching the kwok operator's construction of the same provider.
instanceProfileProvider := instanceprofile.NewDefaultProvider(
	iam.NewFromConfig(cfg),
	cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval),
	cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
	cache.New(awscache.ProtectedProfilesTTL, awscache.DefaultCleanupInterval),
	cfg.Region,
)
pricingProvider := pricing.NewDefaultProvider(
pricing.NewAPI(cfg),
ec2api,
Expand Down
141 changes: 100 additions & 41 deletions pkg/providers/instanceprofile/instanceprofile.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/aws/aws-sdk-go-v2/service/iam"
iamtypes "github.com/aws/aws-sdk-go-v2/service/iam/types"
"github.com/aws/smithy-go"
"github.com/awslabs/operatorpkg/serrors"
cache "github.com/patrickmn/go-cache"
"github.com/samber/lo"
Expand All @@ -44,32 +45,36 @@ type Provider interface {
}

type DefaultProvider struct {
iamapi sdk.IAMAPI
cache *cache.Cache // instanceProfileName -> *iamtypes.InstanceProfile
protectedProfiles *cache.Cache // Cache to account for eventual consistency delays when garbage collecting
region string
iamapi sdk.IAMAPI
instanceProfileCache *cache.Cache // instanceProfileName -> *iamtypes.InstanceProfile
roleNotFoundErrorCache RoleNotFoundErrorCache
protectedProfileCache *cache.Cache // Cache to account for eventual consistency delays when garbage collecting
region string
}

func NewDefaultProvider(iamapi sdk.IAMAPI, cache *cache.Cache, protectedProfiles *cache.Cache, region string) *DefaultProvider {
func NewDefaultProvider(
iamapi sdk.IAMAPI,
instanceProfileCache *cache.Cache,
roleCache *cache.Cache,
protectedProfileCache *cache.Cache,
region string,
) *DefaultProvider {
return &DefaultProvider{
iamapi: iamapi,
cache: cache,
protectedProfiles: protectedProfiles,
region: region,
iamapi: iamapi,
instanceProfileCache: instanceProfileCache,
roleNotFoundErrorCache: RoleNotFoundErrorCache{Cache: roleCache},
protectedProfileCache: protectedProfileCache,
region: region,
}
}

// getProfileCacheKey returns the cache key for the given instance profile name, which is the name
// prefixed with "instance-profile:".
func getProfileCacheKey(profileName string) string {
	const prefix = "instance-profile:"
	return prefix + profileName
}

// getRoleCacheKey returns the cache key for the given role name, which is the name prefixed with
// "role:".
func getRoleCacheKey(roleName string) string {
	const prefix = "role:"
	return prefix + roleName
}

func (p *DefaultProvider) Get(ctx context.Context, instanceProfileName string) (*iamtypes.InstanceProfile, error) {
profileCacheKey := getProfileCacheKey(instanceProfileName)
if instanceProfile, ok := p.cache.Get(profileCacheKey); ok {
if instanceProfile, ok := p.instanceProfileCache.Get(profileCacheKey); ok {
return instanceProfile.(*iamtypes.InstanceProfile), nil
}
out, err := p.iamapi.GetInstanceProfile(ctx, &iam.GetInstanceProfileInput{
Expand All @@ -78,22 +83,18 @@ func (p *DefaultProvider) Get(ctx context.Context, instanceProfileName string) (
if err != nil {
return nil, err
}
p.cache.SetDefault(profileCacheKey, out.InstanceProfile)
p.instanceProfileCache.SetDefault(profileCacheKey, out.InstanceProfile)
return out.InstanceProfile, nil
}

func (p *DefaultProvider) Create(ctx context.Context, instanceProfileName string, roleName string, tags map[string]string, nodeClassUID string) error {
profileCacheKey := getProfileCacheKey(instanceProfileName)
roleCacheKey := getRoleCacheKey(roleName)
if _, ok := p.cache.Get(roleCacheKey); !ok {
out, err := p.iamapi.GetRole(ctx, &iam.GetRoleInput{
RoleName: lo.ToPtr(roleName),
})
if err != nil {
return serrors.Wrap(fmt.Errorf("role does not exist, %w", err), "role", roleName)
}
p.cache.SetDefault(roleCacheKey, out.Role)
// Don't attempt to create an instance profile if the role hasn't been found. This prevents runaway instance profile
// creation by the NodeClass controller when there's a missing role.
if err, ok := p.roleNotFoundErrorCache.HasError(roleName); ok {
return fmt.Errorf("role not found, %w", err)
}

profileCacheKey := getProfileCacheKey(instanceProfileName)
instanceProfile, err := p.Get(ctx, instanceProfileName)
if err != nil {
if !awserrors.IsNotFound(err) {
Expand All @@ -105,37 +106,59 @@ func (p *DefaultProvider) Create(ctx context.Context, instanceProfileName string
Path: lo.ToPtr(fmt.Sprintf("/karpenter/%s/%s/%s/", p.region, options.FromContext(ctx).ClusterName, nodeClassUID)),
})
if err != nil {
return serrors.Wrap(fmt.Errorf("creating instance profile, %w", err), "instance-profile", instanceProfileName)
return serrors.Wrap(err, "instance-profile", instanceProfileName)
}

instanceProfile = o.InstanceProfile
}
if err := p.ensureRole(ctx, instanceProfile, roleName); err != nil {
return fmt.Errorf("ensuring role attached, %w", err)
}
// Add the role to the cached instance profile for detection in the ensureRole check based on the cache entry
instanceProfile.Roles = []iamtypes.Role{{
RoleName: lo.ToPtr(roleName),
}}
p.instanceProfileCache.SetDefault(profileCacheKey, instanceProfile)
return nil
}

// ensureRole ensures that the correct role is attached to the provided instance profile. If a non-matching role is
// found already attached, it's removed.
func (p *DefaultProvider) ensureRole(ctx context.Context, instanceProfile *iamtypes.InstanceProfile, roleName string) error {
// Instance profiles can only have a single role assigned to them so this profile either has 1 or 0 roles
// https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2_instance-profiles.html
if len(instanceProfile.Roles) == 1 {
if lo.FromPtr(instanceProfile.Roles[0].RoleName) == roleName {
return nil
}
if _, err = p.iamapi.RemoveRoleFromInstanceProfile(ctx, &iam.RemoveRoleFromInstanceProfileInput{
InstanceProfileName: lo.ToPtr(instanceProfileName),
if _, err := p.iamapi.RemoveRoleFromInstanceProfile(ctx, &iam.RemoveRoleFromInstanceProfileInput{
InstanceProfileName: instanceProfile.InstanceProfileName,
RoleName: instanceProfile.Roles[0].RoleName,
}); err != nil {
return serrors.Wrap(fmt.Errorf("removing role for instance profile, %w", err), "role", lo.FromPtr(instanceProfile.Roles[0].RoleName), "instance-profile", instanceProfileName)
return serrors.Wrap(
fmt.Errorf("removing role for instance profile, %w", err),
"role", lo.FromPtr(instanceProfile.Roles[0].RoleName),
"instance-profile", lo.FromPtr(instanceProfile.InstanceProfileName),
)
}
}

// If the role has a path, ignore the path and take the role name only since AddRoleToInstanceProfile
// does not support paths in the role name.
roleName = lo.LastOr(strings.Split(roleName, "/"), roleName)
if _, err = p.iamapi.AddRoleToInstanceProfile(ctx, &iam.AddRoleToInstanceProfileInput{
InstanceProfileName: lo.ToPtr(instanceProfileName),
if _, err := p.iamapi.AddRoleToInstanceProfile(ctx, &iam.AddRoleToInstanceProfileInput{
InstanceProfileName: instanceProfile.InstanceProfileName,
RoleName: lo.ToPtr(roleName),
}); err != nil {
return serrors.Wrap(fmt.Errorf("adding role to instance profile, %w", err), "role", roleName, "instance-profile", instanceProfileName)
err = serrors.Wrap(
fmt.Errorf("adding role to instance profile, %w", err),
"role", roleName,
"instance-profile", lo.FromPtr(instanceProfile.InstanceProfileName),
)
if IsRoleNotFoundError(err) {
p.roleNotFoundErrorCache.SetError(roleName, err)
}
return err
}
instanceProfile.Roles = []iamtypes.Role{{
RoleName: lo.ToPtr(roleName),
}}
p.cache.SetDefault(profileCacheKey, instanceProfile)
return nil
}

Expand All @@ -162,7 +185,7 @@ func (p *DefaultProvider) Delete(ctx context.Context, instanceProfileName string
}); err != nil {
return awserrors.IgnoreNotFound(serrors.Wrap(fmt.Errorf("deleting instance profile, %w", err), "instance-profile", instanceProfileName))
}
p.cache.Delete(profileCacheKey)
p.instanceProfileCache.Delete(profileCacheKey)
p.SetProtectedState(instanceProfileName, false)
return nil
}
Expand Down Expand Up @@ -204,14 +227,50 @@ func (p *DefaultProvider) ListNodeClassProfiles(ctx context.Context, nodeClass *
}

func (p *DefaultProvider) IsProtected(profileName string) bool {
_, exists := p.protectedProfiles.Get(profileName)
_, exists := p.protectedProfileCache.Get(profileName)
return exists
}

func (p *DefaultProvider) SetProtectedState(profileName string, protected bool) {
if !protected {
p.protectedProfiles.Delete(profileName)
p.protectedProfileCache.Delete(profileName)
} else {
p.protectedProfiles.SetDefault(profileName, struct{}{})
p.protectedProfileCache.SetDefault(profileName, struct{}{})
}
}

// IsRoleNotFoundError reports whether err wraps a smithy.APIError with the "NoSuchEntity" code whose
// message refers to a missing role, as opposed to a missing instance profile. It returns false for a
// nil error and for errors that don't wrap a smithy.APIError.
func IsRoleNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	apiErr, ok := lo.ErrorsAs[smithy.APIError](err)
	if !ok || apiErr.ErrorCode() != "NoSuchEntity" {
		return false
	}
	msg := apiErr.ErrorMessage()
	// NoSuchEntity is returned both when the role is missing and when the instance profile is missing.
	// An instance profile message embeds the profile name, which may itself contain the substring
	// "role", so reject those messages explicitly before looking for "role".
	// NOTE(review): relies on the instance profile message starting with "Instance Profile", e.g.
	// "Instance Profile <name> cannot be found" - confirm against live IAM responses.
	if strings.HasPrefix(strings.ToLower(msg), "instance profile") {
		return false
	}
	return strings.Contains(msg, "role")
}

// RoleNotFoundErrorCache is a wrapper around a go-cache for handling role not found errors returned by
// AddRoleToInstanceProfile. Entries are keyed by role name with any path prefix removed, so a role is
// found regardless of whether the caller supplies it with or without its path.
type RoleNotFoundErrorCache struct {
	*cache.Cache
}

// roleNotFoundCacheKey returns the final "/"-separated segment of roleName (the whole name when it
// contains no "/"). Using it for both reads and writes keeps lookups made with a path-qualified role
// name in agreement with entries stored under the bare role name.
func roleNotFoundCacheKey(roleName string) string {
	// LastIndex returns -1 when there is no "/", which makes the slice start at 0.
	return roleName[strings.LastIndex(roleName, "/")+1:]
}

// HasError returns the role not found error cached for the given role and true when one is present.
// It returns (nil, false) when there is no entry for the role or when the entry doesn't hold an error.
func (rc RoleNotFoundErrorCache) HasError(roleName string) (error, bool) {
	cached, ok := rc.Get(roleNotFoundCacheKey(roleName))
	if !ok {
		return nil, false
	}
	// The embedded cache is exported and can be written to directly, so don't assume the stored value
	// is an error: an unchecked assertion would panic on a nil or non-error entry.
	err, ok := cached.(error)
	return err, ok
}

// SetError records err as the role not found error for the given role, using the cache's default
// expiration.
func (rc RoleNotFoundErrorCache) SetError(roleName string, err error) {
	rc.SetDefault(roleNotFoundCacheKey(roleName), err)
}
34 changes: 34 additions & 0 deletions pkg/providers/instanceprofile/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package instanceprofile_test

import (
"context"
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -320,4 +321,37 @@ var _ = Describe("InstanceProfileProvider", func() {
awsEnv.InstanceProfileProvider.SetProtectedState(profileName, false)
Expect(awsEnv.InstanceProfileProvider.IsProtected(profileName)).To(BeFalse())
})

Context("Role Cache", func() {
const roleName = "test-role"
BeforeEach(func() {
awsEnv.IAMAPI.EnableRoleValidation = true
awsEnv.IAMAPI.Roles = map[string]*iamtypes.Role{
roleName: &iamtypes.Role{RoleName: lo.ToPtr(roleName)},
}
})
It("should not cache role not found errors when the role exists", func() {
err := awsEnv.InstanceProfileProvider.Create(ctx, "test-profile", roleName, nil, "test-uid")
Expect(err).ToNot(HaveOccurred())
_, ok := awsEnv.RoleCache.Get(roleName)
Expect(ok).To(BeFalse())
})
It("should cache role not found errors when the role does not", func() {
missingRoleName := "non-existent-role"
err := awsEnv.InstanceProfileProvider.Create(ctx, "test-profile", missingRoleName, nil, "test-uid")
Expect(err).To(HaveOccurred())
_, ok := awsEnv.RoleCache.Get(missingRoleName)
Expect(ok).To(BeTrue())
})
It("should not attempt to create instance profile when role is cached as not found", func() {
missingRoleName := "non-existent-role"
awsEnv.RoleCache.SetDefault(missingRoleName, errors.New("role not found"))

err := awsEnv.InstanceProfileProvider.Create(ctx, "test-profile", missingRoleName, nil, "test-uid")
Expect(err).To(HaveOccurred())

Expect(awsEnv.IAMAPI.InstanceProfiles).To(HaveLen(0))
Expect(awsEnv.IAMAPI.CreateInstanceProfileBehavior.Calls()).To(BeZero())
})
})
})
Loading