diff --git a/calico-vpp-agent/services/service_handler.go b/calico-vpp-agent/services/service_handler.go index 1ade65fd..dc0971c6 100644 --- a/calico-vpp-agent/services/service_handler.go +++ b/calico-vpp-agent/services/service_handler.go @@ -84,7 +84,7 @@ func getCnatVipDstPort(servicePort *v1.ServicePort, isNodePort bool) uint16 { return uint16(servicePort.Port) } -func (s *Server) buildCnatEntryForServicePort(servicePort *v1.ServicePort, service *v1.Service, epslices []*discoveryv1.EndpointSlice, serviceIP net.IP, isNodePort bool, svcInfo serviceInfo, isLocalOnly bool) *types.CnatTranslateEntry { +func (s *Server) buildCnatEntryForServicePort(servicePort *v1.ServicePort, epslices []*discoveryv1.EndpointSlice, serviceIP net.IP, isNodePort bool, svcInfo serviceInfo, isLocalOnly bool) *types.CnatTranslateEntry { backends := make([]types.CnatEndpointTuple, 0) // Find the endpoint subset port that exposes the port we're interested in for _, epslice := range epslices { @@ -167,7 +167,7 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di for _, servicePort := range service.Spec.Ports { for _, cip := range clusterIPs { if !cip.IsUnspecified() && len(cip) > 0 { - entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, cip, false /* isNodePort */, *serviceSpec, InternalIsLocalOnly(service)) + entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, cip, false /* isNodePort */, *serviceSpec, InternalIsLocalOnly(service)) localService.Entries = append(localService.Entries, *entry) } } @@ -175,7 +175,7 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di for _, eip := range service.Spec.ExternalIPs { extIP := net.ParseIP(eip) if !extIP.IsUnspecified() && len(extIP) > 0 { - entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, extIP, false /* isNodePort */, *serviceSpec, ExternalIsLocalOnly(service)) + entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, extIP, false /* isNodePort */, *serviceSpec, ExternalIsLocalOnly(service)) localService.Entries = append(localService.Entries, *entry) if ExternalIsLocalOnly(service) && len(entry.Backends) > 0 { localService.SpecificRoutes = append(localService.SpecificRoutes, extIP) @@ -186,7 +186,7 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di for _, ingress := range service.Status.LoadBalancer.Ingress { ingressIP := net.ParseIP(ingress.IP) if !ingressIP.IsUnspecified() && len(ingressIP) > 0 { - entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, ingressIP, false /* isNodePort */, *serviceSpec, ExternalIsLocalOnly(service)) + entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, ingressIP, false /* isNodePort */, *serviceSpec, ExternalIsLocalOnly(service)) localService.Entries = append(localService.Entries, *entry) if ExternalIsLocalOnly(service) && len(entry.Backends) > 0 { localService.SpecificRoutes = append(localService.SpecificRoutes, ingressIP) @@ -197,7 +197,7 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di if service.Spec.Type == v1.ServiceTypeNodePort { for _, nip := range nodeIPs { if !nip.IsUnspecified() && len(nip) > 0 { - entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, nip, true /* isNodePort */, *serviceSpec, false) + entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, nip, true /* isNodePort */, *serviceSpec, false) localService.Entries = append(localService.Entries, *entry) } } @@ -209,7 +209,7 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di if service.Spec.Type == v1.ServiceTypeLoadBalancer && *service.Spec.AllocateLoadBalancerNodePorts { for _, nip := range nodeIPs { if !nip.IsUnspecified() && len(nip) > 0 { - entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, nip, true /* isNodePort */, *serviceSpec, false) + entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, nip, true /* isNodePort */, *serviceSpec, false) localService.Entries = append(localService.Entries, *entry) } } @@ -251,65 +251,89 @@ func (s *Server) advertiseSpecificRoute(added []net.IP, deleted []net.IP) { func (s *Server) deleteServiceEntries(entries []types.CnatTranslateEntry, oldService *LocalService) { for _, entry := range entries { - oldServiceState, found := s.serviceStateMap[entry.Key()] - if !found { - s.log.Infof("svc(del) key=%s Cnat entry not found", entry.Key()) - continue - } - s.log.Infof("svc(del) key=%s %s vpp-id=%d", entry.Key(), entry.String(), oldServiceState.VppID) - if oldServiceState.OwnerServiceID != oldService.ServiceID { - s.log.Infof("Cnat entry found but changed owner since") - continue - } - - err := s.vpp.CnatTranslateDel(oldServiceState.VppID) + err := s.vpp.CnatTranslateDel(s.cnatEntriesByService[entry.Key()][oldService.ServiceID].vppID) if err != nil { s.log.Errorf("Cnat entry delete errored %s", err) - continue } - delete(s.serviceStateMap, entry.Key()) + s.log.Infof("svc(del) deleting service %s (entry: %+v) from entry %s cache", oldService.ServiceID, s.cnatEntriesByService[oldService.ServiceID][entry.Key()], entry.Key()) + delete(s.cnatEntriesByService[entry.Key()], oldService.ServiceID) + + //if no more services: delete entry key from map + if len(s.cnatEntriesByService[entry.Key()]) == 0 { + delete(s.cnatEntriesByService, entry.Key()) + } else { + // the entry is still referenced by another service, recreate another service entry randomly + s.log.Warnf("svc(del) entry %s was referenced by multiple services", entry.Key()) + var randomService string + for randomService = range s.cnatEntriesByService[entry.Key()] { + break + } + s.log.Infof("svc(re-add) adding service %s for entry %s", randomService, entry.Key()) + _, err := s.vpp.CnatTranslateAdd(&s.cnatEntriesByService[entry.Key()][randomService].entry) + if err != nil { + s.log.Errorf("svc(add) Error adding translation %s %s", s.cnatEntriesByService[entry.Key()][randomService].entry.String(), err) + continue + } + } } + } func (s *Server) deleteServiceByName(serviceID string) { s.lock.Lock() defer s.lock.Unlock() - - for key, oldServiceState := range s.serviceStateMap { - if oldServiceState.OwnerServiceID != serviceID { - continue - } - err := s.vpp.CnatTranslateDel(oldServiceState.VppID) - if err != nil { - s.log.Errorf("Cnat entry delete errored %s", err) - continue + var deletedEntries []string + for key, serviceEntry := range s.cnatEntriesByService { + if _, found := serviceEntry[serviceID]; found { + err := s.vpp.CnatTranslateDel(serviceEntry[serviceID].vppID) + if err != nil { + s.log.Errorf("Cnat entry delete errored %s", err) + } + s.log.Infof("svc(del) deleting service %s (entry: %+v) from entry %s cache", serviceID, serviceEntry[serviceID].entry, key) + delete(s.cnatEntriesByService[key], serviceID) + //if no more services: delete entry key from map + if len(s.cnatEntriesByService[key]) == 0 { + deletedEntries = append(deletedEntries, key) + } else { + // the entry is still referenced by another service, recreate another service entry randomly + s.log.Warnf("entry %s was referenced by multiple services", key) + var randomService string + for randomService = range s.cnatEntriesByService[key] { + break + } + s.log.Infof("svc(re-add) adding service %s for entry %s", randomService, key) + _, err := s.vpp.CnatTranslateAdd(&s.cnatEntriesByService[key][randomService].entry) + if err != nil { + s.log.Errorf("svc(add) Error adding translation %s %s", s.cnatEntriesByService[key][randomService].entry.String(), err) + continue + } + } } - delete(s.serviceStateMap, key) } -} - -func (s *Server) sameServiceEntries(entries []types.CnatTranslateEntry, service *LocalService) { - for _, entry := range entries { - if serviceState, found := s.serviceStateMap[entry.Key()]; found { - serviceState.OwnerServiceID = service.ServiceID - s.serviceStateMap[entry.Key()] = serviceState - } else { - s.log.Warnf("Cnat entry not found key=%s", entry.Key()) - } + for _, deletedEntry := range deletedEntries { + s.log.Infof("svc(del) deleting entry %s from cache", deletedEntry) + delete(s.cnatEntriesByService, deletedEntry) } } func (s *Server) addServiceEntries(entries []types.CnatTranslateEntry, service *LocalService) { for _, entry := range entries { + if _, found := s.cnatEntriesByService[entry.Key()]; !found { + s.log.Infof("svc(add) adding entry key=%s to cache", entry.Key()) + s.cnatEntriesByService[entry.Key()] = make(map[string]*cnatEntry) + } + s.log.Infof("svc(add) adding service %s to entry key=%s cache", service.ServiceID, entry.Key()) entryID, err := s.vpp.CnatTranslateAdd(&entry) if err != nil { s.log.Errorf("svc(add) Error adding translation %s %s", entry.String(), err) continue } - s.log.Infof("svc(add) key=%s %s vpp-id=%d", entry.Key(), entry.String(), entryID) - s.serviceStateMap[entry.Key()] = ServiceState{ - OwnerServiceID: service.ServiceID, - VppID: entryID, + s.cnatEntriesByService[entry.Key()][service.ServiceID] = &cnatEntry{ + entry: entry, + vppID: entryID, + } + if len(s.cnatEntriesByService[entry.Key()]) > 1 { + s.log.Warnf("svc(add) entry %s is referenced by multiple services; overriding previous value and using the latest", entry.Key()) } } } diff --git a/calico-vpp-agent/services/service_server.go b/calico-vpp-agent/services/service_server.go index 0f89521b..d1869fbe 100644 --- a/calico-vpp-agent/services/service_server.go +++ b/calico-vpp-agent/services/service_server.go @@ -53,11 +53,11 @@ type LocalService struct { } /** - * Store VPP's state in a map [CnatTranslateEntry.Key()]->ServiceState + * Store VPP's state in a map [CnatTranslateEntry.Key()]->map[serviceId]->cnatEntry */ -type ServiceState struct { - OwnerServiceID string /* serviceID(service.ObjectMeta) of the service that created this entry */ - VppID uint32 /* cnat translation ID in VPP */ +type cnatEntry struct { + entry types.CnatTranslateEntry + vppID uint32 } type Server struct { @@ -74,7 +74,8 @@ type Server struct { BGPConf *calicov3.BGPConfigurationSpec nodeBGPSpec *common.LocalNodeSpec - serviceStateMap map[string]ServiceState + // map entry key to a map of service id to entry in vpp + cnatEntriesByService map[string]map[string]*cnatEntry // cache of all endpoint slices, by service name endpointSlicesByService map[string]map[string]*discoveryv1.EndpointSlice endpointSlices map[string]*discoveryv1.EndpointSlice @@ -156,7 +157,7 @@ func NewServiceServer(vpp *vpplink.VppLink, k8sclient *kubernetes.Clientset, log server := Server{ vpp: vpp, log: log, - serviceStateMap: make(map[string]ServiceState), + cnatEntriesByService: make(map[string]map[string]*cnatEntry), endpointSlicesByService: make(map[string]map[string]*discoveryv1.EndpointSlice), endpointSlices: make(map[string]*discoveryv1.EndpointSlice), } @@ -404,7 +405,7 @@ func (s *Server) getServiceFromStore(key string) *v1.Service { * who should be deleted (first) and then re-added. It supports update * when the entries can be updated with the add call */ -func compareEntryLists(service *LocalService, oldService *LocalService) (added, same, deleted []types.CnatTranslateEntry, changed bool) { +func compareEntryLists(service *LocalService, oldService *LocalService) (added, deleted []types.CnatTranslateEntry, changed bool) { if service == nil && oldService == nil { } else if service == nil { deleted = oldService.Entries @@ -426,8 +427,6 @@ func compareEntryLists(service *LocalService, oldService *LocalService) (added, deleted = append(deleted, oldService) } else if newService.Equal(&oldService) == types.ShouldRecreateObj { deleted = append(deleted, oldService) - } else { - same = append(same, oldService) } } for _, newService := range service.Entries { @@ -464,9 +463,8 @@ func compareSpecificRoutes(service *LocalService, oldService *LocalService) (add } func (s *Server) handleServiceEndpointEvent(service *LocalService, oldService *LocalService) { - if added, same, deleted, changed := compareEntryLists(service, oldService); changed { + if added, deleted, changed := compareEntryLists(service, oldService); changed { s.deleteServiceEntries(deleted, oldService) - s.sameServiceEntries(same, service) s.addServiceEntries(added, service) } if added, deleted, changed := compareSpecificRoutes(service, oldService); changed {