Skip to content

Commit 819b3f9

Browse files
committed
update
Signed-off-by: Tihomir Surdilovic <[email protected]>
1 parent 3f90abc commit 819b3f9

File tree

4 files changed

+66
-13
lines changed

4 files changed

+66
-13
lines changed

core/src/main/java/io/temporal/samples/packetdelivery/PacketDelivery.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
public class PacketDelivery {
1111
private Packet packet;
1212
private boolean deliveryConfirmation = false;
13+
private boolean needDeliveryConfirmation = false;
1314
private CompletablePromise delivered = Workflow.newPromise();
1415
private CancellationScope cancellationScope;
1516

@@ -61,6 +62,7 @@ public void processDelivery() {
6162
+ " - "
6263
+ packet.getContent()
6364
+ " awaiting delivery confirmation");
65+
needDeliveryConfirmation = true;
6466
Workflow.await(() -> deliveryConfirmation);
6567
logger.info(
6668
"** Delivery for packet: "
@@ -75,8 +77,9 @@ public void processDelivery() {
7577
+ " - "
7678
+ packet.getContent());
7779
deliveryConfirmationResult = activities.completeDelivery(packet);
78-
// Reset deliveryConfirmation
80+
// Reset deliveryConfirmation and needDeliveryConfirmation
7981
deliveryConfirmation = false;
82+
needDeliveryConfirmation = false;
8083
}
8184
});
8285

@@ -93,6 +96,7 @@ public void processDelivery() {
9396
// Just for show for example that cancel could come in while we are waiting on approval signal
9497
// too
9598
else if (e instanceof CanceledFailure) {
99+
needDeliveryConfirmation = false;
96100
// Run compensation activity and complete
97101
compensationActivities.compensateDelivery(packet);
98102
}
@@ -109,4 +113,12 @@ public void cancelDelivery(String reason) {
109113
cancellationScope.cancel(reason);
110114
}
111115
}
116+
117+
public boolean isNeedDeliveryConfirmation() {
118+
return needDeliveryConfirmation;
119+
}
120+
121+
public Packet getPacket() {
122+
return packet;
123+
}
112124
}

core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflow.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.temporal.samples.packetdelivery;
22

3+
import io.temporal.workflow.QueryMethod;
34
import io.temporal.workflow.SignalMethod;
45
import io.temporal.workflow.WorkflowInterface;
56
import io.temporal.workflow.WorkflowMethod;
7+
import java.util.List;
68

79
@WorkflowInterface
810
public interface PacketDeliveryWorkflow {
@@ -14,4 +16,7 @@ public interface PacketDeliveryWorkflow {
1416

1517
@SignalMethod
1618
void cancelDelivery(int deliveryId, String reason);
19+
20+
@QueryMethod
21+
List<Packet> deliveryConfirmationPackets();
1722
}

core/src/main/java/io/temporal/samples/packetdelivery/PacketDeliveryWorkflowImpl.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
import java.util.HashMap;
99
import java.util.List;
1010
import java.util.Map;
11+
import org.slf4j.Logger;
1112

1213
public class PacketDeliveryWorkflowImpl implements PacketDeliveryWorkflow {
13-
1414
private final Map<Integer, PacketDelivery> packetDeliveries = new HashMap<>();
15+
private final Logger logger = Workflow.getLogger(this.getClass().getName());
1516

1617
private final PacketDeliveryActivities activities =
1718
Workflow.newActivityStub(
@@ -47,7 +48,31 @@ public void confirmDelivery(int deliveryId) {
4748
@Override
4849
public void cancelDelivery(int deliveryId, String reason) {
4950
if (packetDeliveries.containsKey(deliveryId)) {
50-
packetDeliveries.get(deliveryId).cancelDelivery(reason);
51+
// Only makes sense to cancel if delivery is not done yet
52+
if (!packetDeliveries.get(deliveryId).getDelivered().isCompleted()) {
53+
logger.info("Sending cancellation for delivery : " + deliveryId + " and reason: " + reason);
54+
packetDeliveries.get(deliveryId).cancelDelivery(reason);
55+
}
56+
logger.info(
57+
"Bypassing sending cancellation for delivery : "
58+
+ deliveryId
59+
+ " and reason: "
60+
+ reason
61+
+ " because delivery already completed");
5162
}
5263
}
64+
65+
@Override
66+
public List<Packet> deliveryConfirmationPackets() {
67+
List<Packet> confirmationPackets = new ArrayList<>();
68+
packetDeliveries
69+
.values()
70+
.forEach(
71+
p -> {
72+
if (p.isNeedDeliveryConfirmation()) {
73+
confirmationPackets.add(p.getPacket());
74+
}
75+
});
76+
return confirmationPackets;
77+
}
5378
}

core/src/main/java/io/temporal/samples/packetdelivery/Starter.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package io.temporal.samples.packetdelivery;
22

33
import io.temporal.client.WorkflowClient;
4+
import io.temporal.client.WorkflowNotFoundException;
45
import io.temporal.client.WorkflowOptions;
56
import io.temporal.client.WorkflowStub;
67
import io.temporal.serviceclient.WorkflowServiceStubs;
78
import io.temporal.worker.Worker;
89
import io.temporal.worker.WorkerFactory;
10+
import java.util.List;
911

1012
public class Starter {
1113
public static void main(String[] args) {
@@ -30,16 +32,25 @@ public static void main(String[] args) {
3032
WorkflowClient.start(workflow::execute);
3133

3234
// start completing package deliveries (send confirmations)
33-
sleep(3);
34-
workflow.confirmDelivery(3); // furniture
35-
sleep(1);
36-
workflow.confirmDelivery(5); // electronics
37-
sleep(1);
38-
workflow.confirmDelivery(1); // books
39-
sleep(1);
40-
workflow.confirmDelivery(2); // jewelry
41-
sleep(1);
42-
workflow.confirmDelivery(4); // food
35+
// Query workflow for packets that need confirmation, confirm until none need confirmation any
36+
// more
37+
while (true) {
38+
sleep(3);
39+
List<Packet> packets = workflow.deliveryConfirmationPackets();
40+
if (packets.isEmpty()) {
41+
break;
42+
}
43+
44+
for (Packet p : packets) {
45+
try {
46+
workflow.confirmDelivery(p.getId());
47+
} catch (WorkflowNotFoundException e) {
48+
// In some cases with cancellations happening, workflow could be completed by now
49+
// We just ignore and exit out of loop
50+
break;
51+
}
52+
}
53+
}
4354

4455
// wait for workflow to complete
4556
String result = WorkflowStub.fromTyped(workflow).getResult(String.class);

0 commit comments

Comments
 (0)