Skip to content

Commit fbade94

Browse files
committed
feat: support ttheader streaming timeout and unify streaming timeout control
1 parent 1eea0ae commit fbade94

39 files changed

+2540
-132
lines changed

client/callopt/streamcall/call_options.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ func WithTag(key, val string) Option {
4848
}
4949

5050
// WithRecvTimeout add recv timeout for stream.Recv function.
51-
// NOTICE: ONLY effective for ttheader streaming protocol for now.
5251
func WithRecvTimeout(d time.Duration) Option {
5352
return Option{f: func(o *callopt.CallOptions, di *strings.Builder) {
5453
di.WriteString("WithRecvTimeout(")
@@ -58,3 +57,25 @@ func WithRecvTimeout(d time.Duration) Option {
5857
o.StreamOptions.RecvTimeout = d
5958
}}
6059
}
60+
61+
// WithSendTimeout add send timeout for stream.Send function.
62+
func WithSendTimeout(d time.Duration) Option {
63+
return Option{f: func(o *callopt.CallOptions, di *strings.Builder) {
64+
di.WriteString("WithSendTimeout(")
65+
di.WriteString(d.String())
66+
di.WriteString(")")
67+
68+
o.StreamOptions.SendTimeout = d
69+
}}
70+
}
71+
72+
// WithStreamTimeout add timeout for whole stream.
73+
func WithStreamTimeout(d time.Duration) Option {
74+
return Option{f: func(o *callopt.CallOptions, di *strings.Builder) {
75+
di.WriteString("WithStreamTimeout(")
76+
di.WriteString(d.String())
77+
di.WriteString(")")
78+
79+
o.StreamOptions.StreamTimeout = d
80+
}}
81+
}

client/callopt/streamcall/call_options_test.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,29 @@ import (
2525
"github.com/cloudwego/kitex/internal/test"
2626
)
2727

28-
func TestWithRecvTimeout(t *testing.T) {
29-
var sb strings.Builder
30-
callOpts := callopt.CallOptions{}
31-
testTimeout := 1 * time.Second
32-
WithRecvTimeout(testTimeout).f(&callOpts, &sb)
33-
test.Assert(t, callOpts.StreamOptions.RecvTimeout == testTimeout)
34-
test.Assert(t, sb.String() == "WithRecvTimeout(1s)")
28+
func Test_streamCallTimeoutCallOptions(t *testing.T) {
29+
t.Run("WithRecvTimeout", func(t *testing.T) {
30+
var sb strings.Builder
31+
callOpts := callopt.CallOptions{}
32+
testTimeout := 1 * time.Second
33+
WithRecvTimeout(testTimeout).f(&callOpts, &sb)
34+
test.Assert(t, callOpts.StreamOptions.RecvTimeout == testTimeout)
35+
test.Assert(t, sb.String() == "WithRecvTimeout(1s)")
36+
})
37+
t.Run("WithSendTimeout", func(t *testing.T) {
38+
var sb strings.Builder
39+
callOpts := callopt.CallOptions{}
40+
testTimeout := 1 * time.Second
41+
WithSendTimeout(testTimeout).f(&callOpts, &sb)
42+
test.Assert(t, callOpts.StreamOptions.SendTimeout == testTimeout)
43+
test.Assert(t, sb.String() == "WithSendTimeout(1s)")
44+
})
45+
t.Run("WithStreamTimeout", func(t *testing.T) {
46+
var sb strings.Builder
47+
callOpts := callopt.CallOptions{}
48+
testTimeout := 1 * time.Second
49+
WithStreamTimeout(testTimeout).f(&callOpts, &sb)
50+
test.Assert(t, callOpts.StreamOptions.StreamTimeout == testTimeout)
51+
test.Assert(t, sb.String() == "WithStreamTimeout(1s)")
52+
})
3553
}

client/client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,12 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
827827
if sopt.RecvTimeout > 0 {
828828
cfg.SetStreamRecvTimeout(sopt.RecvTimeout)
829829
}
830+
if sopt.SendTimeout > 0 {
831+
cfg.SetStreamSendTimeout(sopt.SendTimeout)
832+
}
833+
if sopt.StreamTimeout > 0 {
834+
cfg.SetStreamTimeout(sopt.StreamTimeout)
835+
}
830836

831837
ctx = rpcinfo.NewCtxWithRPCInfo(ctx, ri)
832838

@@ -838,6 +844,12 @@ func initRPCInfo(ctx context.Context, method string, opt *client.Options, svcInf
838844
if callOpts.StreamOptions.RecvTimeout != 0 {
839845
cfg.SetStreamRecvTimeout(callOpts.StreamOptions.RecvTimeout)
840846
}
847+
if callOpts.StreamOptions.SendTimeout != 0 {
848+
cfg.SetStreamSendTimeout(callOpts.StreamOptions.SendTimeout)
849+
}
850+
if callOpts.StreamOptions.StreamTimeout != 0 {
851+
cfg.SetStreamTimeout(callOpts.StreamOptions.StreamTimeout)
852+
}
841853
}
842854

843855
return ctx, ri, callOpts

client/client_test.go

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,22 +1266,66 @@ func Test_initRPCInfoWithStreamClientCallOption(t *testing.T) {
12661266
mtd := mocks.MockMethod
12671267
svcInfo := mocks.ServiceInfo()
12681268
callOptTimeout := 1 * time.Second
1269+
cliTimeout := 2 * time.Second
12691270
testService := "testService"
12701271

1271-
// config call option
1272-
cliIntf, err := NewClient(svcInfo, WithTransportProtocol(transport.TTHeaderStreaming), WithDestService(testService))
1273-
test.Assert(t, err == nil, err)
1274-
cli := cliIntf.(*kcFinalizerClient)
1275-
ctx := NewCtxWithCallOptions(context.Background(), streamcall.GetCallOptions([]streamcall.Option{streamcall.WithRecvTimeout(callOptTimeout)}))
1276-
_, ri, _ := cli.initRPCInfo(ctx, mtd, 0, nil, true)
1277-
test.Assert(t, ri.Config().StreamRecvTimeout() == callOptTimeout)
1272+
testcases := []struct {
1273+
desc string
1274+
cliOpt StreamOption
1275+
callOpt streamcall.Option
1276+
verifyFunc func(t *testing.T, ri rpcinfo.RPCInfo, isPureCli bool)
1277+
}{
1278+
{
1279+
desc: "stream recv timeout",
1280+
cliOpt: WithStreamRecvTimeout(cliTimeout),
1281+
callOpt: streamcall.WithRecvTimeout(callOptTimeout),
1282+
verifyFunc: func(t *testing.T, ri rpcinfo.RPCInfo, isPureCli bool) {
1283+
if isPureCli {
1284+
test.Assert(t, ri.Config().StreamRecvTimeout() == cliTimeout, ri)
1285+
} else {
1286+
test.Assert(t, ri.Config().StreamRecvTimeout() == callOptTimeout, ri)
1287+
}
1288+
},
1289+
},
1290+
{
1291+
desc: "stream send timeout",
1292+
cliOpt: WithStreamSendTimeout(cliTimeout),
1293+
callOpt: streamcall.WithSendTimeout(callOptTimeout),
1294+
verifyFunc: func(t *testing.T, ri rpcinfo.RPCInfo, isPureCli bool) {
1295+
if isPureCli {
1296+
test.Assert(t, ri.Config().StreamSendTimeout() == cliTimeout, ri)
1297+
} else {
1298+
test.Assert(t, ri.Config().StreamSendTimeout() == callOptTimeout, ri)
1299+
}
1300+
},
1301+
},
1302+
{
1303+
desc: "stream timeout",
1304+
cliOpt: WithStreamTimeout(cliTimeout),
1305+
callOpt: streamcall.WithStreamTimeout(callOptTimeout),
1306+
verifyFunc: func(t *testing.T, ri rpcinfo.RPCInfo, isPureCli bool) {
1307+
if isPureCli {
1308+
test.Assert(t, ri.Config().StreamTimeout() == cliTimeout, ri)
1309+
} else {
1310+
test.Assert(t, ri.Config().StreamTimeout() == callOptTimeout, ri)
1311+
}
1312+
},
1313+
},
1314+
}
12781315

1279-
// call option has higher priority
1280-
cliTimeout := 2 * time.Second
1281-
cliIntf, err = NewClient(svcInfo, WithTransportProtocol(transport.TTHeaderStreaming), WithStreamOptions(WithStreamRecvTimeout(cliTimeout)), WithDestService(testService))
1282-
test.Assert(t, err == nil, err)
1283-
cli = cliIntf.(*kcFinalizerClient)
1284-
ctx = NewCtxWithCallOptions(context.Background(), streamcall.GetCallOptions([]streamcall.Option{streamcall.WithRecvTimeout(callOptTimeout)}))
1285-
_, ri, _ = cli.initRPCInfo(ctx, mtd, 0, nil, true)
1286-
test.Assert(t, ri.Config().StreamRecvTimeout() == callOptTimeout)
1316+
for _, tc := range testcases {
1317+
t.Run(tc.desc, func(t *testing.T) {
1318+
// config client option
1319+
cliIntf, err := NewClient(svcInfo, WithTransportProtocol(transport.TTHeaderStreaming), WithDestService(testService), WithStreamOptions(tc.cliOpt))
1320+
test.Assert(t, err == nil, err)
1321+
cli := cliIntf.(*kcFinalizerClient)
1322+
_, ri, _ := cli.initRPCInfo(ctx, mtd, 0, nil, true)
1323+
tc.verifyFunc(t, ri, true)
1324+
1325+
// call option has higher priority
1326+
ctx = NewCtxWithCallOptions(context.Background(), streamcall.GetCallOptions([]streamcall.Option{tc.callOpt}))
1327+
_, ri, _ = cli.initRPCInfo(ctx, mtd, 0, nil, true)
1328+
tc.verifyFunc(t, ri, false)
1329+
})
1330+
}
12871331
}

client/option_stream.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ func WithStreamOptions(opts ...StreamOption) Option {
4040
}
4141

4242
// WithStreamRecvTimeout add recv timeout for stream.Recv function.
43-
// NOTICE: ONLY effective for ttheader streaming protocol for now.
4443
func WithStreamRecvTimeout(d time.Duration) StreamOption {
4544
return StreamOption{F: func(o *client.StreamOptions, di *utils.Slice) {
4645
di.Push(fmt.Sprintf("WithStreamRecvTimeout(%dms)", d.Milliseconds()))
@@ -49,6 +48,24 @@ func WithStreamRecvTimeout(d time.Duration) StreamOption {
4948
}}
5049
}
5150

51+
// WithStreamSendTimeout add send timeout for stream.Send function.
52+
func WithStreamSendTimeout(d time.Duration) StreamOption {
53+
return StreamOption{F: func(o *client.StreamOptions, di *utils.Slice) {
54+
di.Push(fmt.Sprintf("WithStreamSendTimeout(%dms)", d.Milliseconds()))
55+
56+
o.SendTimeout = d
57+
}}
58+
}
59+
60+
// WithStreamTimeout add timeout for whole stream.
61+
func WithStreamTimeout(d time.Duration) StreamOption {
62+
return StreamOption{F: func(o *client.StreamOptions, di *utils.Slice) {
63+
di.Push(fmt.Sprintf("WithStreamTimeout(%dms)", d.Milliseconds()))
64+
65+
o.StreamTimeout = d
66+
}}
67+
}
68+
5269
// WithStreamMiddleware add middleware for stream.
5370
func WithStreamMiddleware(mw cep.StreamMiddleware) StreamOption {
5471
return StreamOption{F: func(o *StreamOptions, di *utils.Slice) {

0 commit comments

Comments
 (0)