Skip to content

Commit 68c10ee

Browse files
authored
[feat][broker][PIP-195] Add metrics for bucket delayed message tracker (apache#19716)
1 parent 55523ac commit 68c10ee

File tree

21 files changed

+532
-57
lines changed

21 files changed

+532
-57
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
366366
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;
367367

368368
@FieldContext(category = CATEGORY_SERVER, doc = """
369-
The max number of delayed message index in per bucket snapshot segment, -1 means no limitation\
369+
The max number of delayed message index in per bucket snapshot segment, -1 means no limitation, \
370370
after reaching the max number limitation, the snapshot segment will be cut off.""")
371371
private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 5000;
372372

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,6 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
6767
*/
6868
boolean shouldPauseAllDeliveries();
6969

70-
/**
71-
* Tells whether this DelayedDeliveryTracker contains this message index,
72-
* if the tracker is not supported it or disabled this feature also will return false.
73-
*/
74-
boolean containsMessage(long ledgerId, long entryId);
75-
7670
/**
7771
* Reset the tick time using the zk policies cache.
7872
* @param tickTime

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead
178178
&& !hasMessageAvailable();
179179
}
180180

181-
@Override
182-
public boolean containsMessage(long ledgerId, long entryId) {
183-
return false;
184-
}
185-
186181
protected long nextDeliveryTime() {
187182
return priorityQueue.peekN1();
188183
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,21 @@ public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMet
6767
@Override
6868
public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
6969
return openLedger(bucketId).thenCompose(
70-
ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
70+
ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
7171
thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
7272
}
7373

7474
@Override
7575
public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
7676
long lastSegmentEntryId) {
7777
return openLedger(bucketId).thenCompose(
78-
ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
78+
ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId,
7979
lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
8080
}
8181

8282
@Override
8383
public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
84-
return openLedger(bucketId).thenApply(ledgerHandle -> {
85-
long length = ledgerHandle.getLength();
86-
closeLedger(ledgerHandle);
87-
return length;
88-
});
84+
return openLedger(bucketId).thenApply(LedgerHandle::getLength);
8985
}
9086

9187
@Override
@@ -212,8 +208,8 @@ private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data)
212208
});
213209
}
214210

215-
CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(LedgerHandle ledger,
216-
long firstEntryId, long lastEntryId) {
211+
CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle ledger,
212+
long firstEntryId, long lastEntryId) {
217213
final CompletableFuture<Enumeration<LedgerEntry>> future = new CompletableFuture<>();
218214
ledger.asyncReadEntries(firstEntryId, lastEntryId,
219215
(rc, handle, entries, ctx) -> {
@@ -222,7 +218,6 @@ CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(Ledger
222218
} else {
223219
future.complete(entries);
224220
}
225-
closeLedger(handle);
226221
}, null
227222
);
228223
return future;

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
5858
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
5959
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
60+
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
6061
import org.apache.pulsar.common.util.FutureUtil;
6162
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
6263
import org.roaringbitmap.RoaringBitmap;
@@ -69,6 +70,10 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
6970

7071
static final int AsyncOperationTimeoutSeconds = 60;
7172

73+
private static final Long INVALID_BUCKET_ID = -1L;
74+
75+
private static final int MAX_MERGE_NUM = 4;
76+
7277
private final long minIndexCountPerBucket;
7378

7479
private final long timeStepPerBucketSnapshotSegmentInMillis;
@@ -93,9 +98,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
9398

9499
private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable;
95100

96-
private static final Long INVALID_BUCKET_ID = -1L;
97-
98-
private static final int MAX_MERGE_NUM = 4;
101+
private final BucketDelayedMessageIndexStats stats;
99102

100103
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
101104
Timer timer, long tickTimeMillis,
@@ -125,6 +128,7 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
125128
this.lastMutableBucket =
126129
new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(),
127130
bucketSnapshotStorage);
131+
this.stats = new BucketDelayedMessageIndexStats();
128132
this.numberDelayedMessages = recoverBucketSnapshot();
129133
}
130134

@@ -161,8 +165,9 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
161165
}
162166

163167
try {
164-
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
168+
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS);
165169
} catch (InterruptedException | ExecutionException | TimeoutException e) {
170+
log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e);
166171
if (e instanceof InterruptedException) {
167172
Thread.currentThread().interrupt();
168173
}
@@ -193,7 +198,7 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
193198
ImmutableBucket immutableBucket = mapEntry.getValue();
194199
immutableBucketMap.remove(key);
195200
// delete asynchronously without waiting for completion
196-
immutableBucket.asyncDeleteBucketSnapshot();
201+
immutableBucket.asyncDeleteBucketSnapshot(stats);
197202
}
198203

199204
MutableLong numberDelayedMessages = new MutableLong(0);
@@ -246,7 +251,8 @@ private Optional<ImmutableBucket> findImmutableBucket(long ledgerId) {
246251
return Optional.ofNullable(immutableBuckets.get(ledgerId));
247252
}
248253

249-
private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair) {
254+
private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair,
255+
long startTime) {
250256
if (immutableBucketDelayedIndexPair != null) {
251257
ImmutableBucket immutableBucket = immutableBucketDelayedIndexPair.getLeft();
252258
immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId),
@@ -260,14 +266,19 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
260266
CompletableFuture<Long> future = createFuture.handle((bucketId, ex) -> {
261267
if (ex == null) {
262268
immutableBucket.setSnapshotSegments(null);
269+
immutableBucket.asyncUpdateSnapshotLength();
263270
log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
264271
immutableBucket.bucketKey());
272+
273+
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
274+
System.currentTimeMillis() - startTime);
275+
265276
return bucketId;
266277
}
267278

268-
//TODO Record create snapshot failed
269-
log.error("[{}] Failed to create bucket snapshot, bucketKey: {}",
270-
dispatcher.getName(), immutableBucket.bucketKey(), ex);
279+
log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", dispatcher.getName(),
280+
immutableBucket.bucketKey(), ex);
281+
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);
271282

272283
// Put indexes back into the shared queue and downgrade to memory mode
273284
synchronized (BucketDelayedDeliveryTracker.this) {
@@ -311,12 +322,14 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
311322
if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
312323
&& lastMutableBucket.size() >= minIndexCountPerBucket
313324
&& !lastMutableBucket.isEmpty()) {
325+
long createStartTime = System.currentTimeMillis();
326+
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
314327
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
315328
lastMutableBucket.sealBucketAndAsyncPersistent(
316329
this.timeStepPerBucketSnapshotSegmentInMillis,
317330
this.maxIndexesPerBucketSnapshotSegment,
318331
this.sharedBucketPriorityQueue);
319-
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
332+
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
320333
lastMutableBucket.resetLastMutableBucketRange();
321334

322335
if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
@@ -374,7 +387,7 @@ private synchronized List<ImmutableBucket> selectMergedBuckets(final List<Immuta
374387
}
375388

376389
if (minIndex >= 0) {
377-
return values.subList(minIndex, minIndex + MAX_MERGE_NUM);
390+
return values.subList(minIndex, minIndex + mergeNum);
378391
} else if (mergeNum > 2){
379392
return selectMergedBuckets(values, mergeNum - 1);
380393
} else {
@@ -400,6 +413,9 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
400413
for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
401414
immutableBucket.merging = true;
402415
}
416+
417+
long mergeStartTime = System.currentTimeMillis();
418+
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
403419
return asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
404420
synchronized (this) {
405421
for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
@@ -409,9 +425,14 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
409425
if (ex != null) {
410426
log.error("[{}] Failed to merge bucket snapshot, bucketKeys: {}",
411427
dispatcher.getName(), bucketsStr, ex);
428+
429+
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
412430
} else {
413431
log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, bucketNum: {}",
414432
dispatcher.getName(), bucketsStr, immutableBuckets.asMapOfRanges().size());
433+
434+
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
435+
System.currentTimeMillis() - mergeStartTime);
415436
}
416437
});
417438
}
@@ -436,6 +457,8 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
436457
})
437458
.thenAccept(combinedDelayedIndexQueue -> {
438459
synchronized (BucketDelayedDeliveryTracker.this) {
460+
long createStartTime = System.currentTimeMillis();
461+
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
439462
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
440463
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
441464
timeStepPerBucketSnapshotSegmentInMillis,
@@ -461,12 +484,12 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
461484
}
462485
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
463486

464-
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
487+
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
465488

466489
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
467490
.orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
468491
List<CompletableFuture<Void>> removeFutures =
469-
buckets.stream().map(ImmutableBucket::asyncDeleteBucketSnapshot)
492+
buckets.stream().map(bucket -> bucket.asyncDeleteBucketSnapshot(stats))
470493
.toList();
471494
return FutureUtil.waitForAll(removeFutures);
472495
});
@@ -557,15 +580,17 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
557580

558581
if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) {
559582
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
560-
bucket.asyncDeleteBucketSnapshot();
583+
bucket.asyncDeleteBucketSnapshot(stats);
561584
continue;
562585
}
563586

587+
long loadStartTime = System.currentTimeMillis();
588+
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
564589
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
565590
if (CollectionUtils.isEmpty(indexList)) {
566591
immutableBuckets.asMapOfRanges()
567592
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
568-
bucket.asyncDeleteBucketSnapshot();
593+
bucket.asyncDeleteBucketSnapshot(stats);
569594
return;
570595
}
571596
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
@@ -583,9 +608,14 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
583608

584609
log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
585610
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);
611+
612+
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
586613
} else {
587614
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
588615
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
616+
617+
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
618+
System.currentTimeMillis() - loadStartTime);
589619
}
590620
}).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS);
591621
} catch (Exception e) {
@@ -645,7 +675,7 @@ private CompletableFuture<Void> cleanImmutableBuckets() {
645675
Iterator<ImmutableBucket> iterator = immutableBuckets.asMapOfRanges().values().iterator();
646676
while (iterator.hasNext()) {
647677
ImmutableBucket bucket = iterator.next();
648-
futures.add(bucket.clear());
678+
futures.add(bucket.clear(stats));
649679
numberDelayedMessages -= bucket.getNumberBucketDelayedMessages();
650680
iterator.remove();
651681
}
@@ -661,7 +691,6 @@ private boolean removeIndexBit(long ledgerId, long entryId) {
661691
.orElse(false);
662692
}
663693

664-
@Override
665694
public boolean containsMessage(long ledgerId, long entryId) {
666695
if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
667696
return true;
@@ -670,4 +699,15 @@ public boolean containsMessage(long ledgerId, long entryId) {
670699
return findImmutableBucket(ledgerId).map(bucket -> bucket.containsMessage(ledgerId, entryId))
671700
.orElse(false);
672701
}
702+
703+
public Map<String, TopicMetricBean> genTopicMetricMap() {
704+
stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
705+
stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size());
706+
MutableLong totalSnapshotLength = new MutableLong();
707+
immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> {
708+
totalSnapshotLength.add(immutableBucket.getSnapshotLength());
709+
});
710+
stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
711+
return stats.genTopicMetricMap();
712+
}
673713
}

0 commit comments

Comments
 (0)