Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion calico-vpp-agent/cmd/calico_vpp_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func main() {
*/
routeWatcher := watchers.NewRouteWatcher(log.WithFields(logrus.Fields{"subcomponent": "host-route-watcher"}))
linkWatcher := watchers.NewLinkWatcher(common.VppManagerInfo.UplinkStatuses, log.WithFields(logrus.Fields{"subcomponent": "host-link-watcher"}))
bgpConfigurationWatcher := watchers.NewBGPConfigurationWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "bgp-conf-watch"}))
prefixWatcher := watchers.NewPrefixWatcher(client, log.WithFields(logrus.Fields{"subcomponent": "prefix-watcher"}))
peerWatcher := watchers.NewPeerWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "peer-watcher"}))
bgpFilterWatcher := watchers.NewBGPFilterWatcher(clientv3, k8sclient, log.WithFields(logrus.Fields{"subcomponent": "BGPFilter-watcher"}))
Expand All @@ -143,6 +142,7 @@ func main() {
prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"}))
localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"}))
felixServer := felix.NewFelixServer(vpp, clientv3, log.WithFields(logrus.Fields{"component": "policy"}))
bgpConfigurationWatcher := watchers.NewBGPConfigurationWatcher(clientv3, log.WithFields(logrus.Fields{"subcomponent": "bgp-conf-watch"}), felixServer.HandleBGPConfigurationChange)
felixWatcher := watchers.NewFelixWatcher(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "felix watcher"}))
cniServer := watchers.NewCNIServer(felixServer.GetFelixServerEventChan(), log.WithFields(logrus.Fields{"component": "cni"}))
serviceServer := watchers.NewServiceServer(felixServer.GetFelixServerEventChan(), k8sclient, log.WithFields(logrus.Fields{"component": "services"}))
Expand Down
1 change: 0 additions & 1 deletion calico-vpp-agent/common/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ const (

PeerNodeStateChanged CalicoVppEventType = "PeerNodeStateChanged"
IpamConfChanged CalicoVppEventType = "IpamConfChanged"
BGPConfChanged CalicoVppEventType = "BGPConfChanged"

ConnectivityAdded CalicoVppEventType = "ConnectivityAdded"
ConnectivityDeleted CalicoVppEventType = "ConnectivityDeleted"
Expand Down
11 changes: 8 additions & 3 deletions calico-vpp-agent/felix/felix_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ func (s *Server) SetBGPConf(bgpConf *calicov3.BGPConfigurationSpec) {
s.cache.BGPConf = bgpConf
}

// HandleBGPConfigurationChange is called when the BGPConfiguration changes.
// Handling of BGPConfiguration updates is not yet implemented, instead,
// we log and trigger a restart to ensure the system reloads configuration.
func (s *Server) HandleBGPConfigurationChange() error {
s.log.Error("BGPConf updated")
return errors.Errorf("BGPConf updated, restarting")
}

func (s *Server) getMainInterface() *config.UplinkStatus {
for _, i := range common.VppManagerInfo.UplinkStatuses {
if i.IsMain {
Expand Down Expand Up @@ -294,9 +302,6 @@ func (s *Server) handleFelixServerEvents(msg interface{}) (err error) {
s.log.Debugf("Ignoring NamespaceRemove")
case *proto.GlobalBGPConfigUpdate:
s.log.Infof("Got GlobalBGPConfigUpdate")
common.SendEvent(common.CalicoVppEvent{
Type: common.BGPConfChanged,
})
case *proto.WireguardEndpointUpdate:
err = s.connectivityHandler.OnWireguardEndpointUpdate(evt)
case *proto.WireguardEndpointRemove:
Expand Down
120 changes: 90 additions & 30 deletions calico-vpp-agent/watchers/bgp_configuration_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,31 @@ import (
calicov3cli "github.com/projectcalico/calico/libcalico-go/lib/clientv3"
calicoerr "github.com/projectcalico/calico/libcalico-go/lib/errors"
"github.com/projectcalico/calico/libcalico-go/lib/options"
"github.com/projectcalico/calico/libcalico-go/lib/watch"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"gopkg.in/tomb.v2"

"github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common"
"github.com/projectcalico/vpp-dataplane/v3/config"
)

type BGPConfigurationWatcher struct {
log *logrus.Entry
clientv3 calicov3cli.Interface
BGPConfigurationWatcherEventChan chan any
BGPConf *calicov3.BGPConfigurationSpec
log *logrus.Entry
clientv3 calicov3cli.Interface
BGPConf *calicov3.BGPConfigurationSpec
// Watch interface for monitoring BGP configuration changes
watcher watch.Interface
currentWatchRevision string
// Callback function to handle BGP configuration changes
onBGPConfigChanged func() error
}

func NewBGPConfigurationWatcher(clientv3 calicov3cli.Interface, log *logrus.Entry) *BGPConfigurationWatcher {
func NewBGPConfigurationWatcher(clientv3 calicov3cli.Interface, log *logrus.Entry, configChangeHandler func() error) *BGPConfigurationWatcher {
w := BGPConfigurationWatcher{
log: log,
clientv3: clientv3,
BGPConfigurationWatcherEventChan: make(chan any, common.ChanSize),
log: log,
clientv3: clientv3,
onBGPConfigChanged: configChangeHandler,
}
reg := common.RegisterHandler(w.BGPConfigurationWatcherEventChan, "BGP Config watcher events")
reg.ExpectEvents(common.BGPConfChanged)
return &w
}

Expand Down Expand Up @@ -126,31 +128,89 @@ func (w *BGPConfigurationWatcher) getDefaultBGPConfig() (*calicov3.BGPConfigurat
}
}

// WatchBGPConfiguration watches for changes in BGP configuration using Calico API
func (w *BGPConfigurationWatcher) WatchBGPConfiguration(t *tomb.Tomb) error {
w.log.Info("BGP configuration watcher started")
for t.Alive() {
select {
case <-t.Dying():
w.log.Warn("BGPConf watcher stopped")
return nil
case msg := <-w.BGPConfigurationWatcherEventChan:
evt, ok := msg.(common.CalicoVppEvent)
if !ok {
continue
}
switch evt.Type {
case common.BGPConfChanged:
oldBGPConf := w.BGPConf
newBGPConf, err := w.GetBGPConf()
if err != nil {
return errors.Wrap(err, "error getting BGP configuration")
w.currentWatchRevision = ""
err := w.resyncAndCreateWatcher()
if err != nil {
w.log.WithError(err).Error("Failed to create BGP configuration watcher")
goto restart
}
for {
select {
case <-t.Dying():
w.log.Info("BGP configuration watcher asked to stop")
w.cleanExistingWatcher()
return nil
case event, ok := <-w.watcher.ResultChan():
if !ok {
w.log.Debug("BGP configuration watcher closed, restarting...")
goto restart
}
if !reflect.DeepEqual(newBGPConf, oldBGPConf) {
w.log.Error("BGPConf updated")
return errors.Errorf("BGPConf updated, restarting")
w.currentWatchRevision = event.Object.(*calicov3.BGPConfiguration).GetResourceVersion()
switch event.Type {
case watch.Error:
w.log.Debug("BGP configuration watch returned error, restarting...")
goto restart
case watch.Added, watch.Modified:
w.handleBGPConfigurationUpdate()
case watch.Deleted:
w.log.Debug("BGP configuration deleted, using defaults")
w.handleBGPConfigurationUpdate()
}
default:
}
}
restart:
w.cleanExistingWatcher()
w.log.Debug("Restarting BGP configuration watcher...")
}
return nil
}

// resyncAndCreateWatcher creates a new watcher for BGP configurations
func (w *BGPConfigurationWatcher) resyncAndCreateWatcher() error {
w.cleanExistingWatcher()

opts := options.ListOptions{
ResourceVersion: w.currentWatchRevision,
}

watcher, err := w.clientv3.BGPConfigurations().Watch(context.Background(), opts)
if err != nil {
return errors.Wrap(err, "failed to create BGP configuration watcher")
}
w.watcher = watcher
return nil
}

// cleanExistingWatcher closes the existing watcher if it exists
func (w *BGPConfigurationWatcher) cleanExistingWatcher() {
if w.watcher != nil {
w.watcher.Stop()
w.watcher = nil
}
}

// handleBGPConfigurationUpdate handles BGP configuration update events
func (w *BGPConfigurationWatcher) handleBGPConfigurationUpdate() {
if w.onBGPConfigChanged == nil {
w.log.Debug("No BGP configuration change handler set")
return
}

oldConf := w.BGPConf
newConf, err := w.GetBGPConf()
if err != nil {
w.log.WithError(err).Error("Failed to get updated BGP configuration")
return
}

// Only call the callback if the config actually changed
if !reflect.DeepEqual(oldConf, newConf) {
if err := w.onBGPConfigChanged(); err != nil {
w.log.WithError(err).Error("BGP configuration change handler failed")
}
}
}