Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
## Sets the limit for pending messages for each subscription
## This shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864

## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
Expand Down
107 changes: 72 additions & 35 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (
)

type NatsConsumer struct {
sync.Mutex

QueueGroup string `toml:"queue_group"`
Subjects []string `toml:"subjects"`
Servers []string `toml:"servers"`
Expand All @@ -36,7 +38,7 @@ type NatsConsumer struct {
JsSubjects []string `toml:"jetstream_subjects"`
JsStream string `toml:"jetstream_stream"`
PendingMessageLimit int `toml:"pending_message_limit"`
PendingBytesLimit int `toml:"pending_bytes_limit"`
PendingBytesLimit int `toml:"pending_bytes_limit" deprecated:"1.37.0;1.40.0;unused"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm actually this setting is used in line 160, isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, result of wrong merge. Will fix it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm still not fixed I think so unresolving my comment...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do you still see it used in current code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well you are removing this feature from the code and then deprecate the function without giving a reason on why you remove this! Furthermore, even if there is a good reason for removal it does not belong to this PR!

MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Log telegraf.Logger `toml:"-"`
tls.ClientConfig
Expand All @@ -48,10 +50,12 @@ type NatsConsumer struct {

parser telegraf.Parser
// channel for all incoming NATS messages
in chan *nats.Msg
in chan *nats.Msg
undelivered map[telegraf.TrackingID]*nats.Msg
// channel for all NATS read errors
errs chan error
acc telegraf.TrackingAccumulator
sem semaphore
wg sync.WaitGroup
cancel context.CancelFunc
}
Expand Down Expand Up @@ -82,7 +86,9 @@ func (n *NatsConsumer) SetParser(parser telegraf.Parser) {

// Start the nats consumer. Caller must call *NatsConsumer.Stop() to clean up.
func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
n.sem = make(semaphore, n.MaxUndeliveredMessages)
n.acc = acc.WithTracking(n.MaxUndeliveredMessages)
n.undelivered = make(map[telegraf.TrackingID]*nats.Msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

Suggested change
n.undelivered = make(map[telegraf.TrackingID]*nats.Msg)
n.undelivered = make(map[telegraf.TrackingID]*nats.Msg, PendingMessageLimit)

Copy link
Contributor Author

@Hipska Hipska Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very sure, I would use MaxUndeliveredMessages instead if needed.

And I'm starting to think they are actually meaning the same..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I wonder if those two parameters don't mean the same thing. But yeah if the mean different things I agree to use MaxUndeliveredMessages as this is what is meant here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
n.undelivered = make(map[telegraf.TrackingID]*nats.Msg)
n.undelivered = make(map[telegraf.TrackingID]*nats.Msg, n.MaxUndeliveredMessages)


options := []nats.Option{
nats.MaxReconnects(-1),
Expand Down Expand Up @@ -125,17 +131,9 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
// Setup message and error channels
n.errs = make(chan error)

n.in = make(chan *nats.Msg, 1000)
n.in = make(chan *nats.Msg, n.PendingMessageLimit)
for _, subj := range n.Subjects {
sub, err := n.conn.QueueSubscribe(subj, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil {
return err
}

// set the subscription pending limits
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
Comment on lines -137 to -138
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you remove this feature?

sub, err := n.conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in)
if err != nil {
return err
}
Expand All @@ -145,7 +143,9 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {

if len(n.JsSubjects) > 0 {
var connErr error
var subOptions []nats.SubOpt
subOptions := []nats.SubOpt{
nats.ManualAck(),
}
if n.JsStream != "" {
subOptions = append(subOptions, nats.BindStream(n.JsStream))
}
Expand All @@ -156,15 +156,7 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {

if n.jsConn != nil {
for _, jsSub := range n.JsSubjects {
sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
}, subOptions...)
if err != nil {
return err
}

// set the subscription pending limits
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
sub, err := n.jsConn.ChanQueueSubscribe(jsSub, n.QueueGroup, n.in, subOptions...)
if err != nil {
return err
}
Expand All @@ -178,6 +170,13 @@ func (n *NatsConsumer) Start(acc telegraf.Accumulator) error {
ctx, cancel := context.WithCancel(context.Background())
n.cancel = cancel

// Start goroutine to handle delivery notifications from accumulator.
n.wg.Add(1)
go func() {
defer n.wg.Done()
n.waitForDelivery(ctx)
}()

// Start the message reader
n.wg.Add(1)
go func() {
Expand Down Expand Up @@ -212,31 +211,24 @@ func (n *NatsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e erro
// receiver() reads all incoming messages from NATS, and parses them into
// telegraf metrics.
func (n *NatsConsumer) receiver(ctx context.Context) {
sem := make(semaphore, n.MaxUndeliveredMessages)

for {
select {
case <-ctx.Done():
return
case <-n.acc.Delivered():
<-sem
case err := <-n.errs:
n.Log.Error(err)
case sem <- empty{}:
case n.sem <- empty{}:
select {
case <-ctx.Done():
return
case err := <-n.errs:
<-sem
<-n.sem
n.Log.Error(err)
case <-n.acc.Delivered():
<-sem
<-sem
case msg := <-n.in:
metrics, err := n.parser.Parse(msg.Data)
if err != nil {
n.Log.Errorf("Subject: %s, error: %s", msg.Subject, err.Error())
<-sem
n.Log.Errorf("Failed to parse message in subject %s: %v", msg.Subject, err)
<-n.sem
continue
}
if len(metrics) == 0 {
Expand All @@ -247,12 +239,58 @@ func (n *NatsConsumer) receiver(ctx context.Context) {
for _, m := range metrics {
m.AddTag("subject", msg.Subject)
}
n.acc.AddTrackingMetricGroup(metrics)
id := n.acc.AddTrackingMetricGroup(metrics)
for _, s := range n.jsSubs {
if msg.Sub == s {
n.Lock()
n.undelivered[id] = msg
n.Unlock()
break
}
}
}
}
}
}

func (n *NatsConsumer) waitForDelivery(parentCtx context.Context) {
for {
select {
case <-parentCtx.Done():
return
case track := <-n.acc.Delivered():
<-n.sem
msg := n.removeDelivered(track.ID())

if msg != nil {
if track.Delivered() {
err := msg.Ack()
if err != nil {
n.Log.Errorf("Failed to Ack message on subject %s: %v", msg.Subject, err)
}
} else {
err := msg.Nak()
if err != nil {
n.Log.Errorf("Failed to Nak message on subject %s: %v", msg.Subject, err)
}
}
}
}
}
}

func (n *NatsConsumer) removeDelivered(id telegraf.TrackingID) *nats.Msg {
n.Lock()
defer n.Unlock()

msg, ok := n.undelivered[id]
if !ok {
return nil
}
delete(n.undelivered, id)
return msg
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

Suggested change
func (n *NatsConsumer) waitForDelivery(parentCtx context.Context) {
for {
select {
case <-parentCtx.Done():
return
case track := <-n.acc.Delivered():
<-n.sem
msg := n.removeDelivered(track.ID())
if msg != nil {
if track.Delivered() {
err := msg.Ack()
if err != nil {
n.Log.Errorf("Failed to Ack message on subject %s: %v", msg.Subject, err)
}
} else {
err := msg.Nak()
if err != nil {
n.Log.Errorf("Failed to Nak message on subject %s: %v", msg.Subject, err)
}
}
}
}
}
}
func (n *NatsConsumer) removeDelivered(id telegraf.TrackingID) *nats.Msg {
n.Lock()
defer n.Unlock()
msg, ok := n.undelivered[id]
if !ok {
return nil
}
delete(n.undelivered, id)
return msg
}
func (n *NatsConsumer) handleDelivery(ctx context.Context) {
for {
select {
case <-ctx.Done():
// Plugin is stopping
return
case track := <-n.acc.Delivered():
// Get the delivered message and remove it from the internal tracking
// mechanism
n.Lock()
msg, found := n.undelivered[id]
delete(n.undelivered, id)
defer n.Unlock()
// Make space for a new message to be received despite any error case
<-n.sem
if !found {
n.Log.Errorf("received delivery event for unknown message %v", id)
continue
}
// Acknowledge the message
if track.Delivered() {
if err := msg.Ack(); err != nil {
n.Log.Errorf("Failed to Ack message on subject %s: %v", msg.Subject, err)
}
} else {
if err := msg.Nak(); err != nil {
n.Log.Errorf("Failed to Nak message on subject %s: %v", msg.Subject, err)
}
}
}
}
}

I'm not sure we should NAK the message here if this means the message is re-queued to be honest. The background is that if only a single metric is rejected (for whatever reason) the whole NATS message is NAK'ed. If the underlying reason for rejecting the metric is permanent, this may trigger an infinite loop for the message as it will always be NAK'ed and thus may render the input dysfunctional if enough of those messages are consumed (i.e. more than the allowed max pending).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't that the whole purpose of tracking metrics?

Copy link
Member

@srebhan srebhan Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is. The purpose of tracking metrics is to allow an input to know if a message was delivered or is "in flight". If you receive a delivery event it means that the metric was processed by the endpoint (it might be a processor as well, just for clarity). The Delivered function tells you if all messages were delivered successfully.

Now there are different ways to handle "unsuccessful" messages. In PR #15796 we implemented NAK'ing messages for AMQP if delivery fails ensuring that the messages are not re-enqueued in the topic.

Coming back to this PR: If delivery fails, i.e. the metric could not be handled by the output plugin (e.g. serialization fails, format is invalid etc or the endpoint got the metric but something went wrong, and you NAK the message, you must make sure that this message is not sent again, otherwise you end up in an infinite loop of getting the message -> delivery fails -> NAK message - back to square one. Now if you not only have one of those metrics but they do appear sporadically, you will end up with a dysfunctional input only being busy with NAK'ed messages...

So please either make sure the NAK'ed messages are not re-enqueued in the topic or do not NAK those messages!


func (n *NatsConsumer) clean() {
for _, sub := range n.subs {
if err := sub.Unsubscribe(); err != nil {
Expand All @@ -279,7 +317,6 @@ func init() {
Servers: []string{"nats://localhost:4222"},
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
}
Expand Down
29 changes: 23 additions & 6 deletions plugins/inputs/nats_consumer/nats_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func TestIntegrationStartStop(t *testing.T) {
Servers: []string{"nats://" + container.Address + ":" + container.Ports["4222"]},
Subjects: []string{"telegraf"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Log: testutil.Logger{},
Expand Down Expand Up @@ -143,7 +142,6 @@ func TestIntegrationSendReceive(t *testing.T) {
Servers: []string{addr},
Subjects: subjects,
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Log: testutil.Logger{},
Expand Down Expand Up @@ -179,6 +177,10 @@ func TestIntegrationSendReceive(t *testing.T) {

actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime(), testutil.SortMetrics())

plugin.Lock()
defer plugin.Unlock()
require.Empty(t, plugin.undelivered)
})
}
}
Expand Down Expand Up @@ -214,15 +216,15 @@ func TestJetStreamIntegrationSendReceive(t *testing.T) {
require.NoError(t, err)

// Setup the plugin for JetStream
log := testutil.CaptureLogger{}
plugin := &NatsConsumer{
Servers: []string{addr},
JsSubjects: []string{subject},
JsStream: streamName,
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Log: testutil.Logger{},
Log: &log,
}

parser := &influx.Parser{}
Expand Down Expand Up @@ -258,6 +260,23 @@ func TestJetStreamIntegrationSendReceive(t *testing.T) {
),
}
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime(), testutil.SortMetrics())

// Acknowledge the message and check undelivered tracking
log.Clear()
plugin.Lock()
require.Len(t, plugin.undelivered, 1)
plugin.Unlock()
for _, m := range actual {
m.Accept()
}

require.Eventually(t, func() bool {
plugin.Lock()
defer plugin.Unlock()
return len(plugin.undelivered) == 0
}, time.Second, 100*time.Millisecond, "undelivered messages not cleared")

require.Empty(t, log.Messages(), "no warnings or errors should be logged")
}

func TestJetStreamIntegrationSourcedStreamNotFound(t *testing.T) {
Expand Down Expand Up @@ -297,7 +316,6 @@ func TestJetStreamIntegrationSourcedStreamNotFound(t *testing.T) {
Servers: []string{addr},
JsSubjects: []string{"TESTSTREAM"},
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Log: testutil.Logger{},
Expand Down Expand Up @@ -353,7 +371,6 @@ func TestJetStreamIntegrationSourcedStreamFound(t *testing.T) {
JsSubjects: []string{"TESTSTREAM"},
JsStream: streamName,
QueueGroup: "telegraf_consumers",
PendingBytesLimit: nats.DefaultSubPendingBytesLimit,
PendingMessageLimit: nats.DefaultSubPendingMsgsLimit,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Log: testutil.Logger{},
Expand Down
5 changes: 2 additions & 3 deletions plugins/inputs/nats_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the limits for pending msgs and bytes for each subscription
## These shouldn't need to be adjusted except in very high throughput scenarios
## Sets the limit for pending messages for each subscription
## This shouldn't need to be adjusted except in very high throughput scenarios
# pending_message_limit = 65536
# pending_bytes_limit = 67108864

## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
Expand Down
Loading