Skip to content

Commit f0d403d

Browse files
authored
Merge pull request #142 from shelfio/implement-backpressure
2 parents 1381df9 + 1767d82 commit f0d403d

File tree

9 files changed

+256
-51
lines changed

9 files changed

+256
-51
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
"lint": "eslint . --ext .js,.ts,.json --fix --quiet",
2727
"lint:ci": "eslint . --ext .js,.ts,.json --quiet",
2828
"prepack": "yarn build",
29-
"test": "export ENVIRONMENT=local && jest src --runInBand",
29+
"test": "export ENVIRONMENT=local && jest src --runInBand --forceExit",
3030
"type-check": "tsc --noEmit",
3131
"type-check:watch": "npm run type-check -- --watch"
3232
},
@@ -56,7 +56,7 @@
5656
"@babel/cli": "7.17.6",
5757
"@babel/core": "7.17.5",
5858
"@shelf/babel-config": "0.1.8",
59-
"@shelf/eslint-config": "2.0.0",
59+
"@shelf/eslint-config": "2.16.0",
6060
"@shelf/jest-dynamodb": "2.1.0",
6161
"@shelf/prettier-config": "1.0.0",
6262
"@shelf/tsconfig": "0.0.6",

readme.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const {parallelScan} = require('@shelf/dynamodb-parallel-scan');
3434

3535
### Use as async generator (or streams)
3636

37-
Note: this stream doesn't implement backpressure mechanism just yet, so memory overflow could happen if you don't consume stream fast enough.
37+
Note: `highWaterMark` sets the item-count threshold at which the stream pauses fetching; even after it is reached, Parallel Scan may still fetch up to `concurrency` \* 1MB of additional data already in flight.
3838

3939
```js
4040
const {parallelScanAsStream} = require('@shelf/dynamodb-parallel-scan');
@@ -49,7 +49,7 @@ const {parallelScanAsStream} = require('@shelf/dynamodb-parallel-scan');
4949
},
5050
ProjectionExpression: 'fileSize',
5151
},
52-
{concurrency: 1000, chunkSize: 10000}
52+
{concurrency: 1000, chunkSize: 10000, highWaterMark: 10000}
5353
);
5454

5555
for await (const items of stream) {

src/blocker.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import {inspect} from 'util';
2+
import {Blocker} from './blocker';
3+
4+
it('should return same promise after sequent block() calls', () => {
5+
const blocker = new Blocker();
6+
7+
blocker.block();
8+
9+
const oldPromise = blocker.get();
10+
11+
blocker.block();
12+
13+
const newPromise = blocker.get();
14+
15+
expect(newPromise === oldPromise).toBeTruthy();
16+
});
17+
18+
it('should return pending promise after block()', () => {
19+
const blocker = new Blocker();
20+
21+
blocker.block();
22+
23+
const inspectedBlocker = inspect(blocker.get());
24+
const isInspectedBlockerPending = inspectedBlocker.includes('pending');
25+
26+
expect(isInspectedBlockerPending).toBeTruthy();
27+
});
28+
29+
it('should return resolved promise after unblock()', () => {
30+
const blocker = new Blocker();
31+
32+
blocker.unblock();
33+
34+
const inspectedBlocker = inspect(blocker.get());
35+
const isInspectedBlockerResolved = inspectedBlocker.includes('undefined');
36+
37+
expect(isInspectedBlockerResolved).toBeTruthy();
38+
});
39+
40+
it('should return resolved promise in default state', () => {
41+
const blocker = new Blocker();
42+
43+
const inspectedBlocker = inspect(blocker.get());
44+
const isInspectedBlockerResolved = inspectedBlocker.includes('undefined');
45+
46+
expect(isInspectedBlockerResolved).toBeTruthy();
47+
});
48+
49+
it('should be blocked after block() call', () => {
50+
const blocker = new Blocker();
51+
52+
blocker.block();
53+
54+
expect(blocker.isBlocked()).toBeTruthy();
55+
});
56+
57+
it('should not be blocked after unblock() call', () => {
58+
const blocker = new Blocker();
59+
60+
blocker.unblock();
61+
blocker.unblock();
62+
63+
expect(blocker.isBlocked()).toBeFalsy();
64+
});
65+
66+
it('should not be blocked in default state', () => {
67+
const blocker = new Blocker();
68+
69+
expect(blocker.isBlocked()).toBeFalsy();
70+
});

src/blocker.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
export class Blocker {
2+
private _promise: Promise<void>;
3+
private _promiseResolver: (...args: any[]) => any;
4+
private _isBlocked: boolean;
5+
6+
constructor() {
7+
this._promise = Promise.resolve();
8+
this._promiseResolver = () => {}; // eslint-disable-line @typescript-eslint/no-empty-function
9+
this._isBlocked = false;
10+
}
11+
12+
block(): void {
13+
if (this._isBlocked) {
14+
return;
15+
}
16+
17+
this._promise = new Promise(r => {
18+
this._promiseResolver = r;
19+
setTimeout(r, 2147483647);
20+
}); //TODO: Implement endless promise
21+
22+
this._isBlocked = true;
23+
}
24+
25+
unblock(): void {
26+
this._promiseResolver();
27+
28+
this._isBlocked = false;
29+
}
30+
31+
get(): Promise<void> {
32+
return this._promise;
33+
}
34+
35+
isBlocked(): boolean {
36+
return this._isBlocked;
37+
}
38+
}

src/ddb.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import DynamoDB from 'aws-sdk/clients/dynamodb';
2-
import {DocumentClient} from 'aws-sdk/lib/dynamodb/document_client';
3-
import {InsertManyParams} from './ddb.types';
2+
import type {DocumentClient} from 'aws-sdk/lib/dynamodb/document_client';
3+
import type {InsertManyParams} from './ddb.types';
44

55
const isTest = process.env.JEST_WORKER_ID;
66
const config = {
@@ -27,11 +27,13 @@ export function insertMany({
2727
}: InsertManyParams): Promise<DocumentClient.BatchWriteItemOutput> {
2828
const params: DocumentClient.BatchWriteItemInput = {
2929
RequestItems: {
30-
[tableName]: items.map(item => ({
31-
PutRequest: {
32-
Item: item,
33-
},
34-
})),
30+
[tableName]: items.map(item => {
31+
return {
32+
PutRequest: {
33+
Item: item,
34+
},
35+
};
36+
}),
3537
},
3638
};
3739

src/parallel-scan-stream.test.ts

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,44 @@
1-
import {insertMany} from './ddb';
1+
jest.setTimeout(25000);
2+
3+
import {uniq} from 'lodash';
4+
import * as ddbHelpers from './ddb';
25
import {parallelScanAsStream} from './parallel-scan-stream';
36

7+
async function delay(ms: number) {
8+
return new Promise(r => {
9+
setTimeout(r, ms);
10+
});
11+
}
12+
413
describe('parallelScanAsStream', () => {
514
const files = [
6-
{id: 'some-file-id-1'},
7-
{id: 'some-file-id-2'},
8-
{id: 'some-file-id-3', fileSize: 100},
9-
{id: 'some-file-id-4'},
10-
{id: 'some-file-id-5'},
11-
{id: 'some-file-id-6', fileSize: 200},
12-
{id: 'some-file-id-7'},
13-
{id: 'some-file-id-8'},
14-
{id: 'some-file-id-9', fileSize: 300},
15-
{id: 'some-file-id-10'},
15+
{id: 'some-file-id-1', isLarge: false},
16+
{id: 'some-file-id-2', isLarge: false},
17+
{id: 'some-file-id-3', fileSize: 100, isLarge: false},
18+
{id: 'some-file-id-4', isLarge: false},
19+
{id: 'some-file-id-5', isLarge: false},
20+
{id: 'some-file-id-6', fileSize: 200, isLarge: false},
21+
{id: 'some-file-id-7', isLarge: false},
22+
{id: 'some-file-id-8', isLarge: false},
23+
{id: 'some-file-id-9', fileSize: 300, isLarge: false},
24+
{id: 'some-file-id-10', isLarge: false},
1625
];
1726

1827
beforeAll(async () => {
19-
await insertMany({items: files, tableName: 'files'});
28+
await ddbHelpers.insertMany({items: files, tableName: 'files'});
2029
});
2130

2231
it('should stream items with chunks of 2 with concurrency 1', async () => {
2332
const stream = await parallelScanAsStream(
2433
{
2534
TableName: 'files',
26-
FilterExpression: 'attribute_exists(#id)',
35+
FilterExpression: 'attribute_exists(#id) and #isLarge = :false',
2736
ExpressionAttributeNames: {
2837
'#id': 'id',
38+
'#isLarge': 'isLarge',
39+
},
40+
ExpressionAttributeValues: {
41+
':false': false,
2942
},
3043
},
3144
{concurrency: 1, chunkSize: 2}
@@ -40,9 +53,13 @@ describe('parallelScanAsStream', () => {
4053
const stream = await parallelScanAsStream(
4154
{
4255
TableName: 'files',
43-
FilterExpression: 'attribute_exists(#id)',
56+
FilterExpression: 'attribute_exists(#id) and #isLarge = :false',
4457
ExpressionAttributeNames: {
4558
'#id': 'id',
59+
'#isLarge': 'isLarge',
60+
},
61+
ExpressionAttributeValues: {
62+
':false': false,
4663
},
4764
},
4865
{concurrency: 5, chunkSize: 2}
@@ -58,4 +75,50 @@ describe('parallelScanAsStream', () => {
5875

5976
expect(allItems).toHaveLength(10);
6077
});
78+
79+
it('should pause calling dynamodb after highWaterMark reached', async () => {
80+
const scanSpy = jest.spyOn(ddbHelpers, 'scan');
81+
82+
const megaByte = Buffer.alloc(1024 * 390); // Maximum allowed item size in ddb is 400KB
83+
const megaByteString = megaByte.toString();
84+
85+
await ddbHelpers.insertMany({
86+
items: [
87+
{id: 'some-big-file-id-1', isLarge: true, payload: megaByteString},
88+
{id: 'some-big-file-id-2', isLarge: true, payload: megaByteString},
89+
{id: 'some-big-file-id-3', isLarge: true, payload: megaByteString},
90+
{id: 'some-big-file-id-4', isLarge: true, payload: megaByteString},
91+
{id: 'some-big-file-id-5', isLarge: true, payload: megaByteString},
92+
],
93+
tableName: 'files',
94+
});
95+
96+
const stream = await parallelScanAsStream(
97+
{
98+
TableName: 'files',
99+
FilterExpression: 'attribute_exists(#id) and #isLarge = :true',
100+
ExpressionAttributeNames: {
101+
'#id': 'id',
102+
'#isLarge': 'isLarge',
103+
},
104+
ExpressionAttributeValues: {
105+
':true': true,
106+
},
107+
},
108+
{concurrency: 1, chunkSize: 1, highWaterMark: 1}
109+
);
110+
111+
const scanCallsByIteration = [];
112+
for await (const _ of stream) {
113+
expect(_).not.toBeUndefined();
114+
115+
await delay(1000);
116+
117+
scanCallsByIteration.push(scanSpy.mock.calls.length);
118+
}
119+
120+
const scanCallsByIterationUniq = uniq(scanCallsByIteration);
121+
122+
expect(scanCallsByIterationUniq).toEqual([1, 2]);
123+
});
61124
});

0 commit comments

Comments
 (0)