Skip to content

Commit 73b70d7

Browse files
committed
impr: decoder code
fix: decoder not consuming resp in buffer
1 parent 426196a commit 73b70d7

File tree

1 file changed

+33
-41
lines changed

1 file changed

+33
-41
lines changed

src/decoder.h

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
#include "MsgPack.h"
1616
#include "transport.h"
1717
#include "rpclite_utils.h"
18+
#include "error.h"
1819

1920
using namespace RpcUtils::detail;
2021

2122
#define MIN_RPC_BYTES 4
22-
#define CHUNK_SIZE 32
2323

2424
template<size_t BufferSize = DECODER_BUFFER_SIZE>
2525
class RpcDecoder {
@@ -78,15 +78,19 @@ class RpcDecoder {
7878
// This should never happen
7979
error.code = PARSING_ERR;
8080
error.traceback = "Unexpected response type";
81-
crop_response(true);
81+
consume(_response_size, _response_offset);
82+
if (_response_offset == 0) reset_packet();
83+
_discarded_packets++;
8284
return true;
8385
}
8486

8587
if (resp_size.size() != RESPONSE_SIZE) {
8688
// This should never happen
8789
error.code = PARSING_ERR;
8890
error.traceback = "Unexpected RPC response size";
89-
crop_response(true);
91+
consume(_response_size, _response_offset);
92+
if (_response_offset == 0) reset_packet();
93+
_discarded_packets++;
9094
return true;
9195
}
9296

@@ -95,19 +99,25 @@ class RpcDecoder {
9599
if (!unpacker.deserialize(nil, result)) {
96100
error.code = PARSING_ERR;
97101
error.traceback = "Result not parsable (check type)";
98-
crop_response(true);
102+
consume(_response_size, _response_offset);
103+
if (_response_offset == 0) reset_packet();
104+
_discarded_packets++;
99105
return true;
100106
}
101107
} else { // RPC returned an error
102108
if (!unpacker.deserialize(error, nil)) {
103109
error.code = PARSING_ERR;
104110
error.traceback = "RPC Error not parsable (check type)";
105-
crop_response(true);
111+
consume(_response_size, _response_offset);
112+
if (_response_offset == 0) reset_packet();
113+
_discarded_packets++;
106114
return true;
107115
}
108116
}
109117

110-
crop_response(false);
118+
if (_response_offset == 0) reset_packet();
119+
consume(_response_size, _response_offset);
120+
111121
return true;
112122
}
113123

@@ -190,10 +200,8 @@ class RpcDecoder {
190200
size_t offset = 0;
191201

192202
if (packet_incoming()) {
193-
if (response_queued()) {
194-
return;
195-
}
196-
offset = _response_offset;
203+
if (response_queued()) return; // parsing complete
204+
offset = _response_offset; // looking for a RESP
197205
}
198206

199207
size_t bytes_checked = 0;
@@ -220,24 +228,16 @@ class RpcDecoder {
220228
break; // Not a valid RPC format
221229
}
222230

223-
if (offset == 0) { // that's the first packet
231+
if (offset == 0) {
224232
_packet_type = type;
225233
_packet_size = bytes_checked;
226-
if (type == RESP_MSG) { // and it is for a client
227-
_response_offset = 0;
228-
_response_size = bytes_checked;
229-
} else if (!response_queued()) {
230-
_response_offset = bytes_checked;
231-
_response_size = 0;
232-
}
234+
}
235+
236+
if (type == RESP_MSG) {
237+
_response_offset = offset;
238+
_response_size = bytes_checked; // response queued
233239
} else {
234-
if (type == RESP_MSG) { // we have a response packet in the queue
235-
_response_offset = offset;
236-
_response_size = bytes_checked;
237-
} else { // look further
238-
_response_offset = offset + bytes_checked;
239-
_response_size = 0;
240-
}
240+
_response_offset = offset + bytes_checked;
241241
}
242242

243243
break;
@@ -250,7 +250,7 @@ class RpcDecoder {
250250
bool packet_incoming() const { return _packet_size >= MIN_RPC_BYTES; }
251251

252252
bool response_queued() const {
253-
return (_response_offset < _bytes_stored) && (_response_size > 0);
253+
return _response_size > 0;
254254
}
255255

256256
int packet_type() const { return _packet_type; }
@@ -303,23 +303,9 @@ class RpcDecoder {
303303
}
304304

305305
reset_packet();
306-
if (_response_offset >= packet_size) {
307-
_response_offset -= packet_size;
308-
}
309306
return consume(packet_size);
310307
}
311308

312-
void crop_response(bool discard) {
313-
consume(_response_size, _response_offset);
314-
if (_response_offset==0) { // the response was in the first position
315-
reset_packet();
316-
}
317-
reset_response();
318-
if (discard) {
319-
_discarded_packets++;
320-
}
321-
}
322-
323309
void discard() {
324310
consume(_packet_size);
325311
reset_packet();
@@ -332,7 +318,7 @@ class RpcDecoder {
332318
}
333319

334320
void reset_response() {
335-
_response_offset = _bytes_stored;
321+
_response_offset = 0;
336322
_response_size = 0;
337323
}
338324

@@ -345,6 +331,12 @@ class RpcDecoder {
345331
_raw_buffer[i] = _raw_buffer[i+size];
346332
}
347333

334+
if (_response_offset >= offset + size) {
335+
_response_offset -= size;
336+
} else {
337+
reset_response();
338+
}
339+
348340
_bytes_stored = remaining_bytes;
349341
return size;
350342
}

0 commit comments

Comments
 (0)