Parallelize graph writes #542
Conversation
```java
buffer.clear();
var writer = new ByteBufferIndexWriter(buffer);
```
The ownership model and lifecycle for the buffer here is a bit ambiguous to me, especially because the buffer is passed into this class as a parameter. I wonder if we can make the ByteBufferIndexWriter hide some of the implementation details so that we do not have a buffer.clear() and the buffer management logic at the end of this method. Instead, we could make the ByteBufferIndexWriter manage all of that, by adding a clone and a reset method, perhaps?
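For illustration, a writer that owns its buffer lifecycle might look something like this (a rough sketch; the cloneBuffer/reset names just follow the suggestion above, not the actual jvector API):

```java
import java.nio.ByteBuffer;

// Sketch only: illustrates the suggested ownership model, not the actual API.
class ByteBufferIndexWriter {
    private final ByteBuffer buffer;

    ByteBufferIndexWriter(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity); // the writer owns its buffer
    }

    /** Returns an independent copy of the bytes written so far. */
    ByteBuffer cloneBuffer() {
        ByteBuffer dup = buffer.duplicate();
        dup.flip(); // limit = bytes written, position = 0
        ByteBuffer copy = ByteBuffer.allocate(dup.remaining());
        copy.put(dup);
        copy.flip();
        return copy;
    }

    /** Discards written data and prepares the writer for the next record. */
    void reset() {
        buffer.clear();
    }
}
```

With this shape, callers never touch `buffer.clear()` or the raw buffer directly.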
Updated ByteBufferIndexWriter with CloneBuffer (named to avoid confusion with Object.clone()) and updated reset to also clear the buffer. Moved that logic out of NodeRecordTask to clarify ownership in the latest commit.
```java
for (int newOrdinal = 0; newOrdinal <= maxOrdinal; newOrdinal++) {
    final int ordinal = newOrdinal;
    final long fileOffset = baseOffset + (long) ordinal * recordSize;

    Future<NodeRecordTask.Result> future = executor.submit(() -> {
        var view = viewPerThread.get();
        var buffer = bufferPerThread.get();

        var task = new NodeRecordTask(
                ordinal,
                ordinalMapper,
                graph,
                view,
                inlineFeatures,
                featureStateSuppliers,
                recordSize,
                fileOffset,
                buffer
        );

        return task.call();
    });

    futures.add(future);
}
```
IIUC, this doesn't have any back pressure mechanism, and we need that to prevent excessive memory utilization. I think we might want to consider an implementation that uses semaphores to manage concurrency.
In the latest commit I've added backpressure mechanisms to both this loop and the file channel write loop. I'm not entirely clear on what solution you had in mind vis-à-vis semaphores, but I'm open to re-implementing in another fashion if you feel the code could be improved.
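For context, a common semaphore-based backpressure pattern looks roughly like this (a sketch; maxInFlight, buildRecord, and the other names are assumptions, not code from this PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

class SubmitWithBackpressure {
    // Caps the number of outstanding record-build tasks at maxInFlight.
    static List<Future<byte[]>> submitAll(ExecutorService executor,
                                          int maxOrdinal,
                                          int maxInFlight) throws InterruptedException {
        Semaphore inFlight = new Semaphore(maxInFlight);
        List<Future<byte[]>> futures = new ArrayList<>();
        for (int ordinal = 0; ordinal <= maxOrdinal; ordinal++) {
            final int o = ordinal;
            inFlight.acquire(); // blocks once maxInFlight tasks are outstanding
            futures.add(executor.submit(() -> {
                try {
                    return buildRecord(o); // stands in for the NodeRecordTask body
                } finally {
                    inFlight.release(); // frees a slot as soon as the task finishes
                }
            }));
        }
        return futures;
    }

    static byte[] buildRecord(int ordinal) {
        return new byte[0]; // placeholder for the real record-building work
    }
}
```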
```java
// result.data is already a copy made in NodeRecordTask to avoid
// race conditions with thread-local buffer reuse
afc.write(result.data, result.fileOffset).get();
```
The .get() here negates the benefit of the async file channel.
We could use a semaphore to limit the number of write tasks in flight.
This section has been rewritten to remove the get() call at this point in the code and to split the write process into sub-tasks. Again, I'm happy to go with a different implementation if the code can be improved, but I'm not sure exactly what you had in mind.
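One possible reading of the suggestion, for illustration: pair a CompletionHandler with a semaphore so the channel stays async while in-flight writes are capped (a sketch; none of these names come from the PR, and partial-write handling is omitted):

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Semaphore;

class AsyncWriteSketch {
    // Submits a write without blocking on .get(); the semaphore caps the
    // number of writes in flight.
    static void writeAsync(AsynchronousFileChannel afc, ByteBuffer data,
                           long fileOffset, Semaphore inFlightWrites)
            throws InterruptedException {
        inFlightWrites.acquire(); // backpressure: wait for a free slot
        afc.write(data, fileOffset, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer bytesWritten, Void attachment) {
                inFlightWrites.release(); // slot freed when the write lands
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                inFlightWrites.release();
                // real code would surface exc to the caller
            }
        });
    }
}
```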
Pull Request Overview
This PR introduces parallel write capability for graph indexes to improve write throughput. The implementation maintains backward compatibility by defaulting to sequential writes unless explicitly enabled via withParallelWrites(true). Testing shows linear performance scaling with dataset size, with speedups consistent across different hardware configurations.
Key changes:
- Added `ParallelGraphWriter` class that orchestrates parallel L0 record building using thread pools and asynchronous file I/O
- Introduced `ByteBufferIndexWriter` for in-memory record construction before bulk disk writes
- Added `withParallelWrites(boolean)` builder option to `OnDiskGraphIndexWriter.Builder`
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Summary per file:
| File | Description |
|---|---|
| ParallelWriteExample.java | Example demonstrating parallel vs sequential write usage patterns and benchmark comparison |
| Grid.java | Enables parallel writes in the production Grid.buildOnDisk method |
| ParallelGraphWriter.java | Core parallel writer implementation with thread pooling, memory-aware backpressure, and async I/O |
| OnDiskGraphIndexWriter.java | Updated to support optional parallel write mode via builder configuration |
| NodeRecordTask.java | Task implementation for building individual node records in parallel worker threads |
| GraphIndexWriterTypes.java | New enum defining available writer types (sequential vs parallel) |
| GraphIndexWriter.java | Added factory methods for creating appropriate writer builders based on type |
| ByteBufferIndexWriter.java | IndexWriter implementation for writing to ByteBuffers in memory |
@MarkWolters this is a very cool change! re:
Did you mean parallel writes will only be used when the withParallelWrites(true) option is set?
Added more comments, primarily around two aspects:
- Buffer size calculation - it seems like we need to consider alignment and hardware parallelism to reduce buffer sizes.
- Batching, GC, and malloc overhead - we appear to be creating a task per ordinal, which could be somewhat expensive in memory allocation and GC overhead.
```java
/**
 * This task is designed to be executed in a thread pool, with each worker thread
 * owning its own ImmutableGraphIndex.View for thread-safe neighbor iteration.
 */
class NodeRecordTask implements Callable<NodeRecordTask.Result> {
```
What would be the memory impact of creating this object for each write operation?
That depends on dataset size and vector dimensionality. But the number of NodeRecordTasks in existence at any time is bounded by a buffer that gets sized based on available memory as a back pressure mechanism.
```java
byte[] bytes = s.getBytes("UTF-8");
int utflen = bytes.length;

if (utflen > 65535) {
```
Why this number? 65535
The UTF format stores the string length as a 2-byte (16-bit) unsigned integer prefix, which has a maximum value of 65535.
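A minimal illustration of where that limit comes from (not the actual writer code, and using standard UTF-8 rather than Java's modified UTF-8):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class UtfPrefixSketch {
    static byte[] encode(String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > 65535) {
            // 0xFFFF is the largest value a 16-bit unsigned prefix can hold
            throw new IOException("encoded string too long: " + bytes.length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        data.writeShort(bytes.length); // the 2-byte unsigned length prefix
        data.write(bytes);
        return out.toByteArray();
    }
}
```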
```java
@Override
public Result call() throws Exception {
    // Writer automatically clears buffer on construction
    var writer = new ByteBufferIndexWriter(buffer);
```
Why not make the ByteBufferIndexWriter thread local and avoid recreating it on every call?
I don't think there's a significant difference. The constructor is trivially cheap; it just stores the buffer reference and initial position, and the real cost (the ByteBuffer allocation) is already thread-local. ByteBufferIndexWriter itself is tiny, only 2 fields, so allocation cost is negligible.
```java
/**
 * @return the number of records to buffer before writing
 */
private int calculateOptimalBufferSize() {
```
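The fragment above shows only the signature; a memory-aware calculation might look something like this (purely illustrative, not the PR's actual formula; recordSize is assumed to be an enclosing-class field):

```java
// Purely illustrative sketch: sizes the batch from available heap and
// thread count. This is not the PR's actual formula.
private int calculateOptimalBufferSize() {
    long maxHeap = Runtime.getRuntime().maxMemory();
    long budget = maxHeap / 4;                        // spend at most 1/4 of the heap
    long records = budget / Math.max(1, recordSize);  // recordSize: bytes per record
    int threads = Runtime.getRuntime().availableProcessors();
    // keep at least one record per worker in flight, but cap the batch
    return (int) Math.max(threads, Math.min(records, 1 << 16));
}
```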
Should we take graph properties into this calculation to consider alignment with device blocks?
Also, if we know the supported parallelism of the hardware, we could leverage that as well to reduce the buffer size in the calculation and achieve the minimal queue depth.
> Should we take graph properties to this calculation to consider alignment with device blocks?

I'm afraid I don't really understand what you are asking here, can you point me to an example of what you are referring to, or go into a little more detail?
```java
final long fileOffset = baseOffset + (long) ordinal * recordSize;

// Submit task to build this record
Future<NodeRecordTask.Result> future = executor.submit(() -> {
```
This will create a new future object for each ordinal? Should we batch those to prevent GC and memory allocation overhead?
It is batched. The buffer is checked and, if full, written and cleared on lines 190-193:

```java
if (futures.size() >= bufferSize) {
    writeRecordsAsync(futures);
    futures.clear();
}
```
This PR adds the option to specify that graph indexes should be written in parallel rather than sequentially. By default the existing sequential write behavior is maintained; parallel writes will only be used when the withParallelWrites(true) option is set through the OnDiskGraphIndexWriter.Builder class. Testing results below show the speedup achieved in the write phase across a number of cores. These gains appear to scale linearly with respect to dataset size (i.e. writing a dataset of 10 million records takes about 10x as long as a dataset of 1 million records, but the parallel vs. sequential speedup is roughly equal).
ETA: Testing also showed that the performance gains using a "prod-like" i3.4xlarge with 16 vCPUs and 8 disks striped in RAID 0 were roughly equivalent to the performance gains using a 64-vCPU m5.16xlarge with standard SSD.
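For reference, opting in looks roughly like this (a sketch: only withParallelWrites(true) and the OnDiskGraphIndexWriter.Builder class come from this PR; the constructor arguments and the write(...) call are assumed for illustration):

```java
// Sketch of enabling parallel writes; graph, outputPath, and
// featureStateSuppliers are assumed to be set up elsewhere.
try (var writer = new OnDiskGraphIndexWriter.Builder(graph, outputPath)
        .withParallelWrites(true)   // defaults to false (sequential writes)
        .build()) {
    writer.write(featureStateSuppliers);
}
```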