From 0022d04171152e5e3f587198f1ff63cc9ea8ecfa Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Thu, 9 Oct 2025 12:13:20 +0200 Subject: [PATCH 01/29] feat(inputs.nats_consumer): Ack messages when accepted on the output --- plugins/inputs/nats_consumer/nats_consumer.go | 55 +++++++++++++------ 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 43531cc53e912..d96b6a5012f54 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -47,7 +47,8 @@ type NatsConsumer struct { parser telegraf.Parser // channel for all incoming NATS messages - in chan *nats.Msg + in chan *nats.Msg + undelivered map[telegraf.TrackingID]*nats.Msg // channel for all NATS read errors errs chan error acc telegraf.TrackingAccumulator @@ -144,22 +145,14 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { if len(n.JsSubjects) > 0 { var connErr error - n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256)) + n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(n.PendingMessageLimit)) if connErr != nil { return connErr } if n.jsConn != nil { for _, jsSub := range n.JsSubjects { - sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) { - n.in <- m - }) - if err != nil { - return err - } - - // set the subscription pending limits - err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit) + sub, err := n.jsConn.ChanQueueSubscribe(jsSub, n.QueueGroup, n.in, nats.ManualAck()) if err != nil { return err } @@ -207,14 +200,17 @@ func (n *NatsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro // receiver() reads all incoming messages from NATS, and parses them into // telegraf metrics. func (n *NatsConsumer) receiver(ctx context.Context) { + n.undelivered = make(map[telegraf.TrackingID]*nats.Msg) sem := make(semaphore, n.MaxUndeliveredMessages) for { select { case <-ctx.Done(): return - case <-n.acc.Delivered(): - <-sem + case track := <-n.acc.Delivered(): + if n.onDelivery(track) { + <-sem + } case err := <-n.errs: n.Log.Error(err) case sem <- empty{}: @@ -224,13 +220,15 @@ func (n *NatsConsumer) receiver(ctx context.Context) { case err := <-n.errs: <-sem n.Log.Error(err) - case <-n.acc.Delivered(): - <-sem - <-sem + case track := <-n.acc.Delivered(): + if n.onDelivery(track) { + <-sem + <-sem + } case msg := <-n.in: metrics, err := n.parser.Parse(msg.Data) if err != nil { - n.Log.Errorf("Subject: %s, error: %s", msg.Subject, err.Error()) + n.Log.Errorf("Failed to parse message in subject %s: %s", msg.Subject, err.Error()) <-sem continue } @@ -248,6 +246,29 @@ func (n *NatsConsumer) receiver(ctx context.Context) { } } +func (n *NatsConsumer) onDelivery(track telegraf.DeliveryInfo) bool { + msg, ok := n.undelivered[track.ID()] + if !ok { + // Added by a previous connection + return false + } + + if track.Delivered() { + err := msg.Ack() + if err != nil { + n.Log.Errorf("Failed to Ack message on subject %s: %s", msg.Subject, msg.Sub.Queue, err.Error()) + } + } else { + err := msg.Nak() + if err != nil { + n.Log.Errorf("Failed to Nak message on subject %s: %s", msg.Subject, err.Error()) + } + } + + delete(n.undelivered, track.ID()) + return true +} + func (n *NatsConsumer) clean() { for _, sub := range n.subs { if err := sub.Unsubscribe(); err != nil { From fe4722b5131a5c6e271ef483ae4b01afc3caafeb Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Thu, 9 Oct 2025 12:14:55 +0200 Subject: [PATCH 02/29] Change to channel based queue subscription --- plugins/inputs/nats_consumer/nats_consumer.go | 15 +++------------ .../inputs/nats_consumer/nats_consumer_test.go | 2 -- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index d96b6a5012f54..f55a50cbdde36 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -35,7 +35,7 @@ type NatsConsumer struct { NkeySeed string `toml:"nkey_seed"` JsSubjects []string `toml:"jetstream_subjects"` PendingMessageLimit int `toml:"pending_message_limit"` - PendingBytesLimit int `toml:"pending_bytes_limit"` + PendingBytesLimit int `toml:"pending_bytes_limit" deprecated:"1.37.0;1.40.0;unused"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` Log telegraf.Logger `toml:"-"` tls.ClientConfig @@ -125,17 +125,9 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { // Setup message and error channels n.errs = make(chan error) - n.in = make(chan *nats.Msg, 1000) + n.in = make(chan *nats.Msg, n.PendingMessageLimit) for _, subj := range n.Subjects { - sub, err := n.conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) { - n.in <- m - }) - if err != nil { - return err - } - - // set the subscription pending limits - err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit) + sub, err := n.conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in) if err != nil { return err } @@ -295,7 +287,6 @@ func init() { Servers: []string{"nats://localhost:4222"}, Subjects: []string{"telegraf"}, QueueGroup: "telegraf_consumers", - PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 27b1408747012..83a5c575b7a48 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -32,7 +32,6 @@ func TestStartStop(t *testing.T) { Servers: []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports["4222"])}, Subjects: []string{"telegraf"}, QueueGroup: "telegraf_consumers", - PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, @@ -144,7 +143,6 @@ func TestSendReceive(t *testing.T) { Servers: []string{addr}, Subjects: subjects, QueueGroup: "telegraf_consumers", - PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, From 2b545020a07ddc089523300fd1d4a9ab00582a81 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Thu, 9 Oct 2025 13:17:37 +0200 Subject: [PATCH 03/29] Linter fix --- plugins/inputs/nats_consumer/nats_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index f55a50cbdde36..ee9a40ce8ae8c 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -248,7 +248,7 @@ func (n *NatsConsumer) onDelivery(track telegraf.DeliveryInfo) bool { if track.Delivered() { err := msg.Ack() if err != nil { - n.Log.Errorf("Failed to Ack message on subject %s: %s", msg.Subject, msg.Sub.Queue, err.Error()) + n.Log.Errorf("Failed to Ack message on subject %s: %s", msg.Subject, err.Error()) } } else { err := msg.Nak() From 95f15a77285be9d4bdb4dd612bd0e3beb5d11719 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 11:24:30 +0200 Subject: [PATCH 04/29] Apply suggestion from @srebhan Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com> --- plugins/inputs/nats_consumer/nats_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 2cec5a900bd69..a13cb9758a3a4 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -233,7 +233,7 @@ func (n *NatsConsumer) receiver(ctx context.Context) { case msg := <-n.in: metrics, err := n.parser.Parse(msg.Data) if err != nil { - n.Log.Errorf("Failed to parse message in subject %s: %s", msg.Subject, err.Error()) + n.Log.Errorf("Failed to parse message in subject %s: %v", msg.Subject, err) <-sem continue } From f73359c0b4353822d1b4c0ff02fe69ca9c6c8432 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 11:41:28 +0200 Subject: [PATCH 05/29] Pending limit not applicable for channel subscriptions --- plugins/inputs/nats_consumer/nats_consumer.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index a13cb9758a3a4..f8605dd5b4323 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -156,12 +156,6 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { return err } - // set the subscription pending limits - err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit) - if err != nil { - return err - } - n.jsSubs = append(n.jsSubs, sub) } } From 2d198a953d75f6f5c57deea88ec242b5436d608f Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 11:52:00 +0200 Subject: [PATCH 06/29] Don't use deprecated option. --- plugins/inputs/nats_consumer/README.md | 5 ++--- plugins/inputs/nats_consumer/nats_consumer_test.go | 3 --- plugins/inputs/nats_consumer/sample.conf | 5 ++--- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index 862b8b8ec6d22..e66f456896b03 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -88,10 +88,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Sets the limits for pending msgs and bytes for each subscription - ## These shouldn't need to be adjusted except in very high throughput scenarios + ## Sets the limit for pending messages for each subscription + ## This shouldn't need to be adjusted except in very high throughput scenarios # pending_message_limit = 65536 - # pending_bytes_limit = 67108864 ## Max undelivered messages ## This plugin uses tracking metrics, which ensure messages are read to diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 51efe467a8427..9802d6df89fc8 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -218,7 +218,6 @@ func TestJetStreamIntegrationSendReceive(t *testing.T) { JsSubjects: []string{subject}, JsStream: streamName, QueueGroup: "telegraf_consumers", - PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, @@ -296,7 +295,6 @@ func TestJetStreamIntegrationSourcedStreamNotFound(t *testing.T) { Servers: []string{addr}, JsSubjects: []string{"TESTSTREAM"}, QueueGroup: "telegraf_consumers", - PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, @@ -352,7 +350,6 @@ func TestJetStreamIntegrationSourcedStreamFound(t *testing.T) { JsSubjects: []string{"TESTSTREAM"}, JsStream: streamName, QueueGroup: "telegraf_consumers", - PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, diff --git a/plugins/inputs/nats_consumer/sample.conf b/plugins/inputs/nats_consumer/sample.conf index 42a94af5c9158..f962a3f0c93be 100644 --- a/plugins/inputs/nats_consumer/sample.conf +++ b/plugins/inputs/nats_consumer/sample.conf @@ -45,10 +45,9 @@ ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Sets the limits for pending msgs and bytes for each subscription - ## These shouldn't need to be adjusted except in very high throughput scenarios + ## Sets the limit for pending messages for each subscription + ## This shouldn't need to be adjusted except in very high throughput scenarios # pending_message_limit = 65536 - # pending_bytes_limit = 67108864 ## Max undelivered messages ## This plugin uses tracking metrics, which ensure messages are read to From 065b6b5f5adefd2d34ce1f4eafdcd6075166f3d0 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 11:52:22 +0200 Subject: [PATCH 07/29] Actually store undelivered messages --- plugins/inputs/nats_consumer/nats_consumer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index f8605dd5b4323..97684982f5e53 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -239,7 +239,8 @@ func (n *NatsConsumer) receiver(ctx context.Context) { for _, m := range metrics { m.AddTag("subject", msg.Subject) } - n.acc.AddTrackingMetricGroup(metrics) + id := n.acc.AddTrackingMetricGroup(metrics) + n.undelivered[id] = msg } } } From 824ebbe974c7e0457c208616856d5f2d83a4f709 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 13:13:36 +0200 Subject: [PATCH 08/29] Add tests --- .../nats_consumer/nats_consumer_test.go | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 9802d6df89fc8..797cad766e18f 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -167,7 +167,6 @@ func TestSendReceive(t *testing.T) { require.NoError(t, publisher.send(topic, msg)) } } - publisher.disconnect() // Wait for the metrics to be collected require.Eventually(t, func() bool { @@ -178,6 +177,16 @@ func TestSendReceive(t *testing.T) { actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime(), testutil.SortMetrics()) + + // Acknowledge the message and check undelivered tracking + require.Len(t, plugin.undelivered, int(publisher.conn.Stats().OutMsgs)) + for _, m := range actual { + m.Accept() + } + + require.Eventually(t, func() bool { + return len(plugin.undelivered) == 0 + }, time.Second, 100*time.Millisecond, "undelivered messages not cleared") }) } } @@ -213,6 +222,7 @@ func TestJetStreamIntegrationSendReceive(t *testing.T) { require.NoError(t, err) // Setup the plugin for JetStream + log := testutil.CaptureLogger{} plugin := &NatsConsumer{ Servers: []string{addr}, JsSubjects: []string{subject}, @@ -220,7 +230,7 @@ func TestJetStreamIntegrationSendReceive(t *testing.T) { QueueGroup: "telegraf_consumers", PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, - Log: testutil.Logger{}, + Log: &log, } parser := &influx.Parser{} @@ -256,6 +266,19 @@ func TestJetStreamIntegrationSendReceive(t *testing.T) { ), } testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime(), testutil.SortMetrics()) + + // Acknowledge the message and check undelivered tracking + log.Clear() + require.Len(t, plugin.undelivered, 1) + for _, m := range actual { + m.Accept() + } + + require.Eventually(t, func() bool { + return len(plugin.undelivered) == 0 + }, time.Second, 100*time.Millisecond, "undelivered messages not cleared") + + require.Empty(t, log.Messages(), "no warnings or errors should be logged") } func TestJetStreamIntegrationSourcedStreamNotFound(t *testing.T) { From 16fd2ecd971ee532e69eec4fec707e65b139ec93 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 13:15:56 +0200 Subject: [PATCH 09/29] Reset value --- plugins/inputs/nats_consumer/nats_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 97684982f5e53..e2bd19f7c04cd 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -144,7 +144,7 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { if n.JsStream != "" { subOptions = append(subOptions, nats.BindStream(n.JsStream)) } - n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(n.PendingMessageLimit)) + n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256)) if connErr != nil { return connErr } From c05920e9e56ed493acbbcfdfc03177fb0c4bc57b Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 13:27:40 +0200 Subject: [PATCH 10/29] Add locking --- plugins/inputs/nats_consumer/nats_consumer.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index e2bd19f7c04cd..1db142d131b53 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -25,6 +25,8 @@ var ( ) type NatsConsumer struct { + sync.Mutex + QueueGroup string `toml:"queue_group"` Subjects []string `toml:"subjects"` Servers []string `toml:"servers"` @@ -247,6 +249,9 @@ func (n *NatsConsumer) receiver(ctx context.Context) { } func (n *NatsConsumer) onDelivery(track telegraf.DeliveryInfo) bool { + n.Lock() + defer n.Unlock() + msg, ok := n.undelivered[track.ID()] if !ok { // Added by a previous connection From 47e47ae1e321301d9107c561c4718b3642c912b1 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 14:04:47 +0200 Subject: [PATCH 11/29] Rewrite delivery handler. --- plugins/inputs/nats_consumer/nats_consumer.go | 78 +++++++++++-------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 1db142d131b53..a5d7dc5982915 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -55,6 +55,7 @@ type NatsConsumer struct { // channel for all NATS read errors errs chan error acc telegraf.TrackingAccumulator + sem semaphore wg sync.WaitGroup cancel context.CancelFunc } @@ -85,7 +86,9 @@ func (n *NatsConsumer) SetParser(parser telegraf.Parser) { // Start the nats consumer. Caller must call *NatsConsumer.Stop() to clean up. func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { + n.sem = make(semaphore, n.MaxUndeliveredMessages) n.acc = acc.WithTracking(n.MaxUndeliveredMessages) + n.undelivered = make(map[telegraf.TrackingID]*nats.Msg) options := []nats.Option{ nats.MaxReconnects(-1), @@ -167,6 +170,13 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { ctx, cancel := context.WithCancel(context.Background()) n.cancel = cancel + // Start goroutine to handle delivery notifications from accumulator. + n.wg.Add(1) + go func() { + defer n.wg.Done() + n.waitForDelivery(ctx) + }() + // Start the message reader n.wg.Add(1) go func() { @@ -201,36 +211,24 @@ func (n *NatsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro // receiver() reads all incoming messages from NATS, and parses them into // telegraf metrics. func (n *NatsConsumer) receiver(ctx context.Context) { - n.undelivered = make(map[telegraf.TrackingID]*nats.Msg) - sem := make(semaphore, n.MaxUndeliveredMessages) - for { select { case <-ctx.Done(): return - case track := <-n.acc.Delivered(): - if n.onDelivery(track) { - <-sem - } case err := <-n.errs: n.Log.Error(err) - case sem <- empty{}: + case n.sem <- empty{}: select { case <-ctx.Done(): return case err := <-n.errs: - <-sem + <-n.sem n.Log.Error(err) - case track := <-n.acc.Delivered(): - if n.onDelivery(track) { - <-sem - <-sem - } case msg := <-n.in: metrics, err := n.parser.Parse(msg.Data) if err != nil { n.Log.Errorf("Failed to parse message in subject %s: %v", msg.Subject, err) - <-sem + <-n.sem continue } if len(metrics) == 0 { @@ -248,30 +246,42 @@ func (n *NatsConsumer) receiver(ctx context.Context) { } } -func (n *NatsConsumer) onDelivery(track telegraf.DeliveryInfo) bool { +func (n *NatsConsumer) waitForDelivery(parentCtx context.Context) { + for { + select { + case <-parentCtx.Done(): + return + case track := <-n.acc.Delivered(): + <-n.sem + msg := n.removeDelivered(track.ID()) + + if msg != nil { + if track.Delivered() { + err := msg.Ack() + if err != nil { + n.Log.Errorf("Failed to Ack message on subject %s: %v", msg.Subject, err) + } + } else { + err := msg.Nak() + if err != nil { + n.Log.Errorf("Failed to Nak message on subject %s: %v", msg.Subject, err) + } + } + } + } + } +} + +func (n *NatsConsumer) removeDelivered(id telegraf.TrackingID) *nats.Msg { n.Lock() defer n.Unlock() - msg, ok := n.undelivered[track.ID()] + msg, ok := n.undelivered[id] if !ok { - // Added by a previous connection - return false - } - - if track.Delivered() { - err := msg.Ack() - if err != nil { - n.Log.Errorf("Failed to Ack message on subject %s: %s", msg.Subject, err.Error()) - } - } else { - err := msg.Nak() - if err != nil { - n.Log.Errorf("Failed to Nak message on subject %s: %s", msg.Subject, err.Error()) - } + return nil } - - delete(n.undelivered, track.ID()) - return true + delete(n.undelivered, id) + return msg } func (n *NatsConsumer) clean() { From b70746df2345637c44a4ad5b5fb28f16731b2cab Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 14:28:59 +0200 Subject: [PATCH 12/29] Add extra locking --- plugins/inputs/nats_consumer/nats_consumer.go | 2 ++ plugins/inputs/nats_consumer/nats_consumer_test.go | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index a5d7dc5982915..5feead64e638e 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -239,8 +239,10 @@ func (n *NatsConsumer) receiver(ctx context.Context) { for _, m := range metrics { m.AddTag("subject", msg.Subject) } + n.Lock() id := n.acc.AddTrackingMetricGroup(metrics) n.undelivered[id] = msg + n.Unlock() } } } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 797cad766e18f..0c30a4252686e 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -179,12 +179,16 @@ func TestSendReceive(t *testing.T) { testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime(), testutil.SortMetrics()) // Acknowledge the message and check undelivered tracking + plugin.Lock() require.Len(t, plugin.undelivered, int(publisher.conn.Stats().OutMsgs)) + plugin.Unlock() for _, m := range actual { m.Accept() } require.Eventually(t, func() bool { + plugin.Lock() + defer plugin.Unlock() return len(plugin.undelivered) == 0 }, time.Second, 100*time.Millisecond, "undelivered messages not cleared") }) @@ -269,12 +273,16 @@ func TestJetStreamIntegrationSendReceive(t *testing.T) { // Acknowledge the message and check undelivered tracking log.Clear() + plugin.Lock() require.Len(t, plugin.undelivered, 1) + plugin.Unlock() for _, m := range actual { m.Accept() } require.Eventually(t, func() bool { + plugin.Lock() + defer plugin.Unlock() return len(plugin.undelivered) == 0 }, time.Second, 100*time.Millisecond, "undelivered messages not cleared") From 935a5eb7a7690dd2771467870c25633e85635e07 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 17:05:08 +0200 Subject: [PATCH 13/29] Try to figure out if message is from a JetStream.. --- plugins/inputs/nats_consumer/nats_consumer.go | 9 ++++++--- .../inputs/nats_consumer/nats_consumer_test.go | 15 +-------------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 5feead64e638e..925d5a25a9193 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -239,10 +240,12 @@ func (n *NatsConsumer) receiver(ctx context.Context) { for _, m := range metrics { m.AddTag("subject", msg.Subject) } - n.Lock() id := n.acc.AddTrackingMetricGroup(metrics) - n.undelivered[id] = msg - n.Unlock() + if !choice.Contains(msg.Subject, n.Subjects) { + n.Lock() + n.undelivered[id] = msg + n.Unlock() + } } } } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 0c30a4252686e..670a46c084ded 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -167,6 +167,7 @@ func TestSendReceive(t *testing.T) { require.NoError(t, publisher.send(topic, msg)) } } + publisher.disconnect() // Wait for the metrics to be collected require.Eventually(t, func() bool { @@ -177,20 +178,6 @@ func TestSendReceive(t *testing.T) { actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime(), testutil.SortMetrics()) - - // Acknowledge the message and check undelivered tracking - plugin.Lock() - require.Len(t, plugin.undelivered, int(publisher.conn.Stats().OutMsgs)) - plugin.Unlock() - for _, m := range actual { - m.Accept() - } - - require.Eventually(t, func() bool { - plugin.Lock() - defer plugin.Unlock() - return len(plugin.undelivered) == 0 - }, time.Second, 100*time.Millisecond, "undelivered messages not cleared") }) } } From f8600068d9ba192415aa875fb7090ee9441d2604 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Oct 2025 17:38:55 +0200 Subject: [PATCH 14/29] Better jetstream detection --- plugins/inputs/nats_consumer/nats_consumer.go | 12 +++++++----- plugins/inputs/nats_consumer/nats_consumer_test.go | 4 ++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 925d5a25a9193..8f8ae9f28660e 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -12,7 +12,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -241,10 +240,13 @@ func (n *NatsConsumer) receiver(ctx context.Context) { m.AddTag("subject", msg.Subject) } id := n.acc.AddTrackingMetricGroup(metrics) - if !choice.Contains(msg.Subject, n.Subjects) { - n.Lock() - n.undelivered[id] = msg - n.Unlock() + for _, s := range n.jsSubs { + if msg.Sub == s { + n.Lock() + n.undelivered[id] = msg + n.Unlock() + break + } } } } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 670a46c084ded..fa91354ddf3b7 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -178,6 +178,10 @@ func TestSendReceive(t *testing.T) { actual := acc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime(), testutil.SortMetrics()) + + plugin.Lock() + defer plugin.Unlock() + require.Empty(t, plugin.undelivered) }) } } From adef87c76afac8b7b98e6db5946046a1f2c1e317 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Tue, 14 Oct 2025 14:39:07 +0200 Subject: [PATCH 15/29] Terminate message when unable to parse --- plugins/inputs/nats_consumer/nats_consumer.go | 60 +++++++++++++------ 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 8f8ae9f28660e..a625938f6359b 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -218,6 +218,7 @@ func (n *NatsConsumer) receiver(ctx context.Context) { case err := <-n.errs: n.Log.Error(err) case n.sem <- empty{}: + L: select { case <-ctx.Done(): return @@ -225,34 +226,55 @@ func (n *NatsConsumer) receiver(ctx context.Context) { <-n.sem n.Log.Error(err) case msg := <-n.in: - metrics, err := n.parser.Parse(msg.Data) - if err != nil { - n.Log.Errorf("Failed to parse message in subject %s: %v", msg.Subject, err) - <-n.sem - continue - } - if len(metrics) == 0 { - once.Do(func() { - n.Log.Debug(internal.NoMetricsCreatedMsg) - }) - } - for _, m := range metrics { - m.AddTag("subject", msg.Subject) - } - id := n.acc.AddTrackingMetricGroup(metrics) for _, s := range n.jsSubs { if msg.Sub == s { - n.Lock() - n.undelivered[id] = msg - n.Unlock() - break + n.handleJetstreamMessage(msg) + break L } } + n.handleMessage(msg) } } } } +func (n *NatsConsumer) handleMessage(msg *nats.Msg) (telegraf.TrackingID, error) { + metrics, err := n.parser.Parse(msg.Data) + if err != nil { + n.Log.Errorf("Failed to parse message in subject %s: %v", msg.Subject, err) + <-n.sem + return 0, err + } + if len(metrics) == 0 { + once.Do(func() { + n.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + for _, m := range metrics { + m.AddTag("subject", msg.Subject) + } + return n.acc.AddTrackingMetricGroup(metrics), nil +} + +func (n *NatsConsumer) handleJetstreamMessage(msg *nats.Msg) { + n.Lock() + defer n.Unlock() + + if err := msg.InProgress(); err != nil { + n.Log.Warnf("Failed to mark message as in progress on subject %s: %v", msg.Subject, err) + } + + id, err := n.handleMessage(msg) + if err != nil { + if err := msg.Term(); err != nil { + n.Log.Errorf("Failed to terminate message on subject %s: %v", msg.Subject, err) + } + return + } + + n.undelivered[id] = msg +} + func (n *NatsConsumer) waitForDelivery(parentCtx context.Context) { for { select { From 27160493c3b0dfbfe96ce5975d4ee7fe4a7a96de Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Tue, 14 Oct 2025 14:42:30 +0200 Subject: [PATCH 16/29] Rename context argument --- plugins/inputs/nats_consumer/nats_consumer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index a625938f6359b..5f9b7c6c65091 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -275,10 +275,10 @@ func (n *NatsConsumer) handleJetstreamMessage(msg *nats.Msg) { n.undelivered[id] = msg } -func (n *NatsConsumer) waitForDelivery(parentCtx context.Context) { +func (n *NatsConsumer) waitForDelivery(ctx context.Context) { for { select { - case <-parentCtx.Done(): + case <-ctx.Done(): return case track := <-n.acc.Delivered(): <-n.sem From 0f60378c530e69bba097f5940b2177957c426dc4 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Tue, 14 Oct 2025 15:00:14 +0200 Subject: [PATCH 17/29] Move sync.Mutex --- plugins/inputs/nats_consumer/nats_consumer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 5f9b7c6c65091..1b2d38720bd5d 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -25,8 +25,6 @@ var ( ) type NatsConsumer struct { - sync.Mutex - QueueGroup string `toml:"queue_group"` Subjects []string `toml:"subjects"` Servers []string `toml:"servers"` @@ -58,6 +56,7 @@ type NatsConsumer struct { sem semaphore wg sync.WaitGroup cancel context.CancelFunc + sync.Mutex } type ( From 8d0eb118f5b84c76ec7c438f8ef61e5437e3aed3 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Tue, 14 Oct 2025 17:16:11 +0200 Subject: [PATCH 18/29] Change error handling --- plugins/inputs/nats_consumer/nats_consumer.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 1b2d38720bd5d..f2bffbc503d9f 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -231,7 +231,9 @@ func (n *NatsConsumer) receiver(ctx context.Context) { break L } } - n.handleMessage(msg) + if _, err := n.handleMessage(msg); err != nil { + n.Log.Errorf("Failed to handle message on subject %s: %v", msg.Subject, err) + } } } } @@ -240,9 +242,8 @@ func (n *NatsConsumer) receiver(ctx context.Context) { func (n *NatsConsumer) handleMessage(msg *nats.Msg) (telegraf.TrackingID, error) { metrics, err := n.parser.Parse(msg.Data) if err != nil { - n.Log.Errorf("Failed to parse message in subject %s: %v", msg.Subject, err) <-n.sem - return 0, err + return 0, fmt.Errorf("failed to parse: %w", err) } if len(metrics) == 0 { once.Do(func() { @@ -260,13 +261,14 @@ func (n *NatsConsumer) handleJetstreamMessage(msg *nats.Msg) { defer n.Unlock() if err := msg.InProgress(); err != nil { - n.Log.Warnf("Failed to mark message as in progress on subject %s: %v", msg.Subject, err) + n.Log.Warnf("Failed to mark JetStream message as in progress on subject %s: %v", msg.Subject, err) } id, err := n.handleMessage(msg) if err != nil { + n.Log.Errorf("Failed to handle JetStream message on subject %s: %v", msg.Subject, err) if err := msg.Term(); err != nil { - n.Log.Errorf("Failed to terminate message on subject %s: %v", msg.Subject, err) + n.Log.Errorf("Failed to terminate JetStream message on subject %s: %v", msg.Subject, err) } return } @@ -287,12 +289,12 @@ func (n *NatsConsumer) waitForDelivery(ctx context.Context) { if track.Delivered() { err := msg.Ack() if err != nil { - n.Log.Errorf("Failed to Ack message on subject %s: %v", msg.Subject, err) + n.Log.Errorf("Failed to Ack JetStream message on subject %s: %v", msg.Subject, err) } } else { err := msg.Nak() if err != nil { - n.Log.Errorf("Failed to Nak message on subject %s: %v", msg.Subject, err) + n.Log.Errorf("Failed to Nak JetStream message on subject %s: %v", msg.Subject, err) } } } From 3fa2ccc36f96af86cfc6efc72726ddf5f71d4100 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 10:56:13 +0100 Subject: [PATCH 19/29] Apply suggestion from @srebhan Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com> --- plugins/inputs/nats_consumer/nats_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index f2bffbc503d9f..1f2341a3005eb 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -87,7 +87,7 @@ func (n *NatsConsumer) SetParser(parser telegraf.Parser) { func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { n.sem = make(semaphore, n.MaxUndeliveredMessages) n.acc = acc.WithTracking(n.MaxUndeliveredMessages) - n.undelivered = make(map[telegraf.TrackingID]*nats.Msg) + n.undelivered = make(map[telegraf.TrackingID]*nats.Msg, n.MaxUndeliveredMessages) options := []nats.Option{ nats.MaxReconnects(-1), From 85f7f5f6e743b4fb003e333f439a37804c36827d Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 11:09:02 +0100 Subject: [PATCH 20/29] Remove NAK on not delivered messages --- plugins/inputs/nats_consumer/nats_consumer.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 1f2341a3005eb..407a05686229b 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -291,11 +291,6 @@ func (n *NatsConsumer) waitForDelivery(ctx context.Context) { if err != nil { n.Log.Errorf("Failed to Ack JetStream message on subject %s: %v", msg.Subject, err) } - } else { - err := msg.Nak() - if err != nil { - n.Log.Errorf("Failed to Nak JetStream message on subject %s: %v", msg.Subject, err) - } } } } From 74a50d4e99f1960ce0b74fe01269c8648cfae990 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 12:40:32 +0100 Subject: [PATCH 21/29] Terminate undelivered messages --- plugins/inputs/nats_consumer/nats_consumer.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 407a05686229b..fba31177bbbd4 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -289,7 +289,12 @@ func (n *NatsConsumer) waitForDelivery(ctx context.Context) { if track.Delivered() { err := msg.Ack() if err != nil { - n.Log.Errorf("Failed to Ack JetStream message on subject %s: %v", msg.Subject, err) + n.Log.Errorf("Failed to acknowledge JetStream message on subject %s: %v", msg.Subject, err) + } + } else { + err := msg.Term() + if err != nil { + n.Log.Errorf("Failed to terminate JetStream message on subject %s: %v", msg.Subject, err) } } } From 540a2a38aa674b4ad150b8ef0200f18efce9affc Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 12:52:41 +0100 Subject: [PATCH 22/29] Revert using more optimal ChanQueueSubscribe --- plugins/inputs/nats_consumer/nats_consumer.go | 22 ++++++++++++++++--- .../nats_consumer/nats_consumer_test.go | 5 +++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index fba31177bbbd4..1390af58645f9 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -36,7 +36,7 @@ type NatsConsumer struct { JsSubjects []string `toml:"jetstream_subjects"` JsStream string `toml:"jetstream_stream"` PendingMessageLimit int `toml:"pending_message_limit"` - PendingBytesLimit int `toml:"pending_bytes_limit" deprecated:"1.37.0;1.40.0;unused"` + PendingBytesLimit int `toml:"pending_bytes_limit"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` Log telegraf.Logger `toml:"-"` tls.ClientConfig @@ -132,7 +132,15 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { n.in = make(chan *nats.Msg, n.PendingMessageLimit) for _, subj := range n.Subjects { - sub, err := n.conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in) + sub, err := n.conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) { + n.in <- m + }) + if err != nil { + return err + } + + // set the subscription pending limits + err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit) if err != nil { return err } @@ -155,7 +163,15 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error { if n.jsConn != nil { for _, jsSub := range n.JsSubjects { - sub, err := n.jsConn.ChanQueueSubscribe(jsSub, n.QueueGroup, n.in, subOptions...) + sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) { + n.in <- m + }, subOptions...) + if err != nil { + return err + } + + // set the subscription pending limits + err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit) if err != nil { return err } diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index 3ff5101da0ba4..996594b17805f 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -31,6 +31,7 @@ func TestIntegrationStartStop(t *testing.T) { Servers: []string{"nats://" + container.Address + ":" + container.Ports["4222"]}, Subjects: []string{"telegraf"}, QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, @@ -142,6 +143,7 @@ func TestIntegrationSendReceive(t *testing.T) { Servers: []string{addr}, Subjects: subjects, QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, @@ -222,6 +224,7 @@ func TestJetStreamIntegrationSendReceive(t *testing.T) { JsSubjects: []string{subject}, JsStream: streamName, QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: &log, @@ -316,6 +319,7 @@ func TestJetStreamIntegrationSourcedStreamNotFound(t *testing.T) { Servers: []string{addr}, JsSubjects: []string{"TESTSTREAM"}, QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, @@ -371,6 +375,7 @@ func TestJetStreamIntegrationSourcedStreamFound(t *testing.T) { JsSubjects: []string{"TESTSTREAM"}, JsStream: streamName, QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, Log: testutil.Logger{}, From e82eb69459265de7ef8e496c8e12b108aeb6729e Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 15:32:01 +0100 Subject: [PATCH 23/29] Rewrite receiver and waitForDelivery methods --- plugins/inputs/nats_consumer/nats_consumer.go | 133 ++++++++---------- 1 file changed, 56 insertions(+), 77 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 1390af58645f9..2a4d3cc46b037 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -5,6 +5,7 @@ import ( "context" _ "embed" "fmt" + "slices" "strings" "sync" @@ -227,69 +228,54 @@ func (n *NatsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro // telegraf metrics. func (n *NatsConsumer) receiver(ctx context.Context) { for { + // Acquire a semaphore to block consumption if the number of undelivered messages + // reached it's limit select { case <-ctx.Done(): return - case err := <-n.errs: - n.Log.Error(err) case n.sem <- empty{}: - L: - select { - case <-ctx.Done(): - return - case err := <-n.errs: - <-n.sem - n.Log.Error(err) - case msg := <-n.in: - for _, s := range n.jsSubs { - if msg.Sub == s { - n.handleJetstreamMessage(msg) - break L - } - } - if _, err := n.handleMessage(msg); err != nil { - n.Log.Errorf("Failed to handle message on subject %s: %v", msg.Subject, err) - } - } } - } -} -func (n *NatsConsumer) handleMessage(msg *nats.Msg) (telegraf.TrackingID, error) { - metrics, err := n.parser.Parse(msg.Data) - if err != nil { - <-n.sem - return 0, fmt.Errorf("failed to parse: %w", err) - } - if len(metrics) == 0 { - once.Do(func() { - n.Log.Debug(internal.NoMetricsCreatedMsg) - }) - } - for _, m := range metrics { - m.AddTag("subject", msg.Subject) - } - return n.acc.AddTrackingMetricGroup(metrics), nil -} + // Consume messages and errors + select { + case <-ctx.Done(): + return + case err := <-n.errs: + n.Log.Error(err) + case msg := <-n.in: + jetstreamMsg := slices.Contains(n.jsSubs, msg.Sub) -func (n *NatsConsumer) handleJetstreamMessage(msg *nats.Msg) { - n.Lock() - defer n.Unlock() + if jetstreamMsg { + if err := msg.InProgress(); err != nil { + n.Log.Warnf("Failed to mark JetStream message as in progress on subject %s: %v", msg.Subject, err) + } + } - if err := msg.InProgress(); err != nil { - n.Log.Warnf("Failed to mark JetStream message as in progress on subject %s: %v", msg.Subject, err) - } + // Parse the metric and add it to the accumulator + metrics, err := n.parser.Parse(msg.Data) + if err != nil { + <-n.sem + n.acc.AddError(fmt.Errorf("Failed to handle message on subject %s: %v", msg.Subject, err)) + } + if len(metrics) == 0 { + once.Do(func() { + n.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } else { + for _, m := range metrics { + m.AddTag("subject", msg.Subject) + } + id := n.acc.AddTrackingMetricGroup(metrics) - id, err := n.handleMessage(msg) - if err != nil { - n.Log.Errorf("Failed to handle JetStream message on subject %s: %v", msg.Subject, err) - if err := msg.Term(); err != nil { - n.Log.Errorf("Failed to terminate JetStream message on subject %s: %v", msg.Subject, err) + // Make sure we manually acknowledge the messages later on delivery to Telegraf output(s) + if jetstreamMsg { + n.Lock() + n.undelivered[id] = msg + n.Unlock() + } + } } - return } - - n.undelivered[id] = msg } func (n *NatsConsumer) waitForDelivery(ctx context.Context) { @@ -298,38 +284,31 @@ func (n *NatsConsumer) waitForDelivery(ctx context.Context) { case <-ctx.Done(): return case track := <-n.acc.Delivered(): - <-n.sem - msg := n.removeDelivered(track.ID()) + // Get the tracked metric if any. Please remember, only Jetstream messages support a manual ACK + n.Lock() + msg, ok := n.undelivered[track.ID()] + delete(n.undelivered, track.ID()) + n.Unlock() - if msg != nil { - if track.Delivered() { - err := msg.Ack() - if err != nil { - n.Log.Errorf("Failed to acknowledge JetStream message on subject %s: %v", msg.Subject, err) - } - } else { - err := msg.Term() - if err != nil { - n.Log.Errorf("Failed to terminate JetStream message on subject %s: %v", msg.Subject, err) - } + if !ok { + <-n.sem + continue + } + if track.Delivered() { + if err := msg.Ack(); err != nil { + n.Log.Errorf("Failed to acknowledge JetStream message on subject %s: %v", msg.Subject, err) + } + } else { + err := msg.Term() + if err != nil { + n.Log.Errorf("Failed to terminate JetStream message on subject %s: %v", msg.Subject, err) } } + <-n.sem } } } -func (n *NatsConsumer) removeDelivered(id telegraf.TrackingID) *nats.Msg { - n.Lock() - defer n.Unlock() - - msg, ok := n.undelivered[id] - if !ok { - return nil - } - delete(n.undelivered, id) - return msg -} - func (n *NatsConsumer) clean() { for _, sub := range n.subs { if err := sub.Unsubscribe(); err != nil { From af01db25f812e1931205cb1258677192589b7eb6 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 15:37:01 +0100 Subject: [PATCH 24/29] Restore default value --- plugins/inputs/nats_consumer/nats_consumer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 2a4d3cc46b037..16a197ba91673 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -335,6 +335,7 @@ func init() { Servers: []string{"nats://localhost:4222"}, Subjects: []string{"telegraf"}, QueueGroup: "telegraf_consumers", + PendingBytesLimit: nats.DefaultSubPendingBytesLimit, PendingMessageLimit: nats.DefaultSubPendingMsgsLimit, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, } From 482834f693f79d9ac985cdebbd29aec2daf8014a Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 15:45:21 +0100 Subject: [PATCH 25/29] Revert config file changes --- plugins/inputs/nats_consumer/README.md | 5 +++-- plugins/inputs/nats_consumer/sample.conf | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index e66f456896b03..862b8b8ec6d22 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -88,9 +88,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Sets the limit for pending messages for each subscription - ## This shouldn't need to be adjusted except in very high throughput scenarios + ## Sets the limits for pending msgs and bytes for each subscription + ## These shouldn't need to be adjusted except in very high throughput scenarios # pending_message_limit = 65536 + # pending_bytes_limit = 67108864 ## Max undelivered messages ## This plugin uses tracking metrics, which ensure messages are read to diff --git a/plugins/inputs/nats_consumer/sample.conf b/plugins/inputs/nats_consumer/sample.conf index f962a3f0c93be..42a94af5c9158 100644 --- a/plugins/inputs/nats_consumer/sample.conf +++ b/plugins/inputs/nats_consumer/sample.conf @@ -45,9 +45,10 @@ ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Sets the limit for pending messages for each subscription - ## This shouldn't need to be adjusted except in very high throughput scenarios + ## Sets the limits for pending msgs and bytes for each subscription + ## These shouldn't need to be adjusted except in very high throughput scenarios # pending_message_limit = 65536 + # pending_bytes_limit = 67108864 ## Max undelivered messages ## This plugin uses tracking metrics, which ensure messages are read to From 315da1c81fb18ae519c9b92efb6a9f35068ff456 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 16:17:56 +0100 Subject: [PATCH 26/29] Linter fix --- plugins/inputs/nats_consumer/nats_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 16a197ba91673..f2d5f3ca401ac 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -255,7 +255,7 @@ func (n *NatsConsumer) receiver(ctx context.Context) { metrics, err := n.parser.Parse(msg.Data) if err != nil { <-n.sem - n.acc.AddError(fmt.Errorf("Failed to handle message on subject %s: %v", msg.Subject, err)) + n.acc.AddError(fmt.Errorf("failed to handle message on subject %s: %w", msg.Subject, err)) } if len(metrics) == 0 { once.Do(func() { From 00e545743a92b530f1b2ebf36c9884fe99b310ce Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 3 Nov 2025 16:22:26 +0100 Subject: [PATCH 27/29] Update plugins/inputs/nats_consumer/nats_consumer.go Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com> --- plugins/inputs/nats_consumer/nats_consumer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index f2d5f3ca401ac..a25ed5efac80d 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -299,8 +299,7 @@ func (n *NatsConsumer) waitForDelivery(ctx context.Context) { n.Log.Errorf("Failed to acknowledge JetStream message on subject %s: %v", msg.Subject, err) } } else { - err := msg.Term() - if err != nil { + if err := msg.Term(); err != nil { n.Log.Errorf("Failed to terminate JetStream message on subject %s: %v", msg.Subject, err) } } From 04da00b2005a845780e3cc30a77febe894e98972 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Thu, 6 Nov 2025 10:48:06 +0100 Subject: [PATCH 28/29] Apply comments --- plugins/inputs/nats_consumer/nats_consumer.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index a25ed5efac80d..466369c9dc1ab 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -254,13 +254,18 @@ func (n *NatsConsumer) receiver(ctx context.Context) { // Parse the metric and add it to the accumulator metrics, err := n.parser.Parse(msg.Data) if err != nil { - <-n.sem n.acc.AddError(fmt.Errorf("failed to handle message on subject %s: %w", msg.Subject, err)) } if len(metrics) == 0 { once.Do(func() { n.Log.Debug(internal.NoMetricsCreatedMsg) }) + <-n.sem + if jetstreamMsg { + if err := msg.Ack(); err != nil { + n.acc.AddError(fmt.Errorf("failed to acknowledge JetStream message on subject %s: %v", msg.Subject, err)) + } + } } else { for _, m := range metrics { m.AddTag("subject", msg.Subject) From b612218ddf751eec4880382b72977eb8c5c2f9fe Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Thu, 6 Nov 2025 15:36:23 +0100 Subject: [PATCH 29/29] Linter fix --- plugins/inputs/nats_consumer/nats_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 466369c9dc1ab..dcde50008a011 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -263,7 +263,7 @@ func (n *NatsConsumer) receiver(ctx context.Context) { <-n.sem if jetstreamMsg { if err := msg.Ack(); err != nil { - n.acc.AddError(fmt.Errorf("failed to acknowledge JetStream message on subject %s: %v", msg.Subject, err)) + n.acc.AddError(fmt.Errorf("failed to acknowledge JetStream message on subject %s: %w", msg.Subject, err)) } } } else {