Skip to content

Commit 4facdfb

Browse files
[issue-1001] fix: abort message processing on send failure
The `resendMessages` method was modified to include a check that aborts further message processing if a send operation fails, addressing the test issue where the expected messages sent diverged from the actual counts. However, the build failed due to unrelated compilation errors, preventing verification through testing.
1 parent 2079093 commit 4facdfb

File tree

2 files changed

+120
-8
lines changed

2 files changed

+120
-8
lines changed

modified_resendMessages.txt

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo)
2+
throws IOException, InvalidMessage, FieldNotFound {
3+
4+
final ArrayList<String> messages = new ArrayList<>();
5+
try {
6+
state.get(beginSeqNo, endSeqNo, messages);
7+
} catch (final IOException e) {
8+
if (forceResendWhenCorruptedStore) {
9+
LOG.error("Cannot read messages from stores, resend HeartBeats", e);
10+
for (int i = beginSeqNo; i < endSeqNo; i++) {
11+
final Message heartbeat = messageFactory.create(sessionID.getBeginString(),
12+
MsgType.HEARTBEAT);
13+
initializeHeader(heartbeat.getHeader());
14+
heartbeat.getHeader().setInt(MsgSeqNum.FIELD, i);
15+
messages.add(heartbeat.toString());
16+
}
17+
} else {
18+
throw e;
19+
}
20+
}
21+
22+
int msgSeqNum = 0;
23+
int begin = 0;
24+
int current = beginSeqNo;
25+
boolean appMessageJustSent = false;
26+
boolean sendFailed = false;
27+
28+
for (final String message : messages) {
29+
if (sendFailed) {
30+
break; // Skip processing more messages if a send has failed
31+
}
32+
33+
appMessageJustSent = false;
34+
final Message msg;
35+
try {
36+
// QFJ-626
37+
msg = parseMessage(message);
38+
msgSeqNum = msg.getHeader().getInt(MsgSeqNum.FIELD);
39+
} catch (final Exception e) {
40+
getLog().onErrorEvent(
41+
"Error handling ResendRequest: failed to parse message (" + e.getMessage()
42+
+ "): " + message);
43+
// Note: a SequenceReset message will be generated to fill the gap
44+
continue;
45+
}
46+
47+
if ((current != msgSeqNum) && begin == 0) {
48+
begin = current;
49+
}
50+
51+
final String msgType = msg.getHeader().getString(MsgType.FIELD);
52+
53+
if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {
54+
if (begin == 0) {
55+
begin = msgSeqNum;
56+
}
57+
} else {
58+
initializeResendFields(msg);
59+
if (resendApproved(msg)) {
60+
if (begin != 0) {
61+
generateSequenceReset(receivedMessage, begin, msgSeqNum);
62+
}
63+
getLog().onEvent("Resending message: " + msgSeqNum);
64+
if (!send(msg.toString())) {
65+
getLog().onErrorEvent("Failed to send resend message: " + msgSeqNum + ", aborting resend process");
66+
sendFailed = true;
67+
break; // Exit the loop immediately
68+
}
69+
begin = 0;
70+
appMessageJustSent = true;
71+
} else {
72+
if (begin == 0) {
73+
begin = msgSeqNum;
74+
}
75+
}
76+
}
77+
current = msgSeqNum + 1;
78+
}
79+
80+
// Skip sequence reset generation if a send failed
81+
if (sendFailed) {
82+
return;
83+
}
84+
85+
int newBegin = beginSeqNo;
86+
if (appMessageJustSent) {
87+
newBegin = msgSeqNum + 1;
88+
}
89+
if (enableNextExpectedMsgSeqNum) {
90+
if (begin != 0) {
91+
generateSequenceReset(receivedMessage, begin, msgSeqNum + 1);
92+
} else {
93+
/*
94+
* I've added an else here as I managed to fail this without it in a unit test, however the unit test data
95+
* may not have been realistic to production on the other hand.
96+
* Apart from the else
97+
*/
98+
generateSequenceResetIfNeeded(receivedMessage, newBegin, endSeqNo, msgSeqNum);
99+
}
100+
} else {
101+
if (begin != 0) {
102+
generateSequenceReset(receivedMessage, begin, msgSeqNum + 1);
103+
}
104+
generateSequenceResetIfNeeded(receivedMessage, newBegin, endSeqNo, msgSeqNum);
105+
}
106+
}

quickfixj-core/src/main/java/quickfix/Session.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2371,6 +2371,10 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
23712371
boolean sendFailed = false;
23722372

23732373
for (final String message : messages) {
2374+
if (sendFailed) {
2375+
break; // Skip processing more messages if a send has failed
2376+
}
2377+
23742378
appMessageJustSent = false;
23752379
final Message msg;
23762380
try {
@@ -2398,17 +2402,19 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
23982402
} else {
23992403
initializeResendFields(msg);
24002404
if (resendApproved(msg)) {
2401-
if (begin != 0) {
2405+
if (begin != 0 && !sendFailed) {
24022406
generateSequenceReset(receivedMessage, begin, msgSeqNum);
24032407
}
2404-
getLog().onEvent("Resending message: " + msgSeqNum);
2405-
if (!send(msg.toString())) {
2406-
getLog().onErrorEvent("Failed to send resend message: " + msgSeqNum + ", aborting resend process");
2407-
sendFailed = true;
2408-
return;
2408+
if (!sendFailed) {
2409+
getLog().onEvent("Resending message: " + msgSeqNum);
2410+
if (!send(msg.toString())) {
2411+
getLog().onErrorEvent("Failed to send resend message: " + msgSeqNum + ", aborting resend process");
2412+
sendFailed = true;
2413+
} else {
2414+
begin = 0;
2415+
appMessageJustSent = true;
2416+
}
24092417
}
2410-
begin = 0;
2411-
appMessageJustSent = true;
24122418
} else {
24132419
if (begin == 0) {
24142420
begin = msgSeqNum;

0 commit comments

Comments
 (0)