Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ brokerServiceURLTLS=
brokerWebServiceURL=
brokerWebServiceURLTLS=

# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache
# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName.
topicNameCacheMaxCapacity=100000

# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when
# there are too many topics are in use.
maxSecondsToClearTopicNameCache=7200

# If function workers are setup in a separate cluster, configure the following 2 settings. This url should point to
# the discovery service provider of the function workers cluster, and does not support multi urls yet.
functionWorkerWebServiceURL=
Expand Down
8 changes: 8 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ maxHttpServerConnections=2048
# Max concurrent web requests
maxConcurrentHttpRequests=1024

# Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache
# per "maxSecondsToClearTopicNameCache", it does not mean broker will not cache TopicName.
topicNameCacheMaxCapacity=100000

# A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache frequently when
# there are too many topics are in use.
maxSecondsToClearTopicNameCache=7200

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ protected void startClearInvalidateTopicNameCacheTask() {
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
TopicName.setEvictCacheByScheduledTask(true);
}

protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.Codec;

Expand All @@ -37,6 +39,8 @@ public class TopicName implements ServiceUnitId {

public static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";

public static final int CLIENT_MAX_TOPIC_NAME_CACHE = 10_000;

private final String completeTopicName;

private final TopicDomain domain;
Expand All @@ -51,6 +55,10 @@ public class TopicName implements ServiceUnitId {

private static final ConcurrentHashMap<String, TopicName> cache = new ConcurrentHashMap<>();

@Getter
@Setter
private static boolean evictCacheByScheduledTask = false;

public static void clearIfReachedMaxCapacity(int maxCapacity) {
if (maxCapacity < 0) {
// Unlimited cache.
Expand Down Expand Up @@ -82,7 +90,11 @@ public static TopicName get(String topic) {
if (tp != null) {
return tp;
}
return cache.computeIfAbsent(topic, k -> new TopicName(k));
tp = cache.computeIfAbsent(topic, k -> new TopicName(k));
if (!evictCacheByScheduledTask) {
clearIfReachedMaxCapacity(CLIENT_MAX_TOPIC_NAME_CACHE);
}
return tp;
}

public static TopicName getPartitionedTopicName(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private boolean zooKeeperAllowReadOnlyOperations;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "Max capacity of the topic name cache. -1 means unlimited cache; 0 means broker will clear all cache"
+ " per maxSecondsToClearTopicNameCache, it does not mean broker will not cache TopicName."
)
private int topicNameCacheMaxCapacity = 100_000;

@FieldContext(
category = CATEGORY_SERVER,
doc = "A Specifies the minimum number of seconds that the topic name stays in memory, to avoid clear cache"
+ " frequently when there are too many topics are in use."
)
private int maxSecondsToClearTopicNameCache = 3600 * 2;

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "If does not set metadataStoreUrl or configurationMetadataStoreUrl, this url should point to the"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -236,7 +237,6 @@ public void start() throws Exception {
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));

Class<? extends ServerSocketChannel> serverSocketChannelClass =
EventLoopUtil.getServerSocketChannelClass(workerGroup);
bootstrap.channel(serverSocketChannelClass);
Expand All @@ -246,6 +246,14 @@ public void start() throws Exception {
&& EpollServerSocketChannel.class.isAssignableFrom(serverSocketChannelClass)) {
proxyZeroCopyModeEnabled = true;
}
// Start the task the clean topic name object cache.
final int maxSecondsToClearTopicNameCache = proxyConfig.getMaxSecondsToClearTopicNameCache();
workerGroup.scheduleAtFixedRate(
() -> TopicName.clearIfReachedMaxCapacity(proxyConfig.getTopicNameCacheMaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
TopicName.setEvictCacheByScheduledTask(true);

bootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, false, null));
// Bind and start to accept incoming connections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public void testBackwardCompatibility() throws IOException {
printWriter.println("zookeeperSessionTimeoutMs=60");
printWriter.println("zooKeeperCacheExpirySeconds=500");
printWriter.println("httpMaxRequestHeaderSize=1234");
printWriter.println("topicNameCacheMaxCapacity=20000");
printWriter.println("maxSecondsToClearTopicNameCache=1800");
}
testConfigFile.deleteOnExit();
InputStream stream = new FileInputStream(testConfigFile);
Expand All @@ -81,6 +83,8 @@ public void testBackwardCompatibility() throws IOException {
assertEquals(serviceConfig.getMetadataStoreSessionTimeoutMillis(), 60);
assertEquals(serviceConfig.getMetadataStoreCacheExpirySeconds(), 500);
assertEquals(serviceConfig.getHttpMaxRequestHeaderSize(), 1234);
assertEquals(serviceConfig.getTopicNameCacheMaxCapacity(), 20000);
assertEquals(serviceConfig.getMaxSecondsToClearTopicNameCache(), 1800);

testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
if (testConfigFile.exists()) {
Expand All @@ -91,13 +95,17 @@ public void testBackwardCompatibility() throws IOException {
printWriter.println("metadataStoreCacheExpirySeconds=500");
printWriter.println("zooKeeperSessionTimeoutMillis=-1");
printWriter.println("zooKeeperCacheExpirySeconds=-1");
printWriter.println("topicNameCacheMaxCapacity=200");
printWriter.println("maxSecondsToClearTopicNameCache=900");
}
testConfigFile.deleteOnExit();
stream = new FileInputStream(testConfigFile);
serviceConfig = PulsarConfigurationLoader.create(stream, ProxyConfiguration.class);
stream.close();
assertEquals(serviceConfig.getMetadataStoreSessionTimeoutMillis(), 60);
assertEquals(serviceConfig.getMetadataStoreCacheExpirySeconds(), 500);
assertEquals(serviceConfig.getTopicNameCacheMaxCapacity(), 200);
assertEquals(serviceConfig.getMaxSecondsToClearTopicNameCache(), 900);

testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
if (testConfigFile.exists()) {
Expand All @@ -108,13 +116,17 @@ public void testBackwardCompatibility() throws IOException {
printWriter.println("metadataStoreCacheExpirySeconds=30");
printWriter.println("zookeeperSessionTimeoutMs=100");
printWriter.println("zooKeeperCacheExpirySeconds=300");
printWriter.println("topicNameCacheMaxCapacity=100");
printWriter.println("maxSecondsToClearTopicNameCache=100");
}
testConfigFile.deleteOnExit();
stream = new FileInputStream(testConfigFile);
serviceConfig = PulsarConfigurationLoader.create(stream, ProxyConfiguration.class);
stream.close();
assertEquals(serviceConfig.getMetadataStoreSessionTimeoutMillis(), 100);
assertEquals(serviceConfig.getMetadataStoreCacheExpirySeconds(), 300);
assertEquals(serviceConfig.getTopicNameCacheMaxCapacity(), 100);
assertEquals(serviceConfig.getMaxSecondsToClearTopicNameCache(), 100);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.internal.PropertiesUtils;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
Expand Down Expand Up @@ -128,6 +129,14 @@ public void start() throws PulsarServerException, PulsarClientException, Malform
throw new PulsarServerException(e);
}
}
// Start the task the clean topic name object cache.
final int maxSecondsToClearTopicNameCache = config.getMaxSecondsToClearTopicNameCache();
executor.scheduleAtFixedRate(
() -> TopicName.clearIfReachedMaxCapacity(config.getTopicNameCacheMaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
TimeUnit.SECONDS);
TopicName.setEvictCacheByScheduledTask(true);

log.info("Pulsar WebSocket Service started");
}
Expand Down
Loading