Skip to content

Commit 1469b85

Browse files
committed
[fix][broker] Ignore and remove the replicator cursor when the remote cluster is absent
### Motivation Sometimes when a remote cluster is deleted, the replication cursor might still exist for some topics. In this case, creating producers or consumers on these topics will fail. Here is a log observed in a production environment: > WARN org.apache.pulsar.broker.service.BrokerService - Replication or > dedup check failed. Removing topic from topics list > persistent://public/__kafka/__consumer_offsets-partition-40, > java.util.concurrent.CompletionException: java.lang.RuntimeException: > org.apache.pulsar.metadata.api.MetadataStoreException$NotFoundException: > kop When this happens, unloading the topic or restarting the broker does not help. We have to remove the cursor manually. ### Modifications When initializing a `PersistentTopic`, if there is any replicator cursor while the corresponding cluster does not exist, ignore the exception from `addReplicationCluster`. Then, remove this "zombie" cursor. ### Verifications `PersistentTopicTest#testCreateTopicWithZombieReplicatorCursor` is added to verify that `PersistentTopic#initialize` will succeed and the zombie replicator cursor will be removed.
1 parent 68c10ee commit 1469b85

File tree

2 files changed

+62
-1
lines changed

2 files changed

+62
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,38 @@ public CompletableFuture<Void> initialize() {
338338
if (cursor.getName().startsWith(replicatorPrefix)) {
339339
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
340340
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
341-
futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
341+
final CompletableFuture<Void> future = new CompletableFuture<>();
342+
addReplicationCluster(remoteCluster, cursor, localCluster).whenComplete((__, e) -> {
343+
if (e == null) {
344+
future.complete(null);
345+
} else {
346+
Throwable throwable = e;
347+
while (throwable.getCause() != null) {
348+
throwable = throwable.getCause();
349+
}
350+
if (throwable instanceof MetadataStoreException.NotFoundException
351+
&& throwable.getMessage().equals(remoteCluster)) {
352+
log.warn("[{}] Remote cluster '{}' is not found while there is a replicator cursor,"
353+
+ " remove cursor '{}'", topic, remoteCluster, cursor.getName());
354+
ledger.asyncDeleteCursor(cursor.getName(), new DeleteCursorCallback() {
355+
@Override
356+
public void deleteCursorComplete(Object ctx) {
357+
log.info("[{}] Deleted replicator cursor '{}'", topic, cursor.getName());
358+
}
359+
360+
@Override
361+
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
362+
log.error("[{}] Failed to delete the replicator cursor '{}'",
363+
topic, cursor.getName(), exception);
364+
}
365+
}, null);
366+
future.complete(null);
367+
} else {
368+
future.completeExceptionally(e);
369+
}
370+
}
371+
});
372+
futures.add(future);
342373
}
343374
}
344375
return FutureUtil.waitForAll(futures).thenCompose(__ ->

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,20 @@
4040
import java.nio.charset.StandardCharsets;
4141
import java.util.ArrayList;
4242
import java.util.Collection;
43+
import java.util.Collections;
44+
import java.util.HashSet;
4345
import java.util.List;
46+
import java.util.Set;
4447
import java.util.UUID;
4548
import java.util.concurrent.CompletableFuture;
4649
import java.util.concurrent.CountDownLatch;
4750
import java.util.concurrent.ExecutionException;
4851
import java.util.concurrent.TimeUnit;
4952
import java.util.concurrent.atomic.AtomicBoolean;
53+
import java.util.function.Supplier;
5054
import lombok.Cleanup;
5155
import org.apache.bookkeeper.client.LedgerHandle;
56+
import org.apache.bookkeeper.mledger.ManagedCursor;
5257
import org.apache.bookkeeper.mledger.ManagedLedger;
5358
import org.apache.pulsar.broker.service.BrokerService;
5459
import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -57,6 +62,7 @@
5762
import org.apache.pulsar.client.admin.PulsarAdminException;
5863
import org.apache.pulsar.client.api.Consumer;
5964
import org.apache.pulsar.client.api.Message;
65+
import org.apache.pulsar.client.api.MessageId;
6066
import org.apache.pulsar.client.api.MessageListener;
6167
import org.apache.pulsar.client.api.MessageRoutingMode;
6268
import org.apache.pulsar.client.api.Producer;
@@ -525,4 +531,28 @@ public void testDeleteTopicFail() throws Exception {
525531
makeDeletedFailed.set(false);
526532
persistentTopic.delete().get();
527533
}
534+
535+
@Test
536+
public void testCreateTopicWithZombieReplicatorCursor() throws Exception {
537+
final String topicName = "persistent://prop/ns-abc/testCreateTopicWithZombieReplicatorCursor";
538+
final String remoteCluster = "remote";
539+
admin.topics().createNonPartitionedTopic(topicName);
540+
admin.topics().createSubscription(topicName, conf.getReplicatorPrefix() + "." + remoteCluster,
541+
MessageId.earliest, true);
542+
543+
final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
544+
.get(3, TimeUnit.SECONDS).orElse(null);
545+
assertNotNull(topic);
546+
547+
final Supplier<Set<String>> getCursors = () -> {
548+
final Set<String> cursors = new HashSet<>();
549+
final Iterable<ManagedCursor> iterable = topic.getManagedLedger().getCursors();
550+
iterable.forEach(c -> cursors.add(c.getName()));
551+
return cursors;
552+
};
553+
assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
554+
topic.initialize().get(3, TimeUnit.SECONDS);
555+
Awaitility.await().atMost(3, TimeUnit.SECONDS)
556+
.until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
557+
}
528558
}

0 commit comments

Comments
 (0)