Skip to content

Commit 2c22c24

Browse files
author
Aritra Basu
committed
Remove peers_watcher dependency on pubsub
Signed-off-by: Aritra Basu <[email protected]>
1 parent 05e1278 commit 2c22c24

File tree

9 files changed

+624
-450
lines changed

9 files changed

+624
-450
lines changed

calico-vpp-agent/cmd/calico_vpp_dataplane.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ func main() {
136136
linkWatcher := watchers.NewLinkWatcher(common.VppManagerInfo.UplinkStatuses, log.WithFields(logrus.Fields{"subcomponent": "host-link-watcher"}))
137137
bgpConfigurationWatcher := watchers.NewBGPConfigurationWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "bgp-conf-watch"}))
138138
prefixWatcher := watchers.NewPrefixWatcher(client, log.WithFields(logrus.Fields{"subcomponent": "prefix-watcher"}))
139-
peerWatcher := watchers.NewPeerWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "peer-watcher"}))
140139
bgpFilterWatcher := watchers.NewBGPFilterWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "BGPFilter-watcher"}))
141140
netWatcher := watchers.NewNetWatcher(vpp, log.WithFields(logrus.Fields{"component": "net-watcher"}))
142141
routingServer := routing.NewRoutingServer(vpp, bgpServer, log.WithFields(logrus.Fields{"component": "routing"}))
@@ -159,9 +158,13 @@ func main() {
159158
log.Fatalf("cannot get default BGP config %s", err)
160159
}
161160

162-
peerWatcher.SetBGPConf(bgpConf)
161+
peerManager, err := felix.NewPeerManager(clientv3, k8sclient, felixServer, log.WithFields(logrus.Fields{"component": "peer-manager"}))
162+
if err != nil {
163+
log.Fatalf("could not create peer manager: %s", err)
164+
}
165+
163166
routingServer.SetBGPConf(bgpConf)
164-
felixServer.SetBGPConf(bgpConf)
167+
peerManager.SetBGPConf(bgpConf)
165168

166169
watchDog := watchdog.NewWatchDog(log.WithFields(logrus.Fields{"component": "watchDog"}), &t)
167170
Go(felixServer.ServeFelix)
@@ -192,7 +195,7 @@ func main() {
192195
Go(linkWatcher.WatchLinks)
193196
Go(bgpConfigurationWatcher.WatchBGPConfiguration)
194197
Go(prefixWatcher.WatchPrefix)
195-
Go(peerWatcher.WatchBGPPeers)
198+
Go(peerManager.Start)
196199
Go(bgpFilterWatcher.WatchBGPFilters)
197200
Go(routingServer.ServeRouting)
198201
Go(serviceServer.ServeService)

calico-vpp-agent/common/pubsub.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@ type CalicoVppEventType string
2626
const (
2727
ChanSize = 500
2828

29-
PeerNodeStateChanged CalicoVppEventType = "PeerNodeStateChanged"
30-
IpamConfChanged CalicoVppEventType = "IpamConfChanged"
31-
BGPConfChanged CalicoVppEventType = "BGPConfChanged"
29+
IpamConfChanged CalicoVppEventType = "IpamConfChanged"
30+
BGPConfChanged CalicoVppEventType = "BGPConfChanged"
3231

3332
ConnectivityAdded CalicoVppEventType = "ConnectivityAdded"
3433
ConnectivityDeleted CalicoVppEventType = "ConnectivityDeleted"
@@ -45,10 +44,9 @@ const (
4544
TunnelAdded CalicoVppEventType = "TunnelAdded"
4645
TunnelDeleted CalicoVppEventType = "TunnelDeleted"
4746

48-
BGPPeerAdded CalicoVppEventType = "BGPPeerAdded"
49-
BGPPeerDeleted CalicoVppEventType = "BGPPeerDeleted"
50-
BGPPeerUpdated CalicoVppEventType = "BGPPeerUpdated"
51-
BGPSecretChanged CalicoVppEventType = "BGPSecretChanged"
47+
BGPPeerAdded CalicoVppEventType = "BGPPeerAdded"
48+
BGPPeerDeleted CalicoVppEventType = "BGPPeerDeleted"
49+
BGPPeerUpdated CalicoVppEventType = "BGPPeerUpdated"
5250

5351
BGPFilterAddedOrUpdated CalicoVppEventType = "BGPFilterAddedOrUpdated"
5452
BGPFilterDeleted CalicoVppEventType = "BGPFilterDeleted"

calico-vpp-agent/common/types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package common
1717

1818
import (
19+
calicov3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
1920
v1 "k8s.io/api/core/v1"
2021
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2122
)
@@ -70,3 +71,11 @@ type ServiceEndpointsUpdate struct {
7071
type ServiceEndpointsDelete struct {
7172
Meta *metav1.ObjectMeta
7273
}
74+
75+
// BGPPeerState represents the state of a BGP peer
76+
type BGPPeerState struct {
77+
AS uint32
78+
SweepFlag bool
79+
BGPPeerSpec *calicov3.BGPPeerSpec
80+
SecretChanged bool
81+
}

calico-vpp-agent/felix/connectivity/connectivity_handler.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ import (
3232
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
3333
)
3434

35+
// PeerNodeStateChangeHandler defines the interface for handling peer node state changes
36+
type PeerNodeStateChangeHandler interface {
37+
OnPeerNodeStateChanged(old, new *common.LocalNodeSpec)
38+
}
39+
3540
type ConnectivityHandler struct {
3641
log *logrus.Entry
3742
vpp *vpplink.VppLink
@@ -40,6 +45,9 @@ type ConnectivityHandler struct {
4045
providers map[string]ConnectivityProvider
4146
connectivityMap map[string]common.NodeConnectivity
4247
nodeByWGPublicKey map[string]string
48+
49+
// Peer node state change handlers
50+
peerNodeStateChangeHandlers []PeerNodeStateChangeHandler
4351
}
4452

4553
func NewConnectivityHandler(vpp *vpplink.VppLink, cache *cache.Cache, clientv3 calicov3cli.Interface, log *logrus.Entry) *ConnectivityHandler {
@@ -56,10 +64,16 @@ func NewConnectivityHandler(vpp *vpplink.VppLink, cache *cache.Cache, clientv3 c
5664
WIREGUARD: NewWireguardProvider(vpp, clientv3, cache, log),
5765
SRv6: NewSRv6Provider(vpp, clientv3, cache, log),
5866
},
59-
nodeByWGPublicKey: make(map[string]string),
67+
nodeByWGPublicKey: make(map[string]string),
68+
peerNodeStateChangeHandlers: make([]PeerNodeStateChangeHandler, 0),
6069
}
6170
}
6271

72+
// RegisterPeerNodeStateChangeHandler registers a handler for peer node state changes
73+
func (s *ConnectivityHandler) RegisterPeerNodeStateChangeHandler(handler PeerNodeStateChangeHandler) {
74+
s.peerNodeStateChangeHandlers = append(s.peerNodeStateChangeHandlers, handler)
75+
}
76+
6377
type change uint8
6478

6579
const (
@@ -112,11 +126,11 @@ func (s *ConnectivityHandler) OnPeerNodeStateChanged(old, new *common.LocalNodeS
112126
s.cache.NodeByAddr[new.IPv6Address.IP.String()] = *new
113127
}
114128
}
115-
common.SendEvent(common.CalicoVppEvent{
116-
Type: common.PeerNodeStateChanged,
117-
Old: old,
118-
New: new,
119-
})
129+
130+
// Notify all registered peer node state change handlers
131+
for _, handler := range s.peerNodeStateChangeHandlers {
132+
handler.OnPeerNodeStateChanged(old, new)
133+
}
120134
}
121135

122136
func (s *ConnectivityHandler) UpdateSRv6Policy(cn *common.NodeConnectivity, IsWithdraw bool) (err error) {

calico-vpp-agent/felix/felix_server.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/sirupsen/logrus"
2525
"gopkg.in/tomb.v2"
2626

27-
calicov3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
2827
"github.com/projectcalico/calico/felix/proto"
2928

3029
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
@@ -64,6 +63,7 @@ type Server struct {
6463
// NewFelixServer creates a felix server
6564
func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *logrus.Entry) *Server {
6665
cache := cache.NewCache(log)
66+
6767
server := &Server{
6868
log: log,
6969
vpp: vpp,
@@ -104,8 +104,9 @@ func (s *Server) GetCache() *cache.Cache {
104104
return s.cache
105105
}
106106

107-
func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) {
108-
s.cache.BGPConf = bgpConf
107+
// GetConnectivityHandler returns the connectivity handler
108+
func (s *Server) GetConnectivityHandler() *connectivity.ConnectivityHandler {
109+
return s.connectivityHandler
109110
}
110111

111112
func (s *Server) getMainInterface() *config.UplinkStatus {

0 commit comments

Comments
 (0)