Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 68 additions & 44 deletions calico-vpp-agent/services/service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func getCnatVipDstPort(servicePort *v1.ServicePort, isNodePort bool) uint16 {
return uint16(servicePort.Port)
}

func (s *Server) buildCnatEntryForServicePort(servicePort *v1.ServicePort, service *v1.Service, epslices []*discoveryv1.EndpointSlice, serviceIP net.IP, isNodePort bool, svcInfo serviceInfo, isLocalOnly bool) *types.CnatTranslateEntry {
func (s *Server) buildCnatEntryForServicePort(servicePort *v1.ServicePort, epslices []*discoveryv1.EndpointSlice, serviceIP net.IP, isNodePort bool, svcInfo serviceInfo, isLocalOnly bool) *types.CnatTranslateEntry {
backends := make([]types.CnatEndpointTuple, 0)
// Find the endpoint subset port that exposes the port we're interested in
for _, epslice := range epslices {
Expand Down Expand Up @@ -167,15 +167,15 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di
for _, servicePort := range service.Spec.Ports {
for _, cip := range clusterIPs {
if !cip.IsUnspecified() && len(cip) > 0 {
entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, cip, false /* isNodePort */, *serviceSpec, InternalIsLocalOnly(service))
entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, cip, false /* isNodePort */, *serviceSpec, InternalIsLocalOnly(service))
localService.Entries = append(localService.Entries, *entry)
}
}

for _, eip := range service.Spec.ExternalIPs {
extIP := net.ParseIP(eip)
if !extIP.IsUnspecified() && len(extIP) > 0 {
entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, extIP, false /* isNodePort */, *serviceSpec, ExternalIsLocalOnly(service))
entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, extIP, false /* isNodePort */, *serviceSpec, ExternalIsLocalOnly(service))
localService.Entries = append(localService.Entries, *entry)
if ExternalIsLocalOnly(service) && len(entry.Backends) > 0 {
localService.SpecificRoutes = append(localService.SpecificRoutes, extIP)
Expand All @@ -186,7 +186,7 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di
for _, ingress := range service.Status.LoadBalancer.Ingress {
ingressIP := net.ParseIP(ingress.IP)
if !ingressIP.IsUnspecified() && len(ingressIP) > 0 {
entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, ingressIP, false /* isNodePort */, *serviceSpec, ExternalIsLocalOnly(service))
entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, ingressIP, false /* isNodePort */, *serviceSpec, ExternalIsLocalOnly(service))
localService.Entries = append(localService.Entries, *entry)
if ExternalIsLocalOnly(service) && len(entry.Backends) > 0 {
localService.SpecificRoutes = append(localService.SpecificRoutes, ingressIP)
Expand All @@ -197,7 +197,7 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di
if service.Spec.Type == v1.ServiceTypeNodePort {
for _, nip := range nodeIPs {
if !nip.IsUnspecified() && len(nip) > 0 {
entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, nip, true /* isNodePort */, *serviceSpec, false)
entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, nip, true /* isNodePort */, *serviceSpec, false)
localService.Entries = append(localService.Entries, *entry)
}
}
Expand All @@ -209,7 +209,7 @@ func (s *Server) GetLocalService(service *v1.Service, epSlicesMap map[string]*di
if service.Spec.Type == v1.ServiceTypeLoadBalancer && *service.Spec.AllocateLoadBalancerNodePorts {
for _, nip := range nodeIPs {
if !nip.IsUnspecified() && len(nip) > 0 {
entry := s.buildCnatEntryForServicePort(&servicePort, service, epSlices, nip, true /* isNodePort */, *serviceSpec, false)
entry := s.buildCnatEntryForServicePort(&servicePort, epSlices, nip, true /* isNodePort */, *serviceSpec, false)
localService.Entries = append(localService.Entries, *entry)
}
}
Expand Down Expand Up @@ -251,65 +251,89 @@ func (s *Server) advertiseSpecificRoute(added []net.IP, deleted []net.IP) {

func (s *Server) deleteServiceEntries(entries []types.CnatTranslateEntry, oldService *LocalService) {
for _, entry := range entries {
oldServiceState, found := s.serviceStateMap[entry.Key()]
if !found {
s.log.Infof("svc(del) key=%s Cnat entry not found", entry.Key())
continue
}
s.log.Infof("svc(del) key=%s %s vpp-id=%d", entry.Key(), entry.String(), oldServiceState.VppID)
if oldServiceState.OwnerServiceID != oldService.ServiceID {
s.log.Infof("Cnat entry found but changed owner since")
continue
}

err := s.vpp.CnatTranslateDel(oldServiceState.VppID)
err := s.vpp.CnatTranslateDel(s.cnatEntriesByService[entry.Key()][oldService.ServiceID].vppID)
if err != nil {
s.log.Errorf("Cnat entry delete errored %s", err)
continue
}
delete(s.serviceStateMap, entry.Key())
s.log.Infof("svc(del) deleting service %s (entry: %+v) from entry %s cache", oldService.ServiceID, s.cnatEntriesByService[oldService.ServiceID][entry.Key()], entry.Key())
delete(s.cnatEntriesByService[entry.Key()], oldService.ServiceID)

//if no more services: delete entry key from map
if len(s.cnatEntriesByService[entry.Key()]) == 0 {
delete(s.cnatEntriesByService, entry.Key())
} else {
// the entry is still referenced by another service, recreate another service entry randomly
s.log.Warnf("svc(del) entry %s was referenced by multiple services", entry.Key())
var randomService string
for randomService = range s.cnatEntriesByService[entry.Key()] {
break
}
s.log.Infof("svc(re-add) adding service %s for entry %s", randomService, entry.Key())
_, err := s.vpp.CnatTranslateAdd(&s.cnatEntriesByService[entry.Key()][randomService].entry)
if err != nil {
s.log.Errorf("svc(add) Error adding translation %s %s", s.cnatEntriesByService[entry.Key()][randomService].entry.String(), err)
continue
}
}
}

}

func (s *Server) deleteServiceByName(serviceID string) {
s.lock.Lock()
defer s.lock.Unlock()

for key, oldServiceState := range s.serviceStateMap {
if oldServiceState.OwnerServiceID != serviceID {
continue
}
err := s.vpp.CnatTranslateDel(oldServiceState.VppID)
if err != nil {
s.log.Errorf("Cnat entry delete errored %s", err)
continue
var deletedEntries []string
for key, serviceEntry := range s.cnatEntriesByService {
if _, found := serviceEntry[serviceID]; found {
err := s.vpp.CnatTranslateDel(serviceEntry[serviceID].vppID)
if err != nil {
s.log.Errorf("Cnat entry delete errored %s", err)
}
s.log.Infof("svc(del) deleting service %s (entry: %+v) from entry %s cache", serviceID, serviceEntry[serviceID].entry, key)
delete(s.cnatEntriesByService[key], serviceID)
//if no more services: delete entry key from map
if len(s.cnatEntriesByService[key]) == 0 {
deletedEntries = append(deletedEntries, key)
} else {
// the entry is still referenced by another service, recreate another service entry randomly
s.log.Warnf("entry %s was referenced by multiple services", key)
var randomService string
for randomService = range s.cnatEntriesByService[key] {
break
}
s.log.Infof("svc(re-add) adding service %s for entry %s", randomService, key)
_, err := s.vpp.CnatTranslateAdd(&s.cnatEntriesByService[key][randomService].entry)
if err != nil {
s.log.Errorf("svc(add) Error adding translation %s %s", s.cnatEntriesByService[key][randomService].entry.String(), err)
continue
}
}
}
delete(s.serviceStateMap, key)
}
}

func (s *Server) sameServiceEntries(entries []types.CnatTranslateEntry, service *LocalService) {
for _, entry := range entries {
if serviceState, found := s.serviceStateMap[entry.Key()]; found {
serviceState.OwnerServiceID = service.ServiceID
s.serviceStateMap[entry.Key()] = serviceState
} else {
s.log.Warnf("Cnat entry not found key=%s", entry.Key())
}
for _, deletedEntry := range deletedEntries {
s.log.Infof("svc(del) deleting entry %s from cache", deletedEntry)
delete(s.cnatEntriesByService, deletedEntry)
}
}

func (s *Server) addServiceEntries(entries []types.CnatTranslateEntry, service *LocalService) {
for _, entry := range entries {
if _, found := s.cnatEntriesByService[entry.Key()]; !found {
s.log.Infof("svc(add) adding entry key=%s to cache", entry.Key())
s.cnatEntriesByService[entry.Key()] = make(map[string]*cnatEntry)
}
s.log.Infof("svc(add) adding service %s to entry key=%s cache", service.ServiceID, entry.Key())
entryID, err := s.vpp.CnatTranslateAdd(&entry)
if err != nil {
s.log.Errorf("svc(add) Error adding translation %s %s", entry.String(), err)
continue
}
s.log.Infof("svc(add) key=%s %s vpp-id=%d", entry.Key(), entry.String(), entryID)
s.serviceStateMap[entry.Key()] = ServiceState{
OwnerServiceID: service.ServiceID,
VppID: entryID,
s.cnatEntriesByService[entry.Key()][service.ServiceID] = &cnatEntry{
entry: entry,
vppID: entryID,
}
if len(s.cnatEntriesByService[entry.Key()]) > 1 {
s.log.Warnf("svc(add) entry %s is referenced by multiple services; overriding previous value and using the latest", entry.Key())
}
}
}
20 changes: 9 additions & 11 deletions calico-vpp-agent/services/service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ type LocalService struct {
}

/**
* Store VPP's state in a map [CnatTranslateEntry.Key()]->ServiceState
* Store VPP's state in a map [CnatTranslateEntry.Key()]->map[serviceId]->cnatEntry
*/
type ServiceState struct {
OwnerServiceID string /* serviceID(service.ObjectMeta) of the service that created this entry */
VppID uint32 /* cnat translation ID in VPP */
type cnatEntry struct {
entry types.CnatTranslateEntry
vppID uint32
}

type Server struct {
Expand All @@ -74,7 +74,8 @@ type Server struct {
BGPConf *calicov3.BGPConfigurationSpec
nodeBGPSpec *common.LocalNodeSpec

serviceStateMap map[string]ServiceState
// map entry key to a map of service id to entry in vpp
cnatEntriesByService map[string]map[string]*cnatEntry
// cache of all endpoint slices, by service name
endpointSlicesByService map[string]map[string]*discoveryv1.EndpointSlice
endpointSlices map[string]*discoveryv1.EndpointSlice
Expand Down Expand Up @@ -156,7 +157,7 @@ func NewServiceServer(vpp *vpplink.VppLink, k8sclient *kubernetes.Clientset, log
server := Server{
vpp: vpp,
log: log,
serviceStateMap: make(map[string]ServiceState),
cnatEntriesByService: make(map[string]map[string]*cnatEntry),
endpointSlicesByService: make(map[string]map[string]*discoveryv1.EndpointSlice),
endpointSlices: make(map[string]*discoveryv1.EndpointSlice),
}
Expand Down Expand Up @@ -404,7 +405,7 @@ func (s *Server) getServiceFromStore(key string) *v1.Service {
* who should be deleted (first) and then re-added. It supports update
* when the entries can be updated with the add call
*/
func compareEntryLists(service *LocalService, oldService *LocalService) (added, same, deleted []types.CnatTranslateEntry, changed bool) {
func compareEntryLists(service *LocalService, oldService *LocalService) (added, deleted []types.CnatTranslateEntry, changed bool) {
if service == nil && oldService == nil {
} else if service == nil {
deleted = oldService.Entries
Expand All @@ -426,8 +427,6 @@ func compareEntryLists(service *LocalService, oldService *LocalService) (added,
deleted = append(deleted, oldService)
} else if newService.Equal(&oldService) == types.ShouldRecreateObj {
deleted = append(deleted, oldService)
} else {
same = append(same, oldService)
}
}
for _, newService := range service.Entries {
Expand Down Expand Up @@ -464,9 +463,8 @@ func compareSpecificRoutes(service *LocalService, oldService *LocalService) (add
}

func (s *Server) handleServiceEndpointEvent(service *LocalService, oldService *LocalService) {
if added, same, deleted, changed := compareEntryLists(service, oldService); changed {
if added, deleted, changed := compareEntryLists(service, oldService); changed {
s.deleteServiceEntries(deleted, oldService)
s.sameServiceEntries(same, service)
s.addServiceEntries(added, service)
}
if added, deleted, changed := compareSpecificRoutes(service, oldService); changed {
Expand Down
Loading