From d77c172c60900c952ac967fb113c06231b1e3bef Mon Sep 17 00:00:00 2001 From: Aritra Basu Date: Wed, 10 Sep 2025 21:55:07 -0700 Subject: [PATCH] Remove the prometheus dependency on pubsub Signed-off-by: Aritra Basu --- calico-vpp-agent/cmd/calico_vpp_dataplane.go | 3 - calico-vpp-agent/felix/felix_server.go | 20 +++ calico-vpp-agent/prometheus/prometheus.go | 117 ++++++++---------- .../prometheus/prometheus_test.go | 19 +-- 4 files changed, 80 insertions(+), 79 deletions(-) diff --git a/calico-vpp-agent/cmd/calico_vpp_dataplane.go b/calico-vpp-agent/cmd/calico_vpp_dataplane.go index c5c4a605..aa6170f7 100644 --- a/calico-vpp-agent/cmd/calico_vpp_dataplane.go +++ b/calico-vpp-agent/cmd/calico_vpp_dataplane.go @@ -39,7 +39,6 @@ import ( "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/connectivity" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/felix" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/health" - "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/routing" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/services" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers" @@ -152,7 +151,6 @@ func main() { netWatcher := watchers.NewNetWatcher(vpp, log.WithFields(logrus.Fields{"component": "net-watcher"})) routingServer := routing.NewRoutingServer(vpp, bgpServer, log.WithFields(logrus.Fields{"component": "routing"})) serviceServer := services.NewServiceServer(vpp, k8sclient, log.WithFields(logrus.Fields{"component": "services"})) - prometheusServer := prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"})) localSIDWatcher := watchers.NewLocalSIDWatcher(vpp, clientv3, log.WithFields(logrus.Fields{"subcomponent": "localsid-watcher"})) felixServer, err := felix.NewFelixServer(vpp, log.WithFields(logrus.Fields{"component": "felix"})) if err != nil { @@ -265,7 +263,6 @@ func main() { 
Go(routingServer.ServeRouting) Go(serviceServer.ServeService) Go(cniServer.ServeCNI) - Go(prometheusServer.ServePrometheus) // watch LocalSID if SRv6 is enabled if *config.GetCalicoVppFeatureGates().SRv6Enabled { diff --git a/calico-vpp-agent/felix/felix_server.go b/calico-vpp-agent/felix/felix_server.go index 211fe133..291567cd 100644 --- a/calico-vpp-agent/felix/felix_server.go +++ b/calico-vpp-agent/felix/felix_server.go @@ -36,6 +36,7 @@ import ( "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" + "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/watchers" "github.com/projectcalico/vpp-dataplane/v3/config" "github.com/projectcalico/vpp-dataplane/v3/vpplink" @@ -114,6 +115,8 @@ type Server struct { GotOurNodeBGPchan chan interface{} GotOurNodeBGPchanOnce sync.Once + + prometheusServer *prometheus.PrometheusServer } // NewFelixServer creates a felix server @@ -145,6 +148,8 @@ func NewFelixServer(vpp *vpplink.VppLink, log *logrus.Entry) (*Server, error) { nodeStatesByName: make(map[string]*common.LocalNodeSpec), GotOurNodeBGPchan: make(chan interface{}), + + prometheusServer: prometheus.NewPrometheusServer(vpp, log.WithFields(logrus.Fields{"component": "prometheus"})), } reg := common.RegisterHandler(server.felixServerEventChan, "felix server events") @@ -364,6 +369,8 @@ func (s *Server) handleFelixServerEvents(evt common.CalicoVppEvent) error { EndpointID: podSpec.EndpointID, Network: podSpec.NetworkName, }, swIfIndex, podSpec.InterfaceName, podSpec.GetContainerIPs()) + // Notify prometheus server of pod addition + s.prometheusServer.OnPodAdded(podSpec) case common.PodDeleted: podSpec, ok := evt.Old.(*model.LocalPodSpec) if !ok { @@ -376,6 +383,8 @@ func (s *Server) handleFelixServerEvents(evt common.CalicoVppEvent) error { EndpointID: podSpec.EndpointID, Network: podSpec.NetworkName, }, 
podSpec.GetContainerIPs()) + // Notify prometheus server of pod deletion + s.prometheusServer.OnPodDeleted(podSpec) } case common.TunnelAdded: swIfIndex, ok := evt.New.(uint32) @@ -435,6 +444,17 @@ func (s *Server) handleFelixServerEvents(evt common.CalicoVppEvent) error { func (s *Server) ServeFelix(t *tomb.Tomb) error { s.log.Info("Starting felix server") + // Start prometheus server + if t.Alive() { + t.Go(func() error { + err := s.prometheusServer.ServePrometheus(t) + if err != nil { + s.log.Warnf("Prometheus server errored with %s", err) + } + return err + }) + } + listener, err := net.Listen("unix", config.FelixDataplaneSocket) if err != nil { return errors.Wrapf(err, "Could not bind to unix://%s", config.FelixDataplaneSocket) diff --git a/calico-vpp-agent/prometheus/prometheus.go b/calico-vpp-agent/prometheus/prometheus.go index e74eade5..5b9c2ef7 100644 --- a/calico-vpp-agent/prometheus/prometheus.go +++ b/calico-vpp-agent/prometheus/prometheus.go @@ -32,7 +32,6 @@ import ( "gopkg.in/tomb.v2" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model" - "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" "github.com/projectcalico/vpp-dataplane/v3/config" "github.com/projectcalico/vpp-dataplane/v3/vpplink" ) @@ -49,7 +48,6 @@ type PrometheusServer struct { podInterfacesDetailsBySwifIndex map[uint32]podInterfaceDetails podInterfacesByKey map[string]model.LocalPodSpec statsclient *statsclient.StatsClient - channel chan common.CalicoVppEvent lock sync.Mutex httpServer *http.Server exporter *prometheusExporter.Exporter @@ -65,7 +63,6 @@ func NewPrometheusServer(vpp *vpplink.VppLink, log *logrus.Entry) *PrometheusSer server := &PrometheusServer{ log: log, vpp: vpp, - channel: make(chan common.CalicoVppEvent, 10), podInterfacesByKey: make(map[string]model.LocalPodSpec), podInterfacesDetailsBySwifIndex: make(map[uint32]podInterfaceDetails), statsclient: statsclient.NewStatsClient("" /* default socket name */), @@ -76,10 +73,6 @@ func 
NewPrometheusServer(vpp *vpplink.VppLink, log *logrus.Entry) *PrometheusSer exporter: exporter, } - if *config.GetCalicoVppFeatureGates().PrometheusEnabled { - reg := common.RegisterHandler(server.channel, "prometheus events") - reg.ExpectEvents(common.PodAdded, common.PodDeleted) - } return server } @@ -407,66 +400,66 @@ func (p *PrometheusServer) exportSessionScalarStat(name string, value int64) { } } +// OnPodAdded handles pod addition events directly +func (p *PrometheusServer) OnPodAdded(podSpec *model.LocalPodSpec) { + if !(*config.GetCalicoVppFeatureGates().PrometheusEnabled) { + return + } + + splittedWorkloadID := strings.SplitN(podSpec.WorkloadID, "/", 2) + if len(splittedWorkloadID) != 2 { + return + } + + p.lock.Lock() + defer p.lock.Unlock() + + if podSpec.MemifSwIfIndex != vpplink.InvalidSwIfIndex { + memifName := podSpec.InterfaceName + if podSpec.NetworkName == "" { + memifName = "vpp/memif-" + podSpec.InterfaceName + } + p.podInterfacesDetailsBySwifIndex[podSpec.MemifSwIfIndex] = podInterfaceDetails{ + podNamespace: splittedWorkloadID[0], + podName: splittedWorkloadID[1], + interfaceName: memifName, + } + } + if podSpec.TunTapSwIfIndex != vpplink.InvalidSwIfIndex { + p.podInterfacesDetailsBySwifIndex[podSpec.TunTapSwIfIndex] = podInterfaceDetails{ + podNamespace: splittedWorkloadID[0], + podName: splittedWorkloadID[1], + interfaceName: podSpec.InterfaceName, + } + } + p.podInterfacesByKey[podSpec.Key()] = *podSpec +} + +// OnPodDeleted handles pod deletion events directly +func (p *PrometheusServer) OnPodDeleted(podSpec *model.LocalPodSpec) { + if !(*config.GetCalicoVppFeatureGates().PrometheusEnabled) { + return + } + + p.lock.Lock() + defer p.lock.Unlock() + + initialPod := p.podInterfacesByKey[podSpec.Key()] + delete(p.podInterfacesByKey, initialPod.Key()) + if podSpec.MemifSwIfIndex != vpplink.InvalidSwIfIndex { + delete(p.podInterfacesDetailsBySwifIndex, initialPod.MemifSwIfIndex) + } + if podSpec.TunTapSwIfIndex != vpplink.InvalidSwIfIndex { 
+ delete(p.podInterfacesDetailsBySwifIndex, initialPod.TunTapSwIfIndex) + } +} + func (p *PrometheusServer) ServePrometheus(t *tomb.Tomb) error { if !(*config.GetCalicoVppFeatureGates().PrometheusEnabled) { return nil } p.log.Infof("Serve() Prometheus exporter") - go func() { - for t.Alive() { - /* Note: we will only receive events we ask for when registering the chan */ - evt := <-p.channel - switch evt.Type { - case common.PodAdded: - podSpec, ok := evt.New.(*model.LocalPodSpec) - if !ok { - p.log.Errorf("evt.New is not a *model.LocalPodSpec %v", evt.New) - continue - } - splittedWorkloadID := strings.SplitN(podSpec.WorkloadID, "/", 2) - if len(splittedWorkloadID) != 2 { - continue - } - p.lock.Lock() - if podSpec.MemifSwIfIndex != vpplink.InvalidSwIfIndex { - memifName := podSpec.InterfaceName - if podSpec.NetworkName == "" { - memifName = "vpp/memif-" + podSpec.InterfaceName - } - p.podInterfacesDetailsBySwifIndex[podSpec.MemifSwIfIndex] = podInterfaceDetails{ - podNamespace: splittedWorkloadID[0], - podName: splittedWorkloadID[1], - interfaceName: memifName, - } - } - if podSpec.TunTapSwIfIndex != vpplink.InvalidSwIfIndex { - p.podInterfacesDetailsBySwifIndex[podSpec.TunTapSwIfIndex] = podInterfaceDetails{ - podNamespace: splittedWorkloadID[0], - podName: splittedWorkloadID[1], - interfaceName: podSpec.InterfaceName, - } - } - p.podInterfacesByKey[podSpec.Key()] = *podSpec - p.lock.Unlock() - case common.PodDeleted: - podSpec, ok := evt.Old.(*model.LocalPodSpec) - if !ok { - p.log.Errorf("evt.Old is not a *model.LocalPodSpec %v", evt.Old) - continue - } - p.lock.Lock() - initialPod := p.podInterfacesByKey[podSpec.Key()] - delete(p.podInterfacesByKey, initialPod.Key()) - if podSpec.MemifSwIfIndex != vpplink.InvalidSwIfIndex { - delete(p.podInterfacesDetailsBySwifIndex, initialPod.MemifSwIfIndex) - } - if podSpec.TunTapSwIfIndex != vpplink.InvalidSwIfIndex { - delete(p.podInterfacesDetailsBySwifIndex, initialPod.TunTapSwIfIndex) - } - p.lock.Unlock() - } - } - 
}() + err := p.statsclient.Connect() if err != nil { return errors.Wrap(err, "could not connect statsclient") diff --git a/calico-vpp-agent/prometheus/prometheus_test.go b/calico-vpp-agent/prometheus/prometheus_test.go index c0e27f44..7a73d380 100644 --- a/calico-vpp-agent/prometheus/prometheus_test.go +++ b/calico-vpp-agent/prometheus/prometheus_test.go @@ -30,7 +30,6 @@ import ( "gopkg.in/tomb.v2" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/cni/model" - "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/common" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/prometheus" "github.com/projectcalico/vpp-dataplane/v3/calico-vpp-agent/testutils" agentConf "github.com/projectcalico/vpp-dataplane/v3/config" @@ -91,7 +90,6 @@ var _ = Describe("Prometheus exporter functionality", func() { BeforeEach(func() { log = logrus.New() - common.ThePubSub = common.NewPubSub(log.WithFields(logrus.Fields{"component": "pubsub"})) // Enable prometheus feature gate agentConf.GetCalicoVppFeatureGates().PrometheusEnabled = &agentConf.True @@ -132,8 +130,8 @@ var _ = Describe("Prometheus exporter functionality", func() { // Add some fake containers to test interface stats using actual VPP interface indices // Use the uplink interface (tap0) and local0 interface for testing - addFakeContainer("test-namespace-1", "test-pod-1", "eth0", vpplink.InvalidSwIfIndex, uplinkSwIfIndex) - addFakeContainer("test-namespace-2", "test-pod-2", "eth0", vpplink.InvalidSwIfIndex, 0) + addFakeContainer(prometheusServer, "test-namespace-1", "test-pod-1", "eth0", vpplink.InvalidSwIfIndex, uplinkSwIfIndex) + addFakeContainer(prometheusServer, "test-namespace-2", "test-pod-2", "eth0", vpplink.InvalidSwIfIndex, 0) // Start prometheus server testTomb = &tomb.Tomb{} @@ -254,7 +252,7 @@ var _ = Describe("Prometheus exporter functionality", func() { By("Adding a new pod") // Use a different interface name to distinguish this pod // Use tap interface with the uplink interface index - 
addFakeContainer("dynamic-namespace", "dynamic-pod", "eth1", vpplink.InvalidSwIfIndex, uplinkSwIfIndex) + addFakeContainer(prometheusServer, "dynamic-namespace", "dynamic-pod", "eth1", vpplink.InvalidSwIfIndex, uplinkSwIfIndex) // Give more time for event processing and metrics collection time.Sleep(2 * time.Second) @@ -476,7 +474,7 @@ func parseMetrics(metricsOutput string, metricName string) []*MetricInfo { } // addFakeContainer simulates adding a container/pod to the prometheus server -func addFakeContainer(namespace, podName, interfaceName string, memifSwIfIndex, tunTapSwIfIndex uint32) { +func addFakeContainer(prometheusServer *prometheus.PrometheusServer, namespace, podName, interfaceName string, memifSwIfIndex, tunTapSwIfIndex uint32) { // Create fake pod spec podSpec := &model.LocalPodSpec{ WorkloadID: fmt.Sprintf("%s/%s", namespace, podName), @@ -487,12 +485,5 @@ func addFakeContainer(namespace, podName, interfaceName string, memifSwIfIndex, }, } - // Simulate pod addition event via PubSub mechanism - event := common.CalicoVppEvent{ - Type: common.PodAdded, - New: podSpec, - } - - // Send event using the common PubSub mechanism - common.SendEvent(event) + prometheusServer.OnPodAdded(podSpec) }