Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,11 @@ public synchronized CompletableFuture<AutomqGetPartitionSnapshotResponse> snapsh
} else {
collectPartitionSnapshotsCf = CompletableFuture.completedFuture(null);
}
boolean newSession = finalSessionEpoch == 1;
return collectPartitionSnapshotsCf
.thenApply(nil -> {
if (requestVersion > ZERO_ZONE_V0_REQUEST_VERSION) {
if (finalSessionEpoch == 1) {
if (newSession) {
// return the WAL config in the session first response
resp.setConfirmWalConfig(confirmWalConfig);
}
Expand All @@ -187,7 +188,12 @@ public synchronized CompletableFuture<AutomqGetPartitionSnapshotResponse> snapsh
if (requestCommit) {
// Commit after generating the snapshots.
// Then the snapshot-read partitions could read from snapshot-read cache or block cache.
CompletableFuture<Void> commitCf = confirmWAL.commit(0, false);
CompletableFuture<Void> commitCf = newSession ?
// The proxy node's first snapshot-read request needs to commit immediately to ensure the data could be read.
confirmWAL.commit(0, false)
// The proxy node's snapshot-read cache isn't enough to hold the 'uncommitted' data,
// so the proxy node request a commit to ensure the data could be read from block cache.
: confirmWAL.commit(1000, false);
inflightCommitCfSet.add(commitCf);
commitCf.whenComplete((rst, ex) -> inflightCommitCfSet.remove(commitCf));
}
Expand All @@ -203,7 +209,8 @@ public synchronized boolean expired() {
return time.milliseconds() - lastGetSnapshotsTimestamp > 60000;
}

private CompletableFuture<Void> collectPartitionSnapshots(short requestVersion, AutomqGetPartitionSnapshotResponseData resp) {
private CompletableFuture<Void> collectPartitionSnapshots(short requestVersion,
AutomqGetPartitionSnapshotResponseData resp) {
Map<Uuid, List<PartitionSnapshot>> topic2partitions = new HashMap<>();
List<CompletableFuture<Void>> completeCfList = COMPLETE_CF_LIST_LOCAL.get();
completeCfList.clear();
Expand Down Expand Up @@ -239,7 +246,8 @@ private CompletableFuture<Void> collectPartitionSnapshots(short requestVersion,
return retCf;
}

private PartitionSnapshot snapshot(short requestVersion, Partition partition, PartitionSnapshotVersion oldVersion,
private PartitionSnapshot snapshot(short requestVersion, Partition partition,
PartitionSnapshotVersion oldVersion,
PartitionSnapshotVersion newVersion, List<CompletableFuture<Void>> completeCfList) {
if (newVersion == null) {
// partition is closed
Expand Down
55 changes: 34 additions & 21 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -765,21 +765,41 @@ private boolean hasInflightForceUploadTask() {
private CompletableFuture<Void> lazyUpload(LazyCommit lazyCommit) {
lazyUploadQueue.add(lazyCommit);
backgroundExecutor.schedule(() -> {
if (!lazyCommit.cf.isDone()) {
forceUpload();
if (lazyUploadQueue.contains(lazyCommit)) {
// If the queue does not contain the lazyCommit, it means another commit has happened after this lazyCommit.
if (lazyCommit.lazyLingerMs == 0) {
// If the lazyLingerMs is 0, we need to force upload as soon as possible.
forceUpload();
} else {
uploadDeltaWAL();
}
}
}, lazyCommit.lazyLingerMs, TimeUnit.MILLISECONDS);
return lazyCommit.cf;
return lazyCommit.awaitTrim ? lazyCommit.trimCf : lazyCommit.commitCf;
}

private void completeLazyUpload(List<LazyCommit> tasks, Throwable ex) {
tasks.forEach(task -> {
if (ex != null) {
task.cf.completeExceptionally(ex);
} else {
task.cf.complete(null);
}
});
private void notifyLazyUpload(List<LazyCommit> tasks) {
CompletableFuture.allOf(inflightWALUploadTasks.stream().map(t -> t.cf).collect(Collectors.toList()).toArray(new CompletableFuture[0]))
.whenComplete((nil, ex) -> {
for (LazyCommit task : tasks) {
if (ex != null) {
task.commitCf.completeExceptionally(ex);
} else {
task.commitCf.complete(null);
}
}
});

CompletableFuture.allOf(inflightWALUploadTasks.stream().map(t -> t.trimCf).collect(Collectors.toList()).toArray(new CompletableFuture[0]))
.whenComplete((nil, ex) -> {
for (LazyCommit task : tasks) {
if (ex != null) {
task.trimCf.completeExceptionally(ex);
} else {
task.trimCf.complete(null);
}
}
});
}

private CompletableFuture<Void> forceUpload() {
Expand Down Expand Up @@ -875,15 +895,7 @@ CompletableFuture<Void> uploadDeltaWAL(long streamId, boolean force) {
}

// notify lazy upload tasks
CompletableFuture.allOf(inflightWALUploadTasks.stream().map(t -> t.cf).collect(Collectors.toList()).toArray(new CompletableFuture[0]))
.whenComplete((nil, ex) -> {
completeLazyUpload(lazyUploadTasks.stream().filter(t -> !t.awaitTrim).collect(Collectors.toList()), ex);
});

CompletableFuture.allOf(inflightWALUploadTasks.stream().map(t -> t.trimCf).collect(Collectors.toList()).toArray(new CompletableFuture[0]))
.whenComplete((nil, ex) -> {
completeLazyUpload(lazyUploadTasks.stream().filter(t -> t.awaitTrim).collect(Collectors.toList()), ex);
});
notifyLazyUpload(lazyUploadTasks);
return cf;
}

Expand Down Expand Up @@ -1087,7 +1099,8 @@ public RecoveryBlockResult(LogCache.LogCacheBlock cacheBlock, RuntimeException e
}

public static class LazyCommit {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final CompletableFuture<Void> trimCf = new CompletableFuture<>();
final CompletableFuture<Void> commitCf = new CompletableFuture<>();
final long lazyLingerMs;
final boolean awaitTrim;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,22 @@ public CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long s
forceConsume(size);
cf.complete(null);
} else {
boolean satisfied = false;
lock.lock();
try {
if (availableTokens.get() <= 0 || !queuedCallbacks.isEmpty()) {
queuedCallbacks.offer(new BucketItem(throttleStrategy, size, cf));
condition.signalAll();
} else {
reduceToken(size);
cf.complete(null);
satisfied = true;
}
} finally {
lock.unlock();
}
if (satisfied) {
cf.complete(null);
}
}
return cf;
}
Expand Down
Loading