Skip to content

Commit d231e16

Browse files
author
Aritra Basu
committed
refactor: decouple routingServer into BGPWatcher, BGPHandler and routingHandler
The refactoring splits the monolithic routingServer into three focused components: i) BGPWatcher - observes GoBGP RIB state ii) BGPHandler - handles BGP protocol business logic iii) RoutingHandler - handles route installation business logic There is a clear separation of monitoring (BGPWatcher) and business logic: i) GoBGP programming via the BGPHandler ii) Linux Kernel programming via the routingHandler iii) VPP programming via the connectivityHandler Signed-off-by: Aritra Basu <[email protected]>
1 parent eb5adeb commit d231e16

File tree

12 files changed

+1269
-1164
lines changed

12 files changed

+1269
-1164
lines changed

calico-vpp-agent/cmd/calico_vpp_dataplane.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
3838
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix"
3939
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus"
40-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing"
4140
"github.com/projectcalico/vpp-dataplane/v3/config"
4241

4342
watchdog "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watch_dog"
@@ -138,10 +137,10 @@ func main() {
138137
prefixWatcher := watchers.NewPrefixWatcher(client, log.WithFields(logrus.Fields{"subcomponent": "prefix-watcher"}))
139138
bgpFilterWatcher := watchers.NewBGPFilterWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "BGPFilter-watcher"}))
140139
netWatcher := watchers.NewNetWatcher(vpp, log.WithFields(logrus.Fields{"component": "net-watcher"}))
141-
routingServer := routing.NewRoutingServer(vpp, bgpServer, log.WithFields(logrus.Fields{"component": "routing"}))
142140
prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"}))
143141
localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"}))
144142
felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"}))
143+
felixServer.SetBGPServer(bgpServer)
145144
felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"}))
146145
cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"}))
147146
serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
@@ -158,15 +157,14 @@ func main() {
158157
log.Fatalf("cannot get default BGP config %s", err)
159158
}
160159

161-
routingServer.SetBGPConf(bgpConf)
162160
felixServer.SetBGPConf(bgpConf)
163161

162+
bgpWatcher := watchers.NewBGPWatcher(felixServer.GetCache(), log.WithFields(logrus.Fields{"component": "bgp-watcher"}))
163+
bgpWatcher.SetBGPServer(bgpServer)
164+
bgpWatcher.SetBGPHandler(felixServer.GetBGPHandler())
165+
164166
secretWatcher := watchers.NewSecretWatcher(k8sclient, log.WithFields(logrus.Fields{"component": "secret-watcher"}))
165-
// Set secret watcher in peer handler
166167
felixServer.GetPeerHandler().SetSecretWatcher(secretWatcher)
167-
// Set peer handler in routing server
168-
routingServer.SetPeerHandler(felixServer.GetPeerHandler())
169-
// Create peer watcher
170168
peerWatcher := watchers.NewPeerWatcher(clientv3, felixServer.GetPeerHandler(), secretWatcher, log.WithFields(logrus.Fields{"component": "peer-watcher"}))
171169

172170
watchDog := watchdog.NewWatchDog(log.WithFields(logrus.Fields{"component": "watchDog"}), &t)
@@ -184,7 +182,8 @@ func main() {
184182
panic("ourBGPSpec is not *common.LocalNodeSpec")
185183
}
186184
prefixWatcher.SetOurBGPSpec(bgpSpec)
187-
routingServer.SetOurBGPSpec(bgpSpec)
185+
bgpWatcher.SetOurBGPSpec(bgpSpec)
186+
felixServer.GetRoutingHandler().SetOurBGPSpec(bgpSpec)
188187
localSIDWatcher.SetOurBGPSpec(bgpSpec)
189188
netWatcher.SetOurBGPSpec(bgpSpec)
190189
}
@@ -200,7 +199,8 @@ func main() {
200199
Go(prefixWatcher.WatchPrefix)
201200
Go(peerWatcher.WatchBGPPeers)
202201
Go(bgpFilterWatcher.WatchBGPFilters)
203-
Go(routingServer.ServeRouting)
202+
Go(bgpWatcher.WatchBGPPath)
203+
Go(felixServer.GetRoutingHandler().ServeRoutingHandler)
204204
Go(serviceServer.ServeService)
205205
Go(cniServer.ServeCNI)
206206
Go(prometheusServer.ServePrometheus)

calico-vpp-agent/felix/felix_server.go

Lines changed: 115 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"fmt"
2020
"net"
2121

22+
bgpapi "github.com/osrg/gobgp/v3/api"
23+
bgpserver "github.com/osrg/gobgp/v3/pkg/server"
2224
"github.com/pkg/errors"
2325
calicov3cli "github.com/projectcalico/calico/libcalico-go/lib/clientv3"
2426
"github.com/sirupsen/logrus"
@@ -33,8 +35,8 @@ import (
3335
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/cni/model"
3436
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/connectivity"
3537
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/policies"
38+
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/routing"
3639
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix/services"
37-
"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing"
3840
"github.com/projectcalico/vpp-dataplane/v3/config"
3941
"github.com/projectcalico/vpp-dataplane/v3/vpplink"
4042
"github.com/projectcalico/vpp-dataplane/v3/vpplink/types"
@@ -61,6 +63,8 @@ type Server struct {
6163
connectivityHandler *connectivity.ConnectivityHandler
6264
serviceHandler *services.ServiceHandler
6365
peerHandler *routing.PeerHandler
66+
routingHandler *routing.RoutingHandler
67+
bgpHandler *routing.BGPHandler
6468
}
6569

6670
// NewFelixServer creates a felix server
@@ -80,6 +84,8 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
8084
connectivityHandler: connectivity.NewConnectivityHandler(vpp, cache, clientv3, log),
8185
serviceHandler: services.NewServiceHandler(vpp, cache, log),
8286
peerHandler: routing.NewPeerHandler(cache, log),
87+
routingHandler: routing.NewRoutingHandler(vpp, cache, log),
88+
bgpHandler: routing.NewBGPHandler(log),
8389
}
8490

8591
reg := common.RegisterHandler(server.felixServerEventChan, "felix server events")
@@ -90,10 +96,21 @@ func NewFelixServer(vpp *vpplink.VppLink, clientv3 calicov3cli.Interface, log *l
9096
common.TunnelDeleted,
9197
common.NetAddedOrUpdated,
9298
common.NetDeleted,
99+
common.BGPPathAdded,
100+
common.BGPPathDeleted,
101+
common.BGPPeerAdded,
102+
common.BGPPeerUpdated,
103+
common.BGPPeerDeleted,
104+
common.BGPFilterAddedOrUpdated,
105+
common.BGPFilterDeleted,
106+
common.BGPDefinedSetAdded,
107+
common.BGPDefinedSetDeleted,
93108
common.ConnectivityAdded,
94109
common.ConnectivityDeleted,
95110
common.SRv6PolicyAdded,
96111
common.SRv6PolicyDeleted,
112+
common.LocalPodAddressAdded,
113+
common.LocalPodAddressDeleted,
97114
)
98115

99116
return server
@@ -111,10 +128,26 @@ func (s *Server) GetPeerHandler() *routing.PeerHandler {
111128
return s.peerHandler
112129
}
113130

131+
func (s *Server) GetRoutingHandler() *routing.RoutingHandler {
132+
return s.routingHandler
133+
}
134+
135+
func (s *Server) GetConnectivityHandler() *connectivity.ConnectivityHandler {
136+
return s.connectivityHandler
137+
}
138+
139+
func (s *Server) GetBGPHandler() *routing.BGPHandler {
140+
return s.bgpHandler
141+
}
142+
114143
func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) {
115144
s.cache.BGPConf = bgpConf
116145
}
117146

147+
func (s *Server) SetBGPServer(bgpServer *bgpserver.BgpServer) {
148+
s.bgpHandler.SetBGPServer(bgpServer)
149+
}
150+
118151
func (s *Server) getMainInterface() *config.UplinkStatus {
119152
for _, i := range common.VppManagerInfo.UplinkStatuses {
120153
if i.IsMain {
@@ -375,42 +408,100 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
375408
return fmt.Errorf("evt.Old not a uint32 %v", evt.Old)
376409
}
377410
s.policiesHandler.OnTunnelDelete(swIfIndex)
378-
case common.ConnectivityAdded:
379-
new, ok := evt.New.(*common.NodeConnectivity)
411+
case common.BGPPathAdded:
412+
path, ok := evt.New.(*bgpapi.Path)
380413
if !ok {
381-
s.log.Errorf("evt.New is not a *common.NodeConnectivity %v", evt.New)
414+
return fmt.Errorf("evt.New is not a (*bgpapi.Path) %v", evt.New)
382415
}
383-
err := s.connectivityHandler.UpdateIPConnectivity(new, false /* isWithdraw */)
384-
if err != nil {
385-
s.log.Errorf("Error while adding connectivity %s", err)
416+
err = s.bgpHandler.HandleBGPPathAdded(path)
417+
case common.BGPPathDeleted:
418+
path, ok := evt.Old.(*bgpapi.Path)
419+
if !ok {
420+
return fmt.Errorf("evt.Old is not a (*bgpapi.Path) %v", evt.Old)
386421
}
387-
case common.ConnectivityDeleted:
388-
old, ok := evt.Old.(*common.NodeConnectivity)
422+
err = s.bgpHandler.HandleBGPPathDeleted(path)
423+
case common.BGPPeerAdded:
424+
peer, ok := evt.New.(*routing.LocalBGPPeer)
389425
if !ok {
390-
s.log.Errorf("evt.Old is not a *common.NodeConnectivity %v", evt.Old)
426+
return fmt.Errorf("evt.New is not a (*routing.LocalBGPPeer) %v", evt.New)
391427
}
392-
err := s.connectivityHandler.UpdateIPConnectivity(old, true /* isWithdraw */)
393-
if err != nil {
394-
s.log.Errorf("Error while deleting connectivity %s", err)
428+
err = s.bgpHandler.HandleBGPPeerAdded(peer)
429+
case common.BGPPeerUpdated:
430+
newPeer, ok := evt.New.(*routing.LocalBGPPeer)
431+
if !ok {
432+
return fmt.Errorf("evt.New is not a (*routing.LocalBGPPeer) %v", evt.New)
395433
}
396-
case common.SRv6PolicyAdded:
397-
new, ok := evt.New.(*common.NodeConnectivity)
434+
oldPeer, ok := evt.Old.(*routing.LocalBGPPeer)
398435
if !ok {
399-
s.log.Errorf("evt.New is not a *common.NodeConnectivity %v", evt.New)
436+
return fmt.Errorf("evt.Old is not a (*routing.LocalBGPPeer) %v", evt.Old)
400437
}
401-
err := s.connectivityHandler.UpdateSRv6Policy(new, false /* isWithdraw */)
402-
if err != nil {
403-
s.log.Errorf("Error while adding SRv6 Policy %s", err)
438+
err = s.bgpHandler.HandleBGPPeerUpdated(newPeer, oldPeer)
439+
case common.BGPPeerDeleted:
440+
peerIP, ok := evt.Old.(string)
441+
if !ok {
442+
return fmt.Errorf("evt.Old is not a string %v", evt.Old)
443+
}
444+
err = s.bgpHandler.HandleBGPPeerDeleted(peerIP)
445+
case common.BGPFilterAddedOrUpdated:
446+
filter, ok := evt.New.(calicov3.BGPFilter)
447+
if !ok {
448+
return fmt.Errorf("evt.New is not a (calicov3.BGPFilter) %v", evt.New)
404449
}
450+
err = s.bgpHandler.HandleBGPFilterAddedOrUpdated(filter)
451+
case common.BGPFilterDeleted:
452+
filter, ok := evt.Old.(calicov3.BGPFilter)
453+
if !ok {
454+
return fmt.Errorf("evt.Old is not a (calicov3.BGPFilter) %v", evt.Old)
455+
}
456+
err = s.bgpHandler.HandleBGPFilterDeleted(filter)
457+
case common.BGPDefinedSetAdded:
458+
definedSet, ok := evt.New.(*bgpapi.DefinedSet)
459+
if !ok {
460+
return fmt.Errorf("evt.New is not a (*bgpapi.DefinedSet) %v", evt.New)
461+
}
462+
err = s.bgpHandler.HandleBGPDefinedSetAdded(definedSet)
463+
case common.BGPDefinedSetDeleted:
464+
definedSet, ok := evt.Old.(*bgpapi.DefinedSet)
465+
if !ok {
466+
return fmt.Errorf("evt.Old is not a (*bgpapi.DefinedSet) %v", evt.Old)
467+
}
468+
err = s.bgpHandler.HandleBGPDefinedSetDeleted(definedSet)
469+
case common.ConnectivityAdded:
470+
connectivity, ok := evt.New.(*common.NodeConnectivity)
471+
if !ok {
472+
return fmt.Errorf("evt.New is not a (*common.NodeConnectivity) %v", evt.New)
473+
}
474+
err = s.connectivityHandler.UpdateIPConnectivity(connectivity, false /* isWithdraw */)
475+
case common.ConnectivityDeleted:
476+
connectivity, ok := evt.Old.(*common.NodeConnectivity)
477+
if !ok {
478+
return fmt.Errorf("evt.Old is not a (*common.NodeConnectivity) %v", evt.Old)
479+
}
480+
err = s.connectivityHandler.UpdateIPConnectivity(connectivity, true /* isWithdraw */)
481+
case common.SRv6PolicyAdded:
482+
connectivity, ok := evt.New.(*common.NodeConnectivity)
483+
if !ok {
484+
return fmt.Errorf("evt.New is not a (*common.NodeConnectivity) %v", evt.New)
485+
}
486+
err = s.connectivityHandler.UpdateSRv6Policy(connectivity, false /* isWithdraw */)
405487
case common.SRv6PolicyDeleted:
406-
old, ok := evt.Old.(*common.NodeConnectivity)
488+
connectivity, ok := evt.Old.(*common.NodeConnectivity)
407489
if !ok {
408-
s.log.Errorf("evt.Old is not a *common.NodeConnectivity %v", evt.Old)
490+
return fmt.Errorf("evt.Old is not a (*common.NodeConnectivity) %v", evt.Old)
409491
}
410-
err := s.connectivityHandler.UpdateSRv6Policy(old, true /* isWithdraw */)
411-
if err != nil {
412-
s.log.Errorf("Error while deleting SRv6 Policy %s", err)
492+
err = s.connectivityHandler.UpdateSRv6Policy(connectivity, true /* isWithdraw */)
493+
case common.LocalPodAddressAdded:
494+
networkPod, ok := evt.New.(cni.NetworkPod)
495+
if !ok {
496+
return fmt.Errorf("evt.New is not a (cni.NetworkPod) %v", evt.New)
497+
}
498+
err = s.routingHandler.AnnounceLocalAddress(networkPod.ContainerIP, networkPod.NetworkVni)
499+
case common.LocalPodAddressDeleted:
500+
networkPod, ok := evt.Old.(cni.NetworkPod)
501+
if !ok {
502+
return fmt.Errorf("evt.Old is not a (cni.NetworkPod) %v", evt.Old)
413503
}
504+
err = s.routingHandler.WithdrawLocalAddress(networkPod.ContainerIP, networkPod.NetworkVni)
414505
default:
415506
s.log.Warnf("Unhandled CalicoVppEvent.Type: %s", evt.Type)
416507
}

0 commit comments

Comments
 (0)