Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions internal/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Controller struct {
connectorWatcher *watchers.ConnectorWatcher
linkAccessWatcher *watchers.RouterAccessWatcher
grantWatcher *watchers.AccessGrantWatcher
serviceWatcher *watchers.ServiceWatcher
sites map[string]*site.Site
startGrantServer func()
accessMgr *securedaccess.SecuredAccessManager
Expand All @@ -52,6 +53,7 @@ type Controller struct {
attachableConnectors map[string]*skupperv2alpha1.AttachedConnector
log *slog.Logger
namespaces *NamespaceConfig
observedServices map[string]string
}

func skupperRouterConfig() internalinterfaces.TweakListOptionsFunc {
Expand All @@ -71,6 +73,12 @@ func listenerServices() internalinterfaces.TweakListOptionsFunc {
}
}

func sansSkupperListenerServices() internalinterfaces.TweakListOptionsFunc {
return func(options *metav1.ListOptions) {
options.LabelSelector = "!internal.skupper.io/listener"
}
}

func skupperSiteSizingConfig() internalinterfaces.TweakListOptionsFunc {
return func(options *metav1.ListOptions) {
options.LabelSelector = sizing.SiteSizingLabel
Expand All @@ -91,6 +99,7 @@ func NewController(cli internalclient.Clients, config *Config) (*Controller, err
labelling: labels.NewLabelsAndAnnotations(config.Namespace),
attachableConnectors: map[string]*skupperv2alpha1.AttachedConnector{},
log: slog.New(slog.Default().Handler()).With(slog.String("component", "kube.controller")),
observedServices: map[string]string{},
}

hostname := os.Getenv("HOSTNAME")
Expand Down Expand Up @@ -120,6 +129,7 @@ func NewController(cli internalclient.Clients, config *Config) (*Controller, err
controller.siteWatcher = controller.eventProcessor.WatchSites(config.WatchNamespace, filter(controller, controller.checkSite))
controller.listenerWatcher = controller.eventProcessor.WatchListeners(config.WatchNamespace, filter(controller, controller.checkListener))
controller.eventProcessor.WatchServices(listenerServices(), config.WatchNamespace, filter(controller, controller.checkListenerService))
controller.serviceWatcher = controller.eventProcessor.WatchServices(sansSkupperListenerServices(), config.WatchNamespace, filter(controller, controller.checkObservedService))
controller.connectorWatcher = controller.eventProcessor.WatchConnectors(config.WatchNamespace, filter(controller, controller.checkConnector))
controller.linkAccessWatcher = controller.eventProcessor.WatchRouterAccesses(config.WatchNamespace, filter(controller, controller.checkRouterAccess))
controller.eventProcessor.WatchAttachedConnectors(config.WatchNamespace, filter(controller, controller.checkAttachedConnector))
Expand Down Expand Up @@ -220,6 +230,10 @@ func (c *Controller) init(stopCh <-chan struct{}) error {
)
c.labelling.Update(config.Namespace+"/"+config.Name, config)
}
// get observed services prior to restoring listeners
for _, svc := range c.serviceWatcher.List() {
c.observedServices[svc.Namespace+"/"+svc.ObjectMeta.Name] = svc.ObjectMeta.Name
}
//recover existing sites & bindings
siteRecovery := site.NewSiteRecovery(c.eventProcessor.GetKubeClient())
for _, site := range c.siteWatcher.List() {
Expand Down Expand Up @@ -266,7 +280,8 @@ func (c *Controller) init(stopCh <-chan struct{}) error {
slog.String("name", listener.Name),
slog.String("namespace", listener.Namespace),
)
site.CheckListener(listener.ObjectMeta.Name, listener)
_, svcExists := c.observedServices[listener.ObjectMeta.Namespace+"/"+listener.Spec.Host]
site.CheckListener(listener.ObjectMeta.Name, listener, svcExists)
}
for _, la := range c.linkAccessWatcher.List() {
if !c.namespaces.isControlled(la.Namespace) {
Expand Down Expand Up @@ -367,7 +382,11 @@ func (c *Controller) checkListener(key string, listener *skupperv2alpha1.Listene
if err != nil {
return err
}
return c.getSite(namespace).CheckListener(name, listener)
svcExists := false
if listener != nil {
_, svcExists = c.observedServices[namespace+"/"+listener.Spec.Host]
}
return c.getSite(namespace).CheckListener(name, listener, svcExists)
}

func (c *Controller) checkListenerService(key string, svc *corev1.Service) error {
Expand All @@ -378,6 +397,17 @@ func (c *Controller) checkListenerService(key string, svc *corev1.Service) error
return c.getSite(svc.Namespace).CheckListenerService(svc)
}

func (c *Controller) checkObservedService(key string, svc *corev1.Service) error {
c.log.Debug("checkObservedService", slog.String("key", key))

if svc == nil {
delete(c.observedServices, key)
} else {
c.observedServices[key] = svc.ObjectMeta.Name
}
return nil
}

func (c *Controller) checkLink(key string, linkconfig *skupperv2alpha1.Link) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/kube/site/per_target_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (p *PerTargetListener) updateListener(l *skupperv2alpha1.Listener) bool {
}

func (p *PerTargetListener) extractTargets(network []skupperv2alpha1.SiteRecord, mapping *qdr.PortMapping, exposedPorts ExposedPorts, context BindingContext) (bool, error) {
p.logger.Info("Extracting targets for listener",
p.logger.Debug("Extracting targets for listener",
slog.String("namespace", p.definition.Namespace),
slog.String("listener", p.definition.Name))
targets := extractTargets(p.address(""), network)
Expand Down
11 changes: 7 additions & 4 deletions internal/kube/site/site.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,11 +866,10 @@ func (s *Site) CheckConnector(name string, connector *skupperv2alpha1.Connector)

func (s *Site) updateListenerStatus(listener *skupperv2alpha1.Listener, err error) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker: should be this method called something like updateListenerErrorStatus, given that it is only used in error cases?

if listener.SetConfigured(err) {
updated, err := s.clients.GetSkupperClient().SkupperV2alpha1().Listeners(listener.ObjectMeta.Namespace).UpdateStatus(context.TODO(), listener, metav1.UpdateOptions{})
_, err := s.clients.GetSkupperClient().SkupperV2alpha1().Listeners(listener.ObjectMeta.Namespace).UpdateStatus(context.TODO(), listener, metav1.UpdateOptions{})
if err != nil {
return err
}
s.bindings.UpdateListener(updated.Name, updated)
}
return nil
}
Expand Down Expand Up @@ -905,14 +904,18 @@ func (s *Site) CheckListenerService(svc *corev1.Service) error {
return nil
}

func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener) error {
update, err1 := s.bindings.UpdateListener(name, listener)
func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener, svcExists bool) error {
if s.site == nil {
if listener == nil {
return nil
}
return s.updateListenerStatus(listener, stderrors.New("No active site in namespace"))
}
if listener != nil && svcExists {
return s.updateListenerStatus(listener, fmt.Errorf("Service %s already exists in namespace", listener.Spec.Host))
}

update, err1 := s.bindings.UpdateListener(name, listener)
if update == nil {
return nil
}
Expand Down
59 changes: 56 additions & 3 deletions internal/kube/site/site_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,9 @@ func TestSite_ExposeUnexpose(t *testing.T) {
}
func TestSite_CheckListener(t *testing.T) {
type args struct {
name string
listener *skupperv2alpha1.Listener
name string
listener *skupperv2alpha1.Listener
svcExists bool
}
tests := []struct {
name string
Expand Down Expand Up @@ -318,6 +319,7 @@ func TestSite_CheckListener(t *testing.T) {
Host: "1.2.3.4",
},
},
svcExists: false,
},
skupperObjects: []runtime.Object{
&skupperv2alpha1.Listener{
Expand All @@ -331,6 +333,57 @@ func TestSite_CheckListener(t *testing.T) {
wantErr: false,
wantListeners: 1,
},
{
name: "pre-existing service for listener added",
args: args{
name: "listener1",
listener: &skupperv2alpha1.Listener{
ObjectMeta: v1.ObjectMeta{
Name: "listener1",
Namespace: "test",
UID: "8a96ffdf-403b-4e4a-83a8-97d3d459adb6",
},
Spec: skupperv2alpha1.ListenerSpec{
RoutingKey: "backend",
Port: 8080,
Type: "tcp",
Host: "backend",
},
},
svcExists: true,
},
skupperObjects: []runtime.Object{
&skupperv2alpha1.Listener{
ObjectMeta: v1.ObjectMeta{
Name: "listener1",
Namespace: "test",
},
},
},
k8sObjects: []runtime.Object{
&corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: "backend",
Namespace: "test",
},
Status: corev1.ServiceStatus{
Conditions: []v1.Condition{
{
Type: "Configured",
Status: "True",
},
},
},
},
},
want: "initialized",
wantErr: false,
wantListeners: 0,
},
/* TBD updateListenerStatus if kube command err == nil it just
returns and doesn't update s.bindings.UpdateListener ???
{
Expand Down Expand Up @@ -369,7 +422,7 @@ func TestSite_CheckListener(t *testing.T) {
assert.Assert(t, err)
}

if err := s.CheckListener(tt.args.name, tt.args.listener); (err != nil) != tt.wantErr {
if err := s.CheckListener(tt.args.name, tt.args.listener, tt.args.svcExists); (err != nil) != tt.wantErr {
t.Errorf("Site.CheckListener() error = %v", err)
}

Expand Down
1 change: 0 additions & 1 deletion internal/qdr/port_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type PortMapping struct {

func (p *PortMapping) GetPortForKey(key string) (int, error) {
if existing, ok := p.mappings[key]; ok {
log.Printf("Port %d already allocated for key %s", existing, key)
return existing, nil
}
allocated, err := p.pool.NextFreePort()
Expand Down