Skip to content

Commit 8b9042a

Browse files
authored
fix: fix incremental sliding delay window (#3417)
Signed-off-by: Song Gao <[email protected]>
1 parent cc2ff56 commit 8b9042a

File tree

2 files changed

+28
-77
lines changed

2 files changed

+28
-77
lines changed

internal/topo/node/window_inc_agg_op.go

Lines changed: 20 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,6 @@ type SlidingWindowIncAggOp struct {
293293
triggerCondition ast.Expr
294294
Length time.Duration
295295
Delay time.Duration
296-
delayWindowList []*IncAggWindow
297296
currWindowList []*IncAggWindow
298297
taskCh chan *IncAggOpTask
299298
}
@@ -309,7 +308,6 @@ func NewSlidingWindowIncAggOp(o *WindowIncAggOperator) *SlidingWindowIncAggOp {
309308
Length: o.windowConfig.Length,
310309
Delay: o.windowConfig.Delay,
311310
currWindowList: make([]*IncAggWindow, 0),
312-
delayWindowList: make([]*IncAggWindow, 0),
313311
taskCh: make(chan *IncAggOpTask, 1024),
314312
}
315313
return op
@@ -329,49 +327,31 @@ func (so *SlidingWindowIncAggOp) exec(ctx api.StreamContext, errCh chan<- error)
329327
}
330328
switch row := data.(type) {
331329
case *xsql.Tuple:
332-
so.currWindowList = gcIncAggWindow(so.currWindowList, so.Length, now)
333-
if so.Delay > 0 {
334-
so.appendDelayIncAggWindow(ctx, errCh, fv, row, now)
335-
continue
336-
}
330+
so.currWindowList = gcIncAggWindow(so.currWindowList, so.Length+so.Delay, now)
337331
so.appendIncAggWindow(ctx, errCh, fv, row, now)
338332
if len(so.currWindowList) > 0 && so.isMatchCondition(ctx, fv, row) {
339-
so.emit(ctx, errCh, so.currWindowList[0], now)
333+
if so.Delay > 0 {
334+
t := &IncAggOpTask{}
335+
go func(task *IncAggOpTask) {
336+
after := timex.After(so.Delay)
337+
select {
338+
case <-ctx.Done():
339+
return
340+
case <-after:
341+
so.taskCh <- task
342+
}
343+
}(t)
344+
} else {
345+
so.emit(ctx, errCh, so.currWindowList[0], now)
346+
}
340347
}
341348
}
342-
case task := <-so.taskCh:
349+
case <-so.taskCh:
343350
now := timex.GetNow()
344-
window := task.window
345-
so.removeDelayWindow(window)
346-
so.emit(ctx, errCh, window, now)
347-
}
348-
}
349-
}
350-
351-
func (so *SlidingWindowIncAggOp) removeDelayWindow(window *IncAggWindow) {
352-
if len(so.delayWindowList) == 0 {
353-
return
354-
}
355-
if len(so.delayWindowList) == 1 {
356-
if so.delayWindowList[0] == window {
357-
so.delayWindowList = make([]*IncAggWindow, 0)
358-
}
359-
return
360-
}
361-
if so.delayWindowList[0] == window {
362-
so.delayWindowList = so.delayWindowList[1:]
363-
return
364-
}
365-
if so.delayWindowList[len(so.delayWindowList)-1] == window {
366-
so.delayWindowList = so.delayWindowList[:len(so.delayWindowList)-1]
367-
return
368-
}
369-
for index, w := range so.delayWindowList {
370-
if w == window {
371-
left := so.delayWindowList[:index]
372-
right := so.delayWindowList[index+1:]
373-
so.delayWindowList = append(left, right...)
374-
return
351+
so.currWindowList = gcIncAggWindow(so.currWindowList, so.Length+so.Delay, now)
352+
if len(so.currWindowList) > 0 {
353+
so.emit(ctx, errCh, so.currWindowList[0], now)
354+
}
375355
}
376356
}
377357
}
@@ -384,33 +364,6 @@ func (so *SlidingWindowIncAggOp) appendIncAggWindow(ctx api.StreamContext, errCh
384364
}
385365
}
386366

387-
func (so *SlidingWindowIncAggOp) appendDelayIncAggWindow(ctx api.StreamContext, errCh chan<- error, fv *xsql.FunctionValuer, row *xsql.Tuple, now time.Time) {
388-
name := calDimension(fv, so.Dimensions, row)
389-
isMatched := so.isMatchCondition(ctx, fv, row)
390-
var newDelayWindow *IncAggWindow
391-
if isMatched {
392-
newDelayWindow = newIncAggWindow(ctx, now)
393-
so.delayWindowList = append(so.delayWindowList, newDelayWindow)
394-
}
395-
for _, incWindow := range so.delayWindowList {
396-
if incWindow.StartTime.Add(so.Length).After(now) {
397-
incAggCal(ctx, name, row, incWindow, so.aggFields)
398-
}
399-
}
400-
if isMatched {
401-
t := &IncAggOpTask{window: newDelayWindow}
402-
go func(task *IncAggOpTask) {
403-
after := timex.After(so.Delay)
404-
select {
405-
case <-ctx.Done():
406-
return
407-
case <-after:
408-
so.taskCh <- task
409-
}
410-
}(t)
411-
}
412-
}
413-
414367
func (so *SlidingWindowIncAggOp) emit(ctx api.StreamContext, errCh chan<- error, window *IncAggWindow, now time.Time) {
415368
results := &xsql.WindowTuples{
416369
Content: make([]xsql.Row, 0),

internal/topo/node/window_inc_agg_op_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ func TestIncAggSlidingWindowDelay(t *testing.T) {
309309
kv, err := store.GetKV("stream")
310310
require.NoError(t, err)
311311
require.NoError(t, prepareStream())
312-
sql := "select count(*) from stream group by slidingWindow(ss,1,3)"
312+
sql := "select count(*) from stream group by slidingWindow(ss,1,1)"
313313
stmt, err := xsql.NewParser(strings.NewReader(sql)).Parse()
314314
require.NoError(t, err)
315315
p, err := planner.CreateLogicalPlan(stmt, &def.RuleOption{
@@ -325,7 +325,7 @@ func TestIncAggSlidingWindowDelay(t *testing.T) {
325325
op, err := node.NewWindowIncAggOp("1", &node.WindowConfig{
326326
Type: incPlan.WType,
327327
Length: time.Second,
328-
Delay: 3 * time.Second,
328+
Delay: time.Second,
329329
}, incPlan.Dimensions, incPlan.IncAggFuncs, o)
330330
require.NoError(t, err)
331331
require.NotNil(t, op)
@@ -338,18 +338,16 @@ func TestIncAggSlidingWindowDelay(t *testing.T) {
338338
waitExecute()
339339
input <- &xsql.Tuple{Message: map[string]any{"a": int64(1)}}
340340
waitExecute()
341-
timex.Add(300 * time.Millisecond)
341+
timex.Add(500 * time.Millisecond)
342342
waitExecute()
343343
input <- &xsql.Tuple{Message: map[string]any{"a": int64(2)}}
344344
waitExecute()
345-
timex.Add(1500 * time.Millisecond)
345+
timex.Add(600 * time.Millisecond)
346346
waitExecute()
347347
input <- &xsql.Tuple{Message: map[string]any{"a": int64(3)}}
348348
waitExecute()
349-
timex.Add(1300 * time.Millisecond)
349+
timex.Add(2 * time.Second)
350350
waitExecute()
351-
timex.Add(3000 * time.Millisecond)
352-
353351
got := <-output
354352
wt, ok := got.(*xsql.WindowTuples)
355353
require.True(t, ok)
@@ -368,8 +366,8 @@ func TestIncAggSlidingWindowDelay(t *testing.T) {
368366
d = wt.ToMaps()
369367
require.Equal(t, []map[string]any{
370368
{
371-
"a": int64(2),
372-
"inc_agg_col_1": int64(1),
369+
"a": int64(3),
370+
"inc_agg_col_1": int64(3),
373371
},
374372
}, d)
375373
got = <-output
@@ -380,7 +378,7 @@ func TestIncAggSlidingWindowDelay(t *testing.T) {
380378
require.Equal(t, []map[string]any{
381379
{
382380
"a": int64(3),
383-
"inc_agg_col_1": int64(1),
381+
"inc_agg_col_1": int64(2),
384382
},
385383
}, d)
386384
cancel()

0 commit comments

Comments
 (0)