Skip to content

Commit e60b3c8

Browse files
committed
Add kafka actions check scheduling
1 parent 4edd904 commit e60b3c8

File tree

16 files changed

+733
-14
lines changed

16 files changed

+733
-14
lines changed

cmd/agent/subcommands/run/command.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -609,10 +609,11 @@ func startAgent(
609609

610610
// start remote configuration management
611611
if configUtils.IsRemoteConfigEnabled(cfg) {
612-
// Subscribe to `AGENT_TASK` product
613612
rcclient.SubscribeAgentTask()
614-
controller := datastreams.NewController(ac, rcclient)
615-
ac.AddConfigProvider(controller, false, 0)
613+
liveMessagesController := datastreams.NewController(ac, rcclient)
614+
ac.AddConfigProvider(liveMessagesController, false, 0)
615+
actionsController := datastreams.NewActionsController(ac, rcclient)
616+
ac.AddConfigProvider(actionsController, false, 0)
616617

617618
if pkgconfigsetup.Datadog().GetBool("remote_configuration.agent_integrations.enabled") {
618619
// Spin up the config provider to schedule integrations through remote-config

comp/collector/collector/collectorimpl/internal/middleware/check_wrapper.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,8 @@ func (c *CheckWrapper) GetDiagnoses() ([]diagnose.Diagnosis, error) {
160160
func (c *CheckWrapper) IsHASupported() bool {
161161
return c.inner.IsHASupported()
162162
}
163+
164+
// RunOnce reports whether the wrapped check should run only once. The answer
// comes straight from the inner check's RunOnce; the wrapper adds no logic.
165+
func (c *CheckWrapper) RunOnce() bool {
166+
return c.inner.RunOnce()
167+
}
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2020-present Datadog, Inc.
5+
6+
// Package datastreams contains logic to configure actions for Kafka via remote configuration
7+
package datastreams
8+
9+
import (
10+
"context"
11+
"encoding/json"
12+
"fmt"
13+
"sync"
14+
"time"
15+
16+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery"
17+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
18+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names"
19+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/types"
20+
"github.com/DataDog/datadog-agent/comp/remote-config/rcclient"
21+
"github.com/DataDog/datadog-agent/pkg/config/remote/data"
22+
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
23+
"github.com/DataDog/datadog-agent/pkg/trace/traceutil/normalize"
24+
"github.com/DataDog/datadog-agent/pkg/util/log"
25+
yaml "gopkg.in/yaml.v2"
26+
)
27+
28+
// Integration names used by the Kafka actions controller.
const (
29+
// kafkaConsumerIntegrationName is the Name of the configs that are searched
// for connection and authentication settings.
kafkaConsumerIntegrationName = "kafka_consumer"
30+
// kafkaActionsIntegrationName is the Name given to the configs scheduled by
// the actions controller.
kafkaActionsIntegrationName = "kafka_actions"
31+
)
32+
33+
// isConnectedToKafka checks if any kafka_consumer integration is configured
34+
func isConnectedToKafka(ac autodiscovery.Component) bool {
35+
for _, config := range ac.GetUnresolvedConfigs() {
36+
if config.Name == kafkaConsumerIntegrationName {
37+
return true
38+
}
39+
}
40+
return false
41+
}
42+
43+
func normalizeBootstrapServers(servers string) string {
44+
if servers == "" {
45+
return ""
46+
}
47+
return normalize.NormalizeTagValue(servers)
48+
}
49+
50+
// kafkaActionsConfig is the JSON payload of a single remote-config entry for
// Kafka actions.
type kafkaActionsConfig struct {
51+
// Actions is kept as raw JSON; it is decoded later, when the check instance
// is built.
Actions json.RawMessage `json:"actions"`
52+
// BootstrapServers optionally selects which kafka_consumer instance to take
// connection settings from; it may be empty.
BootstrapServers string `json:"bootstrap_servers"`
53+
}
54+
55+
// actionsController listens to remote configuration updates for Kafka actions
56+
// and schedules one-off kafka_actions checks.
57+
type actionsController struct {
58+
// ac is queried for the unresolved configs, both to detect a kafka_consumer
// integration and to copy its connection settings.
ac autodiscovery.Component
59+
// rcclient is the remote-config client the update callback is subscribed on.
rcclient rcclient.Component
60+
// configChanges is the channel handed out by Stream; update sends the
// scheduled configs on it.
configChanges chan integration.ConfigChanges
61+
// closeMutex guards closed and serializes closing configChanges against
// sends on it.
closeMutex sync.RWMutex
62+
// closed is set once configChanges has been closed.
closed bool
63+
}
64+
65+
// String returns the name of the provider. The update method uses this value
// as the Source of every config it schedules; the Provider field of those
// configs is copied from the matching kafka_consumer config instead.
// NOTE(review): this returns names.DataStreamsLiveMessages, the live-messages
// provider name, so both data streams controllers would report the same
// name — confirm this is intended and not a copy-paste leftover.
66+
67+
func (c *actionsController) String() string {
68+
return names.DataStreamsLiveMessages
69+
}
70+
71+
// GetConfigErrors returns a map of errors that occurred on the last Collect
72+
// call, indexed by a description of the resource that generated the error.
73+
// The result is displayed in diagnostic tools such as `agent status`.
74+
func (c *actionsController) GetConfigErrors() map[string]types.ErrorMsgSet {
75+
return map[string]types.ErrorMsgSet{}
76+
}
77+
78+
func (c *actionsController) manageSubscriptionToRC() {
79+
ticker := time.NewTicker(time.Second * 10)
80+
defer ticker.Stop()
81+
for range ticker.C {
82+
c.closeMutex.RLock()
83+
if c.closed {
84+
c.closeMutex.RUnlock()
85+
return
86+
}
87+
c.closeMutex.RUnlock()
88+
if isConnectedToKafka(c.ac) {
89+
c.rcclient.Subscribe(data.ProductDebug, c.update)
90+
return
91+
}
92+
}
93+
}
94+
95+
// NewActionsController creates a new Kafka actions controller instance
96+
func NewActionsController(ac autodiscovery.Component, rcclient rcclient.Component) types.ConfigProvider {
97+
c := &actionsController{
98+
ac: ac,
99+
rcclient: rcclient,
100+
configChanges: make(chan integration.ConfigChanges, 10),
101+
}
102+
c.configChanges <- integration.ConfigChanges{}
103+
go c.manageSubscriptionToRC()
104+
return c
105+
}
106+
107+
// Stream starts sending configuration updates for the kafka_actions integration to the output channel.
108+
func (c *actionsController) Stream(ctx context.Context) <-chan integration.ConfigChanges {
109+
go func() {
110+
<-ctx.Done()
111+
c.closeMutex.Lock()
112+
defer c.closeMutex.Unlock()
113+
if c.closed {
114+
return
115+
}
116+
c.closed = true
117+
close(c.configChanges)
118+
}()
119+
return c.configChanges
120+
}
121+
122+
func (c *actionsController) update(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) {
123+
remoteConfigs := parseActionsConfig(updates, applyStateCallback)
124+
if len(remoteConfigs) == 0 {
125+
return
126+
}
127+
cfgs := c.ac.GetUnresolvedConfigs()
128+
changes := integration.ConfigChanges{}
129+
for _, parsed := range remoteConfigs {
130+
auth, base, err := extractKafkaAuthFromInstance(cfgs, parsed.bootstrapServers)
131+
if err != nil {
132+
log.Errorf("Failed to extract Kafka auth for config %s: %v", parsed.path, err)
133+
applyStateCallback(parsed.path, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()})
134+
continue
135+
}
136+
137+
newCfg := integration.Config{
138+
Name: kafkaActionsIntegrationName,
139+
Source: c.String(),
140+
Instances: []integration.Data{},
141+
InitConfig: nil,
142+
LogsConfig: nil,
143+
Provider: base.Provider,
144+
NodeName: base.NodeName,
145+
}
146+
147+
var actionsMap map[string]any
148+
if err := json.Unmarshal(parsed.actionsJSON, &actionsMap); err != nil {
149+
log.Errorf("Failed to unmarshal actions JSON for config %s: %v", parsed.path, err)
150+
applyStateCallback(parsed.path, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()})
151+
continue
152+
}
153+
154+
actionsMap["run_once"] = true
155+
for k, v := range auth {
156+
actionsMap[k] = v
157+
}
158+
159+
payload, err := yaml.Marshal(actionsMap)
160+
if err != nil {
161+
log.Errorf("Failed to marshal instance config for %s: %v", parsed.path, err)
162+
applyStateCallback(parsed.path, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()})
163+
continue
164+
}
165+
166+
newCfg.Instances = []integration.Data{integration.Data(payload)}
167+
changes.Schedule = append(changes.Schedule, newCfg)
168+
applyStateCallback(parsed.path, state.ApplyStatus{State: state.ApplyStateAcknowledged})
169+
}
170+
if len(changes.Schedule) == 0 {
171+
return
172+
}
173+
c.closeMutex.RLock()
174+
defer c.closeMutex.RUnlock()
175+
if c.closed {
176+
return
177+
}
178+
c.configChanges <- changes
179+
}
180+
181+
// parsedActionsConfig is one remote-config entry after decoding and
// validation by parseActionsConfig.
type parsedActionsConfig struct {
182+
// path is the key of the entry in the remote-config update map; it is the
// identifier passed back to the apply-state callback.
path string
183+
// bootstrapServers is the normalized bootstrap_servers value, or "" when
// the entry did not provide one.
bootstrapServers string
184+
// actionsJSON is the still-encoded "actions" value of the entry.
actionsJSON json.RawMessage
185+
}
186+
187+
func parseActionsConfig(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) []parsedActionsConfig {
188+
var configs []parsedActionsConfig
189+
for path, rawConfig := range updates {
190+
var cfg kafkaActionsConfig
191+
err := json.Unmarshal(rawConfig.Config, &cfg)
192+
if err != nil {
193+
log.Errorf("Can't decode kafka actions configuration from remote-config: %v", err)
194+
applyStateCallback(path, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()})
195+
continue
196+
}
197+
if len(cfg.Actions) == 0 {
198+
applyStateCallback(path, state.ApplyStatus{State: state.ApplyStateError, Error: "missing actions"})
199+
continue
200+
}
201+
202+
bootstrapServers := ""
203+
if cfg.BootstrapServers != "" {
204+
bootstrapServers = normalizeBootstrapServers(cfg.BootstrapServers)
205+
}
206+
207+
configs = append(configs, parsedActionsConfig{
208+
path: path,
209+
bootstrapServers: bootstrapServers,
210+
actionsJSON: cfg.Actions,
211+
})
212+
}
213+
return configs
214+
}
215+
216+
func extractKafkaAuthFromInstance(cfgs []integration.Config, bootstrapServers string) (map[string]any, *integration.Config, error) {
217+
out := make(map[string]any)
218+
219+
for cfgIdx := range cfgs {
220+
cfg := cfgs[cfgIdx]
221+
if cfg.Name != kafkaConsumerIntegrationName {
222+
continue
223+
}
224+
225+
if bootstrapServers == "" {
226+
if len(cfg.Instances) > 0 {
227+
auth := extractAuthFromInstanceData(cfg.Instances[0])
228+
return auth, &cfg, nil
229+
}
230+
continue
231+
}
232+
233+
for _, instanceData := range cfg.Instances {
234+
var instanceMap map[string]any
235+
if err := yaml.Unmarshal(instanceData, &instanceMap); err != nil {
236+
continue
237+
}
238+
239+
connectStr, ok := instanceMap["kafka_connect_str"].(string)
240+
if !ok || connectStr == "" {
241+
continue
242+
}
243+
244+
if normalizeBootstrapServers(connectStr) == bootstrapServers {
245+
auth := extractAuthFromInstanceData(instanceData)
246+
return auth, &cfg, nil
247+
}
248+
}
249+
}
250+
251+
if bootstrapServers == "" {
252+
return out, nil, fmt.Errorf("kafka_consumer integration not found on this node")
253+
}
254+
return out, nil, fmt.Errorf("kafka_consumer integration with bootstrap_servers=%s not found", bootstrapServers)
255+
}
256+
257+
func extractAuthFromInstanceData(instanceData integration.Data) map[string]any {
258+
out := make(map[string]any)
259+
raw := map[string]interface{}{}
260+
if err := yaml.Unmarshal(instanceData, &raw); err != nil {
261+
return out
262+
}
263+
264+
allowList := []string{
265+
"kafka_connect_str",
266+
"security_protocol",
267+
"sasl_mechanism",
268+
"sasl_plain_username",
269+
"sasl_plain_password",
270+
"sasl_kerberos_keytab",
271+
"sasl_kerberos_principal",
272+
"sasl_kerberos_service_name",
273+
"sasl_kerberos_domain_name",
274+
"tls_verify",
275+
"tls_ca_cert",
276+
"tls_cert",
277+
"tls_private_key",
278+
"tls_private_key_password",
279+
"tls_validate_hostname",
280+
"tls_ciphers",
281+
"tls_crlfile",
282+
"sasl_oauth_token_provider",
283+
}
284+
for _, k := range allowList {
285+
if v, ok := raw[k]; ok {
286+
if m, okm := v.(map[interface{}]interface{}); okm {
287+
strMap := make(map[string]interface{}, len(m))
288+
for kk, vv := range m {
289+
strMap[fmt.Sprint(kk)] = vv
290+
}
291+
out[k] = strMap
292+
} else {
293+
out[k] = v
294+
}
295+
}
296+
}
297+
return out
298+
}

0 commit comments

Comments
 (0)