Skip to content

Commit 7b81fa1

Browse files
committed
add replayer test with worker interceptor
Signed-off-by: Tihomir Surdilovic <[email protected]>
1 parent 902591b commit 7b81fa1

File tree

1 file changed

+192
-0
lines changed

1 file changed

+192
-0
lines changed
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package io.temporal.samples.interceptorreplaytest;
2+
3+
import static org.junit.Assert.fail;
4+
5+
import io.temporal.activity.ActivityInterface;
6+
import io.temporal.activity.ActivityOptions;
7+
import io.temporal.client.WorkflowOptions;
8+
import io.temporal.common.WorkflowExecutionHistory;
9+
import io.temporal.common.interceptors.*;
10+
import io.temporal.testing.TestWorkflowEnvironment;
11+
import io.temporal.testing.TestWorkflowExtension;
12+
import io.temporal.testing.WorkflowReplayer;
13+
import io.temporal.worker.Worker;
14+
import io.temporal.worker.WorkerFactoryOptions;
15+
import io.temporal.workflow.Workflow;
16+
import io.temporal.workflow.WorkflowInterface;
17+
import io.temporal.workflow.WorkflowMethod;
18+
import java.time.Duration;
19+
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.RegisterExtension;
21+
22+
public class InterceptorReplayTest {
23+
@RegisterExtension
24+
public static final TestWorkflowExtension testWorkflowExtension =
25+
TestWorkflowExtension.newBuilder()
26+
// Register workflow and activity impls
27+
.registerWorkflowImplementationTypes(TestWorkflowImpl.class)
28+
.setActivityImplementations(new TestActivitiesImpl())
29+
// Register worker interceptor
30+
.setWorkerFactoryOptions(
31+
WorkerFactoryOptions.newBuilder()
32+
.setWorkerInterceptors(new TestWorkerInterceptor())
33+
.build())
34+
.setDoNotStart(true)
35+
.build();
36+
37+
@Test
38+
// TODO
39+
public void testReplayWithInterceptors(TestWorkflowEnvironment testEnv, Worker worker) {
40+
// Run our test workflow. We need to set workflow id so can get history after
41+
testEnv.start();
42+
TestWorkflow workflow =
43+
testEnv
44+
.getWorkflowClient()
45+
.newWorkflowStub(
46+
TestWorkflow.class,
47+
WorkflowOptions.newBuilder()
48+
.setWorkflowId("test-workflow")
49+
.setTaskQueue(worker.getTaskQueue())
50+
.build());
51+
workflow.execute();
52+
53+
// Replay execution with history of just executed
54+
WorkflowExecutionHistory eventHistory =
55+
testEnv.getWorkflowClient().fetchHistory("test-workflow");
56+
57+
try {
58+
WorkflowReplayer.replayWorkflowExecution(eventHistory, worker);
59+
} catch (Exception e) {
60+
fail(e.getMessage());
61+
}
62+
testEnv.shutdown();
63+
64+
// Try replaying execution with test env where we dont have interceptors registered
65+
TestWorkflowEnvironment testEnv2 = TestWorkflowEnvironment.newInstance();
66+
Worker testEnv2Worker = testEnv2.newWorker("test-taskqueue");
67+
testEnv2Worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class);
68+
testEnv2Worker.registerActivitiesImplementations(new TestActivitiesImpl());
69+
70+
testEnv2.start();
71+
72+
// Replay should fail with worker that does not have interceptor registered
73+
try {
74+
WorkflowReplayer.replayWorkflowExecution(eventHistory, testEnv2Worker);
75+
fail("This should have failed");
76+
} catch (Exception e) {
77+
System.out.println(e.getMessage());
78+
}
79+
80+
// But it should be fine with worker that does
81+
try {
82+
WorkflowReplayer.replayWorkflowExecution(eventHistory, worker);
83+
} catch (Exception e) {
84+
fail(e.getMessage());
85+
}
86+
87+
testEnv2.shutdown();
88+
}
89+
90+
// Test workflow and activities
91+
@WorkflowInterface
92+
public interface TestWorkflow {
93+
@WorkflowMethod
94+
void execute();
95+
}
96+
97+
public static class TestWorkflowImpl implements TestWorkflow {
98+
99+
TestActivities activities =
100+
Workflow.newActivityStub(
101+
TestActivities.class,
102+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());
103+
104+
@Override
105+
public void execute() {
106+
activities.activityOne();
107+
}
108+
}
109+
110+
@ActivityInterface
111+
public interface TestActivities {
112+
void activityOne();
113+
114+
void activityTwo();
115+
116+
void activityThree();
117+
}
118+
119+
public static class TestActivitiesImpl implements TestActivities {
120+
@Override
121+
public void activityOne() {
122+
System.out.println("Activities one done");
123+
}
124+
125+
@Override
126+
public void activityTwo() {
127+
System.out.println("Activities two done");
128+
}
129+
130+
@Override
131+
public void activityThree() {
132+
System.out.println("Activities three done");
133+
}
134+
}
135+
136+
// Test worker and workflow interceptors
137+
public static class TestWorkerInterceptor extends WorkerInterceptorBase {
138+
@Override
139+
public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) {
140+
return new TestWorkflowInboundCallsInterceptor(next);
141+
}
142+
}
143+
144+
public static class TestWorkflowInboundCallsInterceptor
145+
extends WorkflowInboundCallsInterceptorBase {
146+
TestActivities activities =
147+
Workflow.newActivityStub(
148+
TestActivities.class,
149+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());
150+
151+
public TestWorkflowInboundCallsInterceptor(WorkflowInboundCallsInterceptor next) {
152+
super(next);
153+
}
154+
155+
@Override
156+
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
157+
super.init(new TestWorkflowOutboundCallsInterceptor(outboundCalls));
158+
}
159+
160+
@Override
161+
public WorkflowOutput execute(WorkflowInput input) {
162+
WorkflowOutput output = super.execute(input);
163+
// Run activity three before completing execution
164+
activities.activityThree();
165+
return output;
166+
}
167+
}
168+
169+
public static class TestWorkflowOutboundCallsInterceptor
170+
extends WorkflowOutboundCallsInterceptorBase {
171+
TestActivities activities =
172+
Workflow.newActivityStub(
173+
TestActivities.class,
174+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());
175+
176+
public TestWorkflowOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor next) {
177+
super(next);
178+
}
179+
180+
@Override
181+
public <R> ActivityOutput<R> executeActivity(ActivityInput<R> input) {
182+
ActivityOutput output = super.executeActivity(input);
183+
184+
// we only want to intercept ActivityOne here
185+
if (input.getActivityName().equals("ActivityOne")) {
186+
activities.activityTwo();
187+
}
188+
189+
return output;
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)