Skip to content

Commit a1ae377

Browse files
committed
Add release note
1 parent 1f805f1 commit a1ae377

File tree

5 files changed

+34
-19
lines changed

5 files changed

+34
-19
lines changed

comp/core/autodiscovery/providers/datastreams/kafka_actions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (c *actionsController) manageSubscriptionToRC() {
8686
}
8787
c.closeMutex.RUnlock()
8888
if isConnectedToKafka(c.ac) {
89-
c.rcclient.Subscribe(data.ProductDebug, c.update)
89+
c.rcclient.Subscribe(data.ProductDataStreamsKafkaActions, c.update)
9090
return
9191
}
9292
}

pkg/collector/scheduler/scheduler_test.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -207,19 +207,31 @@ func TestStopOneTimeSchedule(t *testing.T) {
207207
// RunOnceTestCheck is a test check that can be configured as run-once or regular
208208
type RunOnceTestCheck struct {
209209
stub.StubCheck
210+
mu sync.RWMutex
210211
id string
211212
runOnce bool
212213
intl time.Duration
213214
runCounter int
214215
}
215216

216-
func (c *RunOnceTestCheck) ID() checkid.ID { return checkid.ID(c.id) }
217-
func (c *RunOnceTestCheck) RunOnce() bool { return c.runOnce }
217+
func (c *RunOnceTestCheck) ID() checkid.ID { return checkid.ID(c.id) }
218+
func (c *RunOnceTestCheck) RunOnce() bool {
219+
c.mu.RLock()
220+
defer c.mu.RUnlock()
221+
return c.runOnce
222+
}
218223
func (c *RunOnceTestCheck) Interval() time.Duration { return c.intl }
219224
func (c *RunOnceTestCheck) Run() error {
225+
c.mu.Lock()
226+
defer c.mu.Unlock()
220227
c.runCounter++
221228
return nil
222229
}
230+
func (c *RunOnceTestCheck) GetRunCounter() int {
231+
c.mu.RLock()
232+
defer c.mu.RUnlock()
233+
return c.runCounter
234+
}
223235

224236
func TestRunOnceCheckDescheduling(t *testing.T) {
225237
checkChan := make(chan check.Check, 100)
@@ -269,30 +281,29 @@ func TestRunOnceCheckDescheduling(t *testing.T) {
269281
runOnceCheck := &RunOnceTestCheck{
270282
id: "run-once-check",
271283
runOnce: true,
284+
intl: 1 * time.Second,
272285
}
273-
runOnceCheck.intl = 1 * time.Second
274286

275287
regularCheck := &RunOnceTestCheck{
276288
id: "regular-check",
277289
runOnce: false,
290+
intl: 1 * time.Second,
278291
}
279-
regularCheck.intl = 1 * time.Second
292+
293+
// Manually create a job queue for 1 second interval and replace its ticker
294+
// This must be done BEFORE calling Enter to avoid race conditions
295+
jq := newJobQueue(1 * time.Second)
296+
tickerChan := make(chan time.Time, 10)
297+
jq.bucketTicker.Stop()
298+
jq.bucketTicker = &time.Ticker{C: tickerChan}
299+
s.jobQueues[1*time.Second] = jq
280300

281301
// Schedule both checks
282302
err := s.Enter(runOnceCheck)
283303
require.NoError(t, err)
284304
err = s.Enter(regularCheck)
285305
require.NoError(t, err)
286306

287-
// Verify both checks are in the queue
288-
jq := s.jobQueues[1*time.Second]
289-
require.NotNil(t, jq)
290-
291-
// Replace the ticker's channel with one we control
292-
jq.bucketTicker.Stop()
293-
tickerChan := make(chan time.Time, 10)
294-
jq.bucketTicker = &time.Ticker{C: tickerChan}
295-
296307
// Start the scheduler
297308
s.Run()
298309

pkg/config/remote/data/product.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ const (
3838
ProductContainerAutoscalingValues = "CONTAINER_AUTOSCALING_VALUES"
3939
// ProductDataStreamsLiveMessages is to capture messages from Kafka
4040
ProductDataStreamsLiveMessages = "DSM_LIVE_MESSAGES"
41-
// ProductDebug is for debugging and testing remote configuration
42-
ProductDebug = "DEBUG"
41+
// ProductDataStreamsKafkaActions is to execute Kafka actions remotely
42+
ProductDataStreamsKafkaActions = "DSM_KAFKA_ACTIONS"
4343
)
4444

4545
// ProductListToString converts a product list to string list

pkg/remoteconfig/state/products.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ var validProducts = map[string]struct{}{
3737
ProductNDMDeviceProfilesCustom: {},
3838
ProductMetricControl: {},
3939
ProductDataStreamsLiveMessages: {},
40+
ProductDataStreamsKafkaActions: {},
4041
ProductLiveDebuggingSymbolDB: {},
4142
ProductGradualRollout: {},
4243
ProductApmPolicies: {},
4344
ProductSyntheticsTest: {},
4445
ProductBTFDD: {},
4546
ProductFFEFlags: {},
46-
ProductDebug: {},
4747
}
4848

4949
const (
@@ -114,6 +114,8 @@ const (
114114
ProductMetricControl = "METRIC_CONTROL"
115115
// ProductDataStreamsLiveMessages is used for capturing messages from Kafka
116116
ProductDataStreamsLiveMessages = "DSM_LIVE_MESSAGES"
117+
// ProductDataStreamsKafkaActions is used for executing Kafka actions remotely
118+
ProductDataStreamsKafkaActions = "DSM_KAFKA_ACTIONS"
117119
// ProductGradualRollout tracks the latest stable release versions for K8s gradual rollout.
118120
ProductGradualRollout = "K8S_INJECTION_DD"
119121
// ProductBTFDD accesses a BTF catalog used when the kernel is newer than the system-probe has bundled support for
@@ -122,6 +124,4 @@ const (
122124
ProductApmPolicies = "APM_POLICIES"
123125
// ProductFFEFlags is used for feature flagging experiments remote updates
124126
ProductFFEFlags = "FFE_FLAGS"
125-
// ProductDebug is for debugging and testing remote configuration
126-
ProductDebug = "DEBUG"
127127
)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
features:
2+
- Checks can be scheduled only once with run_once configuration
3+
- Data Streams Kafka actions perform actions on Kafka clusters
4+

0 commit comments

Comments
 (0)