@@ -788,25 +788,7 @@ func (e *EgressIPController) addPodEgressIPAssignments(ni util.NetInfo, name str
788788 if len (statusAssignments ) == 0 {
789789 return nil
790790 }
791- // We need to proceed with add only under two conditions
792- // 1) egressNode present in at least one status is local to this zone
793- // (NOTE: The relation between egressIPName and nodeName is 1:1 i.e in the same object the given node will be present only in one status)
794- // 2) the pod being added is local to this zone
795- proceed := false
796- for _ , status := range statusAssignments {
797- e .nodeZoneState .LockKey (status .Node )
798- isLocalZoneEgressNode , loadedEgressNode := e .nodeZoneState .Load (status .Node )
799- if loadedEgressNode && isLocalZoneEgressNode {
800- proceed = true
801- e .nodeZoneState .UnlockKey (status .Node )
802- break
803- }
804- e .nodeZoneState .UnlockKey (status .Node )
805- }
806- if ! proceed && ! e .isPodScheduledinLocalZone (pod ) {
807- return nil // nothing to do if none of the status nodes are local to this master and pod is also remote
808- }
809- var remainingAssignments []egressipv1.EgressIPStatusItem
791+ var remainingAssignments , staleAssignments []egressipv1.EgressIPStatusItem
810792 nadName := ni .GetNetworkName ()
811793 if ni .IsUserDefinedNetwork () {
812794 nadNames := ni .GetNADs ()
@@ -836,15 +818,21 @@ func (e *EgressIPController) addPodEgressIPAssignments(ni util.NetInfo, name str
836818 podIPs : podIPs ,
837819 network : ni ,
838820 }
839- e .podAssignment .Store (podKey , podState )
840821 } else if podState .egressIPName == name || podState .egressIPName == "" {
841822 // We do the setup only if this egressIP object is the one serving this pod OR
842823 // podState.egressIPName can be empty if no re-routes were found in
843824 // syncPodAssignmentCache for the existing pod, we will treat this case as a new add
844825 for _ , status := range statusAssignments {
845- if exists := podState .egressStatuses .contains (status ); ! exists {
826+ // Add the status if it's not already in the cache, or if it exists but is in pending state
827+ // (meaning it was populated during EIP sync and needs to be processed for the pod).
828+ if value , exists := podState .egressStatuses .statusMap [status ]; ! exists || value == egressStatusStatePending {
846829 remainingAssignments = append (remainingAssignments , status )
847830 }
831+ // Detect stale EIP status entries (same EgressIP reassigned to a different node)
832+ // and queue the outdated entry for cleanup.
833+ if staleStatus := podState .egressStatuses .hasStaleEIPStatus (status ); staleStatus != nil {
834+ staleAssignments = append (staleAssignments , * staleStatus )
835+ }
848836 }
849837 podState .podIPs = podIPs
850838 podState .egressIPName = name
@@ -866,6 +854,36 @@ func (e *EgressIPController) addPodEgressIPAssignments(ni util.NetInfo, name str
866854 podState .standbyEgressIPNames .Insert (name )
867855 return nil
868856 }
857+ for _ , staleStatus := range staleAssignments {
858+ klog .V (2 ).Infof ("Deleting stale pod egress IP status: %v for EgressIP: %s and pod: %s/%s/%v" , staleStatus , name , pod .Namespace , pod .Name , podIPNets )
859+ err = e .deletePodEgressIPAssignments (ni , name , []egressipv1.EgressIPStatusItem {staleStatus }, pod )
860+ if err != nil {
861+ klog .Warningf ("Failed to delete stale EgressIP status %s/%v for pod %s: %v" , name , staleStatus , podKey , err )
862+ }
863+ delete (podState .egressStatuses .statusMap , staleStatus )
864+ }
865+ // We store podState into podAssignment cache at this place for two reasons.
866+ // 1. When podAssignmentState is newly created.
867+ // 2. deletePodEgressIPAssignments might clean the podAssignment cache, make sure we add it back.
868+ e .podAssignment .Store (podKey , podState )
869+ // We need to proceed with add only under two conditions
870+ // 1) egressNode present in at least one status is local to this zone
871+ // (NOTE: The relation between egressIPName and nodeName is 1:1 i.e in the same object the given node will be present only in one status)
872+ // 2) the pod being added is local to this zone
873+ proceed := false
874+ for _ , status := range statusAssignments {
875+ e .nodeZoneState .LockKey (status .Node )
876+ isLocalZoneEgressNode , loadedEgressNode := e .nodeZoneState .Load (status .Node )
877+ if loadedEgressNode && isLocalZoneEgressNode {
878+ proceed = true
879+ e .nodeZoneState .UnlockKey (status .Node )
880+ break
881+ }
882+ e .nodeZoneState .UnlockKey (status .Node )
883+ }
884+ if ! proceed && ! e .isPodScheduledinLocalZone (pod ) {
885+ return nil // nothing to do if none of the status nodes are local to this master and pod is also remote
886+ }
869887 for _ , status := range remainingAssignments {
870888 klog .V (2 ).Infof ("Adding pod egress IP status: %v for EgressIP: %s and pod: %s/%s/%v" , status , name , pod .Namespace , pod .Name , podIPNets )
871889 nodesToLock := []string {status .Node , pod .Spec .NodeName }
@@ -1155,6 +1173,8 @@ type egressIPCache struct {
11551173 egressLocalNodesCache sets.Set [string ]
11561174 // egressIP IP -> assigned node name
11571175 egressIPIPToNodeCache map [string ]string
1176+ // egressIP name -> egress IP -> assigned node name
1177+ egressIPToAssignedNodes map [string ]map [string ]string
11581178 // node name -> network name -> redirect IPs
11591179 egressNodeRedirectsCache nodeNetworkRedirects
11601180 // network name -> OVN cluster router name
@@ -1594,6 +1614,14 @@ func (e *EgressIPController) syncPodAssignmentCache(egressIPCache egressIPCache)
15941614 }
15951615 }
15961616
1617+ // populate podState.egressStatuses with assigned node for active egressIP IPs.
1618+ if podState .egressIPName == egressIPName {
1619+ for egressIPIP , nodeName := range egressIPCache .egressIPToAssignedNodes [egressIPName ] {
1620+ podState .egressStatuses .statusMap [egressipv1.EgressIPStatusItem {
1621+ EgressIP : egressIPIP , Node : nodeName }] = egressStatusStatePending
1622+ }
1623+ }
1624+
15971625 e .podAssignment .Store (podKey , podState )
15981626 return nil
15991627 }); err != nil {
@@ -1611,6 +1639,21 @@ func (e *EgressIPController) syncPodAssignmentCache(egressIPCache egressIPCache)
16111639// It also removes stale nexthops from router policies used by EgressIPs.
16121640// Upon failure, it may be invoked multiple times in order to avoid a pod restart.
16131641func (e * EgressIPController ) syncStaleEgressReroutePolicy (cache egressIPCache ) error {
1642+ // limit Nodes only to egress node(s) for the EgressIP name
1643+ limitToValidEgressNodes := func (eipName string , nodeRedirectCache map [string ]redirectIPs ) map [string ]redirectIPs {
1644+ filteredEgressNodesRedirectsCache := make (map [string ]redirectIPs , 0 )
1645+ egressNodeNames , ok := cache .egressIPNameToAssignedNodes [eipName ]
1646+ if ! ok {
1647+ return filteredEgressNodesRedirectsCache
1648+ }
1649+ for _ , egressNode := range egressNodeNames {
1650+ if nodeRedirect , ok := nodeRedirectCache [egressNode ]; ok {
1651+ filteredEgressNodesRedirectsCache [egressNode ] = nodeRedirect
1652+ }
1653+ }
1654+ return filteredEgressNodesRedirectsCache
1655+ }
1656+
16141657 for eipName , networkCache := range cache .egressIPNameToPods {
16151658 for networkName , data := range networkCache {
16161659 logicalRouterPolicyStaleNexthops := []* nbdb.LogicalRouterPolicy {}
@@ -1619,11 +1662,6 @@ func (e *EgressIPController) syncStaleEgressReroutePolicy(cache egressIPCache) e
16191662 if item .Priority != types .EgressIPReroutePriority || item .ExternalIDs [libovsdbops .NetworkKey .String ()] != networkName {
16201663 return false
16211664 }
1622- networkNodeRedirectCache , ok := cache .egressNodeRedirectsCache .cache [networkName ]
1623- if ! ok || len (networkNodeRedirectCache ) == 0 {
1624- klog .Infof ("syncStaleEgressReroutePolicy found invalid logical router policy (UUID: %s) because no assigned Nodes for EgressIP %s" , item .UUID , eipName )
1625- return true
1626- }
16271665 extractedEgressIPName , _ := getEIPLRPObjK8MetaData (item .ExternalIDs )
16281666 if extractedEgressIPName == "" {
16291667 klog .Errorf ("syncStaleEgressReroutePolicy found logical router policy (UUID: %s) with invalid meta data associated with network %s" , item .UUID , networkName )
@@ -1634,6 +1672,11 @@ func (e *EgressIPController) syncStaleEgressReroutePolicy(cache egressIPCache) e
16341672 _ , ok := cache .egressIPNameToPods [extractedEgressIPName ]
16351673 return ! ok
16361674 }
1675+ networkNodeRedirectCache := limitToValidEgressNodes (eipName , cache .egressNodeRedirectsCache .cache [networkName ])
1676+ if len (networkNodeRedirectCache ) == 0 {
1677+ klog .Infof ("syncStaleEgressReroutePolicy deleting invalid logical router policy %q because there are no existing nodes assigned to its EgressIP %s" , item .UUID , eipName )
1678+ return true
1679+ }
16371680 splitMatch := strings .Split (item .Match , " " )
16381681 podIPStr := splitMatch [len (splitMatch )- 1 ]
16391682 podIP := net .ParseIP (podIPStr )
@@ -1689,13 +1732,13 @@ func (e *EgressIPController) syncStaleEgressReroutePolicy(cache egressIPCache) e
16891732 // Update Logical Router Policies that have stale nexthops. Notice that we must do this separately
16901733 // because logicalRouterPolicyStaleNexthops must be populated first
16911734 for _ , staleNextHopLogicalRouterPolicy := range logicalRouterPolicyStaleNexthops {
1692- if staleNextHopLogicalRouterPolicy .Nexthop == nil {
1693- continue
1694- }
1695- klog .Infof ("syncStaleEgressReroutePolicy will remove stale nexthops for LRP %q for network %s: %s" ,
1696- staleNextHopLogicalRouterPolicy .UUID , networkName , * staleNextHopLogicalRouterPolicy .Nexthop )
1735+ klog .Infof ("syncStaleEgressReroutePolicy will remove stale nexthops for LRP %q for network %s: %v" ,
1736+ staleNextHopLogicalRouterPolicy .UUID , networkName , staleNextHopLogicalRouterPolicy .Nexthops )
1737+ }
1738+ // nothing to do if there's no stale next hops
1739+ if len (logicalRouterPolicyStaleNexthops ) == 0 {
1740+ continue
16971741 }
1698-
16991742 err = libovsdbops .DeleteNextHopsFromLogicalRouterPolicies (e .nbClient , cache .networkToRouter [networkName ], logicalRouterPolicyStaleNexthops ... )
17001743 if err != nil {
17011744 return fmt .Errorf ("unable to remove stale next hops from logical router policies for network %s: %v" , networkName , err )
@@ -1874,28 +1917,37 @@ func (e *EgressIPController) generateCacheForEgressIP() (egressIPCache, error) {
18741917 r := redirectIPs {}
18751918 mgmtPort := & nbdb.LogicalSwitchPort {Name : ni .GetNetworkScopedK8sMgmtIntfName (node .Name )}
18761919 mgmtPort , err := libovsdbops .GetLogicalSwitchPort (e .nbClient , mgmtPort )
1877- if err != nil {
1878- // if switch port isnt created, we can assume theres nothing to sync
1879- if errors .Is (err , libovsdbclient .ErrNotFound ) {
1880- continue
1881- }
1920+ // return if error is anything other than not found to allow retry
1921+ if err != nil && ! errors .Is (err , libovsdbclient .ErrNotFound ) {
18821922 return cache , fmt .Errorf ("failed to find management port for node %s: %v" , node .Name , err )
18831923 }
1884- mgmtPortAddresses := mgmtPort .GetAddresses ()
1885- if len (mgmtPortAddresses ) == 0 {
1886- return cache , fmt .Errorf ("management switch port %s for node %s does not contain any addresses" , ni .GetNetworkScopedK8sMgmtIntfName (node .Name ), node .Name )
1887- }
1888- // assuming only one IP per IP family
1889- for _ , mgmtPortAddress := range mgmtPortAddresses {
1890- mgmtPortAddressesStr := strings .Fields (mgmtPortAddress )
1891- mgmtPortIP := net .ParseIP (mgmtPortAddressesStr [1 ])
1892- if utilnet .IsIPv6 (mgmtPortIP ) {
1893- if ip := mgmtPortIP .To16 (); ip != nil {
1894- r .v6MgtPort = ip .String ()
1924+ // if management port is available, gather the data. If it's not available, OVN constructs that depend on a deleted
1925+ // management port IP will fail and be cleaned up in sync LRPs func.
1926+ if mgmtPort != nil {
1927+ mgmtPortAddresses := mgmtPort .GetAddresses ()
1928+ if len (mgmtPortAddresses ) == 0 {
1929+ return cache , fmt .Errorf ("management switch port %s for node %s does not contain any addresses" , ni .GetNetworkScopedK8sMgmtIntfName (node .Name ), node .Name )
1930+ }
1931+ // Extract at most one IP per family; entries are "MAC IP [IP ...]"
1932+ for _ , macPlusIPs := range mgmtPortAddresses {
1933+ parts := strings .Fields (macPlusIPs )
1934+ if len (parts ) < 2 {
1935+ continue // no IPs
18951936 }
1896- } else {
1897- if ip := mgmtPortIP .To4 (); ip != nil {
1898- r .v4MgtPort = ip .String ()
1937+ for _ , ipStr := range parts [1 :] {
1938+ ip := net .ParseIP (ipStr )
1939+ if ip == nil {
1940+ continue
1941+ }
1942+ if utilnet .IsIPv6 (ip ) {
1943+ if r .v6MgtPort == "" && ip .To16 () != nil {
1944+ r .v6MgtPort = ip .String ()
1945+ }
1946+ } else {
1947+ if r .v4MgtPort == "" && ip .To4 () != nil {
1948+ r .v4MgtPort = ip .String ()
1949+ }
1950+ }
18991951 }
19001952 }
19011953 }
@@ -1951,6 +2003,9 @@ func (e *EgressIPController) generateCacheForEgressIP() (egressIPCache, error) {
19512003 // egressIP IP -> node name. Assigned node for EIP.
19522004 egressIPIPNodeCache := make (map [string ]string , 0 )
19532005 cache .egressIPIPToNodeCache = egressIPIPNodeCache
2006+ // egressIP name -> egressIP IP -> node name.
2007+ egressIPToAssignedNodes := make (map [string ]map [string ]string , 0 )
2008+ cache .egressIPToAssignedNodes = egressIPToAssignedNodes
19542009 cache .markCache = make (map [string ]string )
19552010 egressIPs , err := e .watchFactory .GetEgressIPs ()
19562011 if err != nil {
@@ -1964,13 +2019,15 @@ func (e *EgressIPController) generateCacheForEgressIP() (egressIPCache, error) {
19642019 cache .markCache [egressIP .Name ] = mark .String ()
19652020 egressIPsCache [egressIP .Name ] = make (map [string ]selectedPods , 0 )
19662021 egressIPNameNodesCache [egressIP .Name ] = make ([]string , 0 , len (egressIP .Status .Items ))
2022+ egressIPToAssignedNodes [egressIP .Name ] = make (map [string ]string , 0 )
19672023 for _ , status := range egressIP .Status .Items {
19682024 eipIP := net .ParseIP (status .EgressIP )
19692025 if eipIP == nil {
19702026 klog .Errorf ("Failed to parse EgressIP %s IP %q from status" , egressIP .Name , status .EgressIP )
19712027 continue
19722028 }
19732029 egressIPIPNodeCache [eipIP .String ()] = status .Node
2030+ egressIPToAssignedNodes [egressIP .Name ][eipIP .String ()] = status .Node
19742031 if localZoneNodes .Has (status .Node ) {
19752032 egressLocalNodesCache .Insert (status .Node )
19762033 }
@@ -2227,9 +2284,18 @@ func InitClusterEgressPolicies(nbClient libovsdbclient.Client, addressSetFactory
22272284 return nil
22282285}
22292286
2287+ // egressStatusStatePending marks entries populated during EIP sync and
2288+ // indicates they must be reconciled again for the pod.
2289+ const egressStatusStatePending = "pending"
2290+
22302291type statusMap map [egressipv1.EgressIPStatusItem ]string
22312292
22322293type egressStatuses struct {
2294+ // statusMap tracks per EIP status assignment for a pod.
2295+ // Key: egressipv1.EgressIPStatusItem {EgressIP, Node}
2296+ // Values:
2297+ // "" -> applied/reconciled
2298+ // egressStatusStatePending -> populated during EIP sync, pending reconcile.
22332299 statusMap
22342300}
22352301
@@ -2241,6 +2307,21 @@ func (e egressStatuses) contains(potentialStatus egressipv1.EgressIPStatusItem)
22412307 return false
22422308}
22432309
2310+ // hasStaleEIPStatus checks for stale EIP status entries already in cache.
2311+ // This addresses the race condition where an EIP is reassigned to a different node
2312+ // but the cache still contains the old assignment, leading to stale SNAT/LRP entries.
2313+ func (e egressStatuses ) hasStaleEIPStatus (potentialStatus egressipv1.EgressIPStatusItem ) * egressipv1.EgressIPStatusItem {
2314+ var staleStatus * egressipv1.EgressIPStatusItem
2315+ for status := range e .statusMap {
2316+ if status .EgressIP == potentialStatus .EgressIP &&
2317+ status .Node != potentialStatus .Node {
2318+ staleStatus = & egressipv1.EgressIPStatusItem {EgressIP : status .EgressIP , Node : status .Node }
2319+ break
2320+ }
2321+ }
2322+ return staleStatus
2323+ }
2324+
22442325func (e egressStatuses ) delete (deleteStatus egressipv1.EgressIPStatusItem ) {
22452326 delete (e .statusMap , deleteStatus )
22462327}
0 commit comments