Skip to content

Commit 1f805f1

Browse files
committed
Add kafka actions check scheduling
1 parent f42d155 commit 1f805f1

File tree

23 files changed

+739
-13
lines changed

23 files changed

+739
-13
lines changed

cmd/agent/subcommands/run/command.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -613,8 +613,10 @@ func startAgent(
613613
if configUtils.IsRemoteConfigEnabled(cfg) {
614614
// Subscribe to `AGENT_TASK` product
615615
rcclient.SubscribeAgentTask()
616-
controller := datastreams.NewController(ac, rcclient)
617-
ac.AddConfigProvider(controller, false, 0)
616+
liveMessagesController := datastreams.NewController(ac, rcclient)
617+
ac.AddConfigProvider(liveMessagesController, false, 0)
618+
actionsController := datastreams.NewActionsController(ac, rcclient)
619+
ac.AddConfigProvider(actionsController, false, 0)
618620

619621
if pkgconfigsetup.Datadog().GetBool("remote_configuration.agent_integrations.enabled") {
620622
// Spin up the config provider to schedule integrations through remote-config

comp/collector/collector/collectorimpl/collector_demux_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ type cancelledCheck struct {
256256
demux demultiplexer.FakeSamplerMock
257257
}
258258

259+
func (c *cancelledCheck) RunOnce() bool { return false }
259260
func (c *cancelledCheck) Run() error {
260261
c.flip <- struct{}{}
261262

comp/collector/collector/collectorimpl/collector_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func (c *TestCheck) Stop() { c.stop <- true }
4848
func (c *TestCheck) Cancel() { c.Called() }
4949
func (c *TestCheck) Interval() time.Duration { return 1 * time.Minute }
5050
func (c *TestCheck) Run() error { <-c.stop; return nil }
51+
func (c *TestCheck) RunOnce() bool { return false }
5152
func (c *TestCheck) ID() checkid.ID {
5253
if c.uniqueID != "" {
5354
return c.uniqueID

comp/collector/collector/collectorimpl/internal/middleware/check_wrapper.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,8 @@ func (c *CheckWrapper) GetDiagnoses() ([]diagnose.Diagnosis, error) {
160160
func (c *CheckWrapper) IsHASupported() bool {
161161
return c.inner.IsHASupported()
162162
}
163+
164+
// RunOnce returns true if the inner check should run only once
165+
func (c *CheckWrapper) RunOnce() bool {
166+
return c.inner.RunOnce()
167+
}
Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2020-present Datadog, Inc.
5+
6+
// Package datastreams contains logic to configure actions for Kafka via remote configuration
7+
package datastreams
8+
9+
import (
10+
"context"
11+
"encoding/json"
12+
"fmt"
13+
"sync"
14+
"time"
15+
16+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery"
17+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
18+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names"
19+
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/types"
20+
"github.com/DataDog/datadog-agent/comp/remote-config/rcclient"
21+
"github.com/DataDog/datadog-agent/pkg/config/remote/data"
22+
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
23+
"github.com/DataDog/datadog-agent/pkg/trace/traceutil/normalize"
24+
"github.com/DataDog/datadog-agent/pkg/util/log"
25+
yaml "gopkg.in/yaml.v2"
26+
)
27+
28+
const (
	// kafkaConsumerIntegrationName is the integration used to detect that this
	// node is connected to Kafka and to borrow auth settings from.
	kafkaConsumerIntegrationName = "kafka_consumer"
	// kafkaActionsIntegrationName is the one-off check scheduled by this provider.
	kafkaActionsIntegrationName = "kafka_actions"
)
32+
33+
// isConnectedToKafka checks if any kafka_consumer integration is configured
34+
func isConnectedToKafka(ac autodiscovery.Component) bool {
35+
for _, config := range ac.GetUnresolvedConfigs() {
36+
if config.Name == kafkaConsumerIntegrationName {
37+
return true
38+
}
39+
}
40+
return false
41+
}
42+
43+
func normalizeBootstrapServers(servers string) string {
44+
if servers == "" {
45+
return ""
46+
}
47+
return normalize.NormalizeTagValue(servers)
48+
}
49+
50+
// kafkaActionsConfig is the JSON payload received from remote configuration
// for a Kafka actions request.
type kafkaActionsConfig struct {
	// Actions is kept as raw JSON and decoded later when building the
	// kafka_actions check instance.
	Actions json.RawMessage `json:"actions"`
	// BootstrapServers identifies the target Kafka cluster; it is matched
	// (after normalization) against existing kafka_consumer instances to
	// reuse their auth settings. May be empty — see extractKafkaAuthFromInstance.
	BootstrapServers string `json:"bootstrap_servers"`
}
54+
55+
// actionsController listens to remote configuration updates for Kafka actions
// and schedules one-off kafka_actions checks.
type actionsController struct {
	ac       autodiscovery.Component
	rcclient rcclient.Component
	// configChanges carries scheduled kafka_actions configs to autodiscovery
	// via Stream; buffered (cap 10) so update() normally does not block.
	configChanges chan integration.ConfigChanges
	// closeMutex guards closed, serializing sends on configChanges against
	// its close in Stream's shutdown goroutine.
	closeMutex sync.RWMutex
	closed     bool
}
64+
65+
// String returns the name of the provider. All Config instances produced
// by this provider will have this value in their Provider field.
// NOTE(review): this returns the same provider name as the live-messages
// controller (names.DataStreamsLiveMessages), so configs from both providers
// carry identical Provider/Source values — confirm this is intentional.
func (c *actionsController) String() string {
	return names.DataStreamsLiveMessages
}
70+
71+
// GetConfigErrors returns a map of errors that occurred on the last Collect
72+
// call, indexed by a description of the resource that generated the error.
73+
// The result is displayed in diagnostic tools such as `agent status`.
74+
func (c *actionsController) GetConfigErrors() map[string]types.ErrorMsgSet {
75+
return map[string]types.ErrorMsgSet{}
76+
}
77+
78+
// manageSubscriptionToRC polls every 10 seconds until either the controller
// is closed or a kafka_consumer integration appears on this node, at which
// point it subscribes the RC client to actions updates and exits. Polling
// avoids subscribing on hosts that are not connected to Kafka.
func (c *actionsController) manageSubscriptionToRC() {
	ticker := time.NewTicker(time.Second * 10)
	defer ticker.Stop()
	for range ticker.C {
		// Stop polling once Stream's context has been cancelled (closed set).
		c.closeMutex.RLock()
		if c.closed {
			c.closeMutex.RUnlock()
			return
		}
		c.closeMutex.RUnlock()
		if isConnectedToKafka(c.ac) {
			// NOTE(review): subscribing to ProductDebug — confirm this is the
			// intended remote-config product for Kafka actions payloads.
			c.rcclient.Subscribe(data.ProductDebug, c.update)
			return
		}
	}
}
94+
95+
// NewActionsController creates a new Kafka actions controller instance and
// starts the background goroutine that waits for a kafka_consumer
// integration before subscribing to remote configuration.
func NewActionsController(ac autodiscovery.Component, rcclient rcclient.Component) types.ConfigProvider {
	c := &actionsController{
		ac:            ac,
		rcclient:      rcclient,
		configChanges: make(chan integration.ConfigChanges, 10),
	}
	// Seed the (buffered) channel with an empty change set so Stream's
	// consumer receives an initial event; presumably this signals provider
	// readiness — confirm against the autodiscovery streaming contract.
	c.configChanges <- integration.ConfigChanges{}
	go c.manageSubscriptionToRC()
	return c
}
106+
107+
// Stream starts sending configuration updates for the kafka_actions integration to the output channel.
// The channel is closed exactly once when ctx is cancelled; the closed flag
// (under closeMutex) prevents update() from sending on the closed channel.
func (c *actionsController) Stream(ctx context.Context) <-chan integration.ConfigChanges {
	go func() {
		<-ctx.Done()
		c.closeMutex.Lock()
		defer c.closeMutex.Unlock()
		if c.closed {
			return
		}
		c.closed = true
		close(c.configChanges)
	}()
	return c.configChanges
}
121+
122+
// update is the remote-config callback: it parses incoming actions payloads,
// enriches each with auth settings borrowed from a matching kafka_consumer
// instance, and schedules one-off kafka_actions checks. Apply status
// (acknowledged or error) is reported per config path via applyStateCallback.
func (c *actionsController) update(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) {
	remoteConfigs := parseActionsConfig(updates, applyStateCallback)
	if len(remoteConfigs) == 0 {
		return
	}
	cfgs := c.ac.GetUnresolvedConfigs()
	changes := integration.ConfigChanges{}
	for _, parsed := range remoteConfigs {
		// Reuse connection/auth settings from the kafka_consumer instance
		// whose kafka_connect_str matches the requested bootstrap servers.
		auth, base, err := extractKafkaAuthFromInstance(cfgs, parsed.bootstrapServers)
		if err != nil {
			log.Errorf("Failed to extract Kafka auth for config %s: %v", parsed.path, err)
			applyStateCallback(parsed.path, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()})
			continue
		}

		// Inherit Provider/NodeName from the matched kafka_consumer config so
		// the scheduled check is attributed like the integration it mirrors.
		newCfg := integration.Config{
			Name:       kafkaActionsIntegrationName,
			Source:     c.String(),
			Instances:  []integration.Data{},
			InitConfig: nil,
			LogsConfig: nil,
			Provider:   base.Provider,
			NodeName:   base.NodeName,
		}

		var actionsMap map[string]any
		if err := json.Unmarshal(parsed.actionsJSON, &actionsMap); err != nil {
			log.Errorf("Failed to unmarshal actions JSON for config %s: %v", parsed.path, err)
			applyStateCallback(parsed.path, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()})
			continue
		}

		// run_once flags the resulting check as one-shot; auth keys are merged
		// in afterwards and therefore win over same-named action keys.
		actionsMap["run_once"] = true
		for k, v := range auth {
			actionsMap[k] = v
		}

		// Check instances are YAML, while the RC payload arrived as JSON.
		payload, err := yaml.Marshal(actionsMap)
		if err != nil {
			log.Errorf("Failed to marshal instance config for %s: %v", parsed.path, err)
			applyStateCallback(parsed.path, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()})
			continue
		}

		newCfg.Instances = []integration.Data{integration.Data(payload)}
		changes.Schedule = append(changes.Schedule, newCfg)
		applyStateCallback(parsed.path, state.ApplyStatus{State: state.ApplyStateAcknowledged})
	}
	if len(changes.Schedule) == 0 {
		return
	}
	// Guard against sending on a channel Stream's goroutine already closed.
	c.closeMutex.RLock()
	defer c.closeMutex.RUnlock()
	if c.closed {
		return
	}
	c.configChanges <- changes
}
180+
181+
// parsedActionsConfig is one validated remote-config actions entry.
type parsedActionsConfig struct {
	path             string          // remote-config path, used for apply-status callbacks
	bootstrapServers string          // normalized bootstrap servers ("" when not provided)
	actionsJSON      json.RawMessage // raw actions payload, decoded when building the check instance
}
186+
187+
func parseActionsConfig(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) []parsedActionsConfig {
188+
var configs []parsedActionsConfig
189+
for path, rawConfig := range updates {
190+
var cfg kafkaActionsConfig
191+
err := json.Unmarshal(rawConfig.Config, &cfg)
192+
if err != nil {
193+
log.Errorf("Can't decode kafka actions configuration from remote-config: %v", err)
194+
applyStateCallback(path, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()})
195+
continue
196+
}
197+
if len(cfg.Actions) == 0 {
198+
applyStateCallback(path, state.ApplyStatus{State: state.ApplyStateError, Error: "missing actions"})
199+
continue
200+
}
201+
202+
bootstrapServers := normalizeBootstrapServers(cfg.BootstrapServers)
203+
204+
configs = append(configs, parsedActionsConfig{
205+
path: path,
206+
bootstrapServers: bootstrapServers,
207+
actionsJSON: cfg.Actions,
208+
})
209+
}
210+
return configs
211+
}
212+
213+
// extractKafkaAuthFromInstance locates the kafka_consumer config whose
// kafka_connect_str matches bootstrapServers (both sides normalized) and
// returns its auth settings along with the matched config. When
// bootstrapServers is empty, the first kafka_consumer instance found is used
// as a fallback. An error is returned when nothing matches.
func extractKafkaAuthFromInstance(cfgs []integration.Config, bootstrapServers string) (map[string]any, *integration.Config, error) {
	out := make(map[string]any)

	for cfgIdx := range cfgs {
		cfg := cfgs[cfgIdx]
		if cfg.Name != kafkaConsumerIntegrationName {
			continue
		}

		// This is a special case, to be deleted if matching by bootstrap_servers works perfectly.
		// It is a fallback in case matching by bootstrap_servers fails in some cases.
		if bootstrapServers == "" {
			if len(cfg.Instances) > 0 {
				auth := extractAuthFromInstanceData(cfg.Instances[0])
				return auth, &cfg, nil
			}
			continue
		}

		for _, instanceData := range cfg.Instances {
			var instanceMap map[string]any
			if err := yaml.Unmarshal(instanceData, &instanceMap); err != nil {
				// Skip instances that are not valid YAML.
				continue
			}

			connectStr, ok := instanceMap["kafka_connect_str"].(string)
			if !ok || connectStr == "" {
				continue
			}

			// Compare in normalized form (bootstrapServers is pre-normalized).
			if normalizeBootstrapServers(connectStr) == bootstrapServers {
				auth := extractAuthFromInstanceData(instanceData)
				return auth, &cfg, nil
			}
		}
	}

	if bootstrapServers == "" {
		return out, nil, fmt.Errorf("kafka_consumer integration not found on this node")
	}
	return out, nil, fmt.Errorf("kafka_consumer integration with bootstrap_servers=%s not found", bootstrapServers)
}
255+
256+
func extractAuthFromInstanceData(instanceData integration.Data) map[string]any {
257+
out := make(map[string]any)
258+
raw := map[string]interface{}{}
259+
if err := yaml.Unmarshal(instanceData, &raw); err != nil {
260+
return out
261+
}
262+
263+
allowList := []string{
264+
"kafka_connect_str",
265+
"security_protocol",
266+
"sasl_mechanism",
267+
"sasl_plain_username",
268+
"sasl_plain_password",
269+
"sasl_kerberos_keytab",
270+
"sasl_kerberos_principal",
271+
"sasl_kerberos_service_name",
272+
"sasl_kerberos_domain_name",
273+
"tls_verify",
274+
"tls_ca_cert",
275+
"tls_cert",
276+
"tls_private_key",
277+
"tls_private_key_password",
278+
"tls_validate_hostname",
279+
"tls_ciphers",
280+
"tls_crlfile",
281+
"sasl_oauth_token_provider",
282+
}
283+
for _, k := range allowList {
284+
if v, ok := raw[k]; ok {
285+
if m, okm := v.(map[interface{}]interface{}); okm {
286+
strMap := make(map[string]interface{}, len(m))
287+
for kk, vv := range m {
288+
strMap[fmt.Sprint(kk)] = vv
289+
}
290+
out[k] = strMap
291+
} else {
292+
out[k] = v
293+
}
294+
}
295+
}
296+
return out
297+
}

0 commit comments

Comments
 (0)