Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 34 additions & 6 deletions lib/aligned-lockable-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ export interface AlignedLockableBuffer extends Buffer {
lock: () => Promise<() => void>;
rlock: () => Promise<() => void>;
slice: (start?: number, end?: number) => AlignedLockableBuffer;
refCount: number;
addRef: () => void;
release: () => void;
}

function alignedLockableBufferSlice(
Expand All @@ -28,6 +31,15 @@ function attachMutex(
buffer.lock = lock;
buffer.rlock = rlock;
buffer.slice = alignedLockableBufferSlice;
buffer.refCount = 0;
buffer.addRef = () => {
buffer.refCount++;
};
buffer.release = () => {
if (buffer.refCount > 0) {
buffer.refCount--;
}
};
return buffer;
}

Expand Down Expand Up @@ -63,12 +75,28 @@ export class AlignedReadableState {
}

public getCurrentBuffer(): AlignedLockableBuffer {
let buffer = this.buffers[this.currentBufferIndex];
if (buffer === undefined) {
buffer = createBuffer(this.bufferSize, this.alignment);
this.buffers[this.currentBufferIndex] = buffer;
// Find a buffer with refCount === 0 (safe to reuse)
for (let i = 0; i < this.numBuffers; i++) {
const bufferIndex = (this.currentBufferIndex + i) % this.numBuffers;
let buffer = this.buffers[bufferIndex];

if (buffer === undefined) {
// Create new buffer
buffer = createBuffer(this.bufferSize, this.alignment);
this.buffers[bufferIndex] = buffer;
}

if (buffer.refCount === 0) {
// Found available buffer
this.currentBufferIndex = (bufferIndex + 1) % this.numBuffers;
buffer.addRef(); // Increment reference count
return buffer;
}
}
this.currentBufferIndex = (this.currentBufferIndex + 1) % this.numBuffers;
return buffer;

// All buffers are in use, create a new one (expand pool)
const newBuffer = createBuffer(this.bufferSize, this.alignment);
newBuffer.addRef();
return newBuffer;
}
}
8 changes: 8 additions & 0 deletions lib/block-read-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,16 @@ export class BlockReadStream extends Readable {
this.bytesRead += bytesRead;
if (bytesRead !== 0) {
this.push(buffer.slice(0, bytesRead));
// Release buffer reference after slice is pushed downstream
if (isAlignedLockableBuffer(buffer)) {
buffer.release();
}
} else {
this.push(null);
// Release buffer reference even for empty read
if (isAlignedLockableBuffer(buffer)) {
buffer.release();
}
}
} catch (error) {
this.emit('error', error);
Expand Down
Loading