From 1ae683f35751e76250ec7906b0f91881675d34dc Mon Sep 17 00:00:00 2001 From: ajssmith Date: Fri, 17 Oct 2025 15:54:07 -0400 Subject: [PATCH 1/2] detect existing svc --- internal/kube/site/site.go | 15 ++++++++-- internal/kube/site/site_test.go | 50 +++++++++++++++++++++++++++++++++ internal/qdr/port_mapping.go | 1 - 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/internal/kube/site/site.go b/internal/kube/site/site.go index 7daa75caf..c8da7c025 100644 --- a/internal/kube/site/site.go +++ b/internal/kube/site/site.go @@ -866,11 +866,10 @@ func (s *Site) CheckConnector(name string, connector *skupperv2alpha1.Connector) func (s *Site) updateListenerStatus(listener *skupperv2alpha1.Listener, err error) error { if listener.SetConfigured(err) { - updated, err := s.clients.GetSkupperClient().SkupperV2alpha1().Listeners(listener.ObjectMeta.Namespace).UpdateStatus(context.TODO(), listener, metav1.UpdateOptions{}) + _, err := s.clients.GetSkupperClient().SkupperV2alpha1().Listeners(listener.ObjectMeta.Namespace).UpdateStatus(context.TODO(), listener, metav1.UpdateOptions{}) if err != nil { return err } - s.bindings.UpdateListener(updated.Name, updated) } return nil } @@ -906,13 +905,23 @@ func (s *Site) CheckListenerService(svc *corev1.Service) error { } func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener) error { - update, err1 := s.bindings.UpdateListener(name, listener) if s.site == nil { if listener == nil { return nil } return s.updateListenerStatus(listener, stderrors.New("No active site in namespace")) } + if listener != nil { + ctxt := context.TODO() + current, err := s.clients.GetKubeClient().CoreV1().Services(s.namespace).Get(ctxt, listener.Spec.Host, metav1.GetOptions{}) + if current != nil && err == nil { + if !isOwned(current) { + return s.updateListenerStatus(listener, fmt.Errorf("Service to expose %s already exists in namespace", listener.Spec.Host)) + } + } + } + + update, err1 := s.bindings.UpdateListener(name, listener) if update == nil { return nil } diff --git a/internal/kube/site/site_test.go b/internal/kube/site/site_test.go index bf1e0f4b5..4e97130df 100644 --- a/internal/kube/site/site_test.go +++ b/internal/kube/site/site_test.go @@ -331,6 +331,56 @@ func TestSite_CheckListener(t *testing.T) { wantErr: false, wantListeners: 1, }, + { + name: "pre-existing service for listener added", + args: args{ + name: "listener1", + listener: &skupperv2alpha1.Listener{ + ObjectMeta: v1.ObjectMeta{ + Name: "listener1", + Namespace: "test", + UID: "8a96ffdf-403b-4e4a-83a8-97d3d459adb6", + }, + Spec: skupperv2alpha1.ListenerSpec{ + RoutingKey: "backend", + Port: 8080, + Type: "tcp", + Host: "backend", + }, + }, + }, + skupperObjects: []runtime.Object{ + &skupperv2alpha1.Listener{ + ObjectMeta: v1.ObjectMeta{ + Name: "listener1", + Namespace: "test", + }, + }, + }, + k8sObjects: []runtime.Object{ + &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "backend", + Namespace: "test", + }, + Status: corev1.ServiceStatus{ + Conditions: []v1.Condition{ + { + Type: "Configured", + Status: "True", + }, + }, + }, + }, + }, + want: "initialized", + wantErr: false, + wantListeners: 0, + }, /* TBD updateListenerStatus if kube command err == nil it just returns and doesn't update s.bindings.UpdateListener ??? { diff --git a/internal/qdr/port_mapping.go b/internal/qdr/port_mapping.go index 85efdcbcd..a5b9efa57 100644 --- a/internal/qdr/port_mapping.go +++ b/internal/qdr/port_mapping.go @@ -14,7 +14,6 @@ type PortMapping struct { func (p *PortMapping) GetPortForKey(key string) (int, error) { if existing, ok := p.mappings[key]; ok { - log.Printf("Port %d already allocated for key %s", existing, key) return existing, nil } allocated, err := p.pool.NextFreePort() From 5297632053fd7e39d211285566caef915694c5cf Mon Sep 17 00:00:00 2001 From: ajssmith Date: Tue, 28 Oct 2025 12:38:35 -0400 Subject: [PATCH 2/2] switch to tracking services --- internal/kube/controller/controller.go | 34 +++++++++++++++++++++-- internal/kube/site/per_target_listener.go | 2 +- internal/kube/site/site.go | 12 ++------ internal/kube/site/site_test.go | 9 ++++-- 4 files changed, 42 insertions(+), 15 deletions(-) diff --git a/internal/kube/controller/controller.go b/internal/kube/controller/controller.go index ce06283bd..327e4f52f 100644 --- a/internal/kube/controller/controller.go +++ b/internal/kube/controller/controller.go @@ -40,6 +40,7 @@ type Controller struct { connectorWatcher *watchers.ConnectorWatcher linkAccessWatcher *watchers.RouterAccessWatcher grantWatcher *watchers.AccessGrantWatcher + serviceWatcher *watchers.ServiceWatcher sites map[string]*site.Site startGrantServer func() accessMgr *securedaccess.SecuredAccessManager @@ -52,6 +53,7 @@ type Controller struct { attachableConnectors map[string]*skupperv2alpha1.AttachedConnector log *slog.Logger namespaces *NamespaceConfig + observedServices map[string]string } func skupperRouterConfig() internalinterfaces.TweakListOptionsFunc { @@ -71,6 +73,12 @@ func listenerServices() internalinterfaces.TweakListOptionsFunc { } } +func sansSkupperListenerServices() internalinterfaces.TweakListOptionsFunc { + return func(options *metav1.ListOptions) { + options.LabelSelector = "!internal.skupper.io/listener" + } +} + func skupperSiteSizingConfig() internalinterfaces.TweakListOptionsFunc { return func(options *metav1.ListOptions) { options.LabelSelector = sizing.SiteSizingLabel @@ -91,6 +99,7 @@ func NewController(cli internalclient.Clients, config *Config) (*Controller, err labelling: labels.NewLabelsAndAnnotations(config.Namespace), attachableConnectors: map[string]*skupperv2alpha1.AttachedConnector{}, log: slog.New(slog.Default().Handler()).With(slog.String("component", "kube.controller")), + observedServices: map[string]string{}, } hostname := os.Getenv("HOSTNAME") @@ -120,6 +129,7 @@ func NewController(cli internalclient.Clients, config *Config) (*Controller, err controller.siteWatcher = controller.eventProcessor.WatchSites(config.WatchNamespace, filter(controller, controller.checkSite)) controller.listenerWatcher = controller.eventProcessor.WatchListeners(config.WatchNamespace, filter(controller, controller.checkListener)) controller.eventProcessor.WatchServices(listenerServices(), config.WatchNamespace, filter(controller, controller.checkListenerService)) + controller.serviceWatcher = controller.eventProcessor.WatchServices(sansSkupperListenerServices(), config.WatchNamespace, filter(controller, controller.checkObservedService)) controller.connectorWatcher = controller.eventProcessor.WatchConnectors(config.WatchNamespace, filter(controller, controller.checkConnector)) controller.linkAccessWatcher = controller.eventProcessor.WatchRouterAccesses(config.WatchNamespace, filter(controller, controller.checkRouterAccess)) controller.eventProcessor.WatchAttachedConnectors(config.WatchNamespace, filter(controller, controller.checkAttachedConnector)) @@ -220,6 +230,10 @@ func (c *Controller) init(stopCh <-chan struct{}) error { ) c.labelling.Update(config.Namespace+"/"+config.Name, config) } + // get observed services prior to restoring listeners + for _, svc := range c.serviceWatcher.List() { + c.observedServices[svc.Namespace+"/"+svc.ObjectMeta.Name] = svc.ObjectMeta.Name + } //recover existing sites & bindings siteRecovery := site.NewSiteRecovery(c.eventProcessor.GetKubeClient()) for _, site := range c.siteWatcher.List() { @@ -266,7 +280,8 @@ func (c *Controller) init(stopCh <-chan struct{}) error { slog.String("name", listener.Name), slog.String("namespace", listener.Namespace), ) - site.CheckListener(listener.ObjectMeta.Name, listener) + _, svcExists := c.observedServices[listener.ObjectMeta.Namespace+"/"+listener.Spec.Host] + site.CheckListener(listener.ObjectMeta.Name, listener, svcExists) } for _, la := range c.linkAccessWatcher.List() { if !c.namespaces.isControlled(la.Namespace) { @@ -367,7 +382,11 @@ func (c *Controller) checkListener(key string, listener *skupperv2alpha1.Listene if err != nil { return err } - return c.getSite(namespace).CheckListener(name, listener) + svcExists := false + if listener != nil { + _, svcExists = c.observedServices[namespace+"/"+listener.Spec.Host] + } + return c.getSite(namespace).CheckListener(name, listener, svcExists) } func (c *Controller) checkListenerService(key string, svc *corev1.Service) error { @@ -378,6 +397,17 @@ func (c *Controller) checkListenerService(key string, svc *corev1.Service) error return c.getSite(svc.Namespace).CheckListenerService(svc) } +func (c *Controller) checkObservedService(key string, svc *corev1.Service) error { + c.log.Debug("checkObservedService", slog.String("key", key)) + + if svc == nil { + delete(c.observedServices, key) + } else { + c.observedServices[key] = svc.ObjectMeta.Name + } + return nil +} + func (c *Controller) checkLink(key string, linkconfig *skupperv2alpha1.Link) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { diff --git a/internal/kube/site/per_target_listener.go b/internal/kube/site/per_target_listener.go index fce46120e..af6c0212a 100644 --- a/internal/kube/site/per_target_listener.go +++ b/internal/kube/site/per_target_listener.go @@ -31,7 +31,7 @@ func (p *PerTargetListener) updateListener(l *skupperv2alpha1.Listener) bool { } func (p *PerTargetListener) extractTargets(network []skupperv2alpha1.SiteRecord, mapping *qdr.PortMapping, exposedPorts ExposedPorts, context BindingContext) (bool, error) { - p.logger.Info("Extracting targets for listener", + p.logger.Debug("Extracting targets for listener", slog.String("namespace", p.definition.Namespace), slog.String("listener", p.definition.Name)) targets := extractTargets(p.address(""), network) diff --git a/internal/kube/site/site.go b/internal/kube/site/site.go index c8da7c025..f82f77fe6 100644 --- a/internal/kube/site/site.go +++ b/internal/kube/site/site.go @@ -904,21 +904,15 @@ func (s *Site) CheckListenerService(svc *corev1.Service) error { return nil } -func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener) error { +func (s *Site) CheckListener(name string, listener *skupperv2alpha1.Listener, svcExists bool) error { if s.site == nil { if listener == nil { return nil } return s.updateListenerStatus(listener, stderrors.New("No active site in namespace")) } - if listener != nil { - ctxt := context.TODO() - current, err := s.clients.GetKubeClient().CoreV1().Services(s.namespace).Get(ctxt, listener.Spec.Host, metav1.GetOptions{}) - if current != nil && err == nil { - if !isOwned(current) { - return s.updateListenerStatus(listener, fmt.Errorf("Service to expose %s already exists in namespace", listener.Spec.Host)) - } - } + if listener != nil && svcExists { + return s.updateListenerStatus(listener, fmt.Errorf("Service %s already exists in namespace", listener.Spec.Host)) } update, err1 := s.bindings.UpdateListener(name, listener) diff --git a/internal/kube/site/site_test.go b/internal/kube/site/site_test.go index 4e97130df..7852985d5 100644 --- a/internal/kube/site/site_test.go +++ b/internal/kube/site/site_test.go @@ -281,8 +281,9 @@ func TestSite_ExposeUnexpose(t *testing.T) { } func TestSite_CheckListener(t *testing.T) { type args struct { - name string - listener *skupperv2alpha1.Listener + name string + listener *skupperv2alpha1.Listener + svcExists bool } tests := []struct { name string @@ -318,6 +319,7 @@ func TestSite_CheckListener(t *testing.T) { Host: "1.2.3.4", }, }, + svcExists: false, }, skupperObjects: []runtime.Object{ &skupperv2alpha1.Listener{ @@ -348,6 +350,7 @@ func TestSite_CheckListener(t *testing.T) { Host: "backend", }, }, + svcExists: true, }, skupperObjects: []runtime.Object{ &skupperv2alpha1.Listener{ @@ -419,7 +422,7 @@ func TestSite_CheckListener(t *testing.T) { assert.Assert(t, err) } - if err := s.CheckListener(tt.args.name, tt.args.listener); (err != nil) != tt.wantErr { + if err := s.CheckListener(tt.args.name, tt.args.listener, tt.args.svcExists); (err != nil) != tt.wantErr { t.Errorf("Site.CheckListener() error = %v", err) }