Skip to content

Commit 476e7e8

Browse files
committed
[improve][client] PIP-229: Add a common interface to get fields of MessageIdData
Master issue: apache#18950 ### Motivation We need a common interface to get fields of the MessageIdData. After that, we won't need to assert a MessageId implementation is an instance of a specific class. And we can pass our customized MessageId implementation to APIs like `acknowledge` and `seek`. ### Modifications - Add `MessageIdAdv` to get fields of `MessageIdData`, make all MessageId implementations inherit it (except `MultiMessageIdImpl`). - Add `MessageIdAdvUtils` for the most common used methods. - Replace `BatchMessageAcker` with the `BitSet` for ACK. - Remove `TopicMessageIdImpl#getInnerMessageId` since a `TopicMessageIdImpl` can be treated as its underlying `MessageId` implementation now. - Remove `instanceof BatchMessageIdImpl` checks in `pulsar-client` module by casting to `MessageIdAdv`. After this refactoring, the 3rd party library will no longer need to cast a `MessageId` to a specific implementation. It only needs to cast `MessageId` to `MessageIdAdv`. Users can also implement their own util class so the methods of `MessageIdAdvUtils` are all not public. ### Verifications Add `CustomMessageIdTest` to verify a simple MessageIdAdv implementation that only has the (ledger id, entry id, batch idx, batch size) fields also works for seek and acknowledgment.
1 parent 4ed8a87 commit 476e7e8

35 files changed

+671
-805
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@
4141
import org.apache.pulsar.client.api.ConsumerEventListener;
4242
import org.apache.pulsar.client.api.Message;
4343
import org.apache.pulsar.client.api.MessageId;
44+
import org.apache.pulsar.client.api.MessageIdAdv;
4445
import org.apache.pulsar.client.api.MessageRoutingMode;
4546
import org.apache.pulsar.client.api.Producer;
4647
import org.apache.pulsar.client.api.PulsarClientException;
4748
import org.apache.pulsar.client.api.SubscriptionType;
48-
import org.apache.pulsar.client.impl.MessageIdImpl;
4949
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
5050
import org.apache.pulsar.common.naming.TopicName;
5151
import org.apache.pulsar.common.util.FutureUtil;
@@ -370,8 +370,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
370370
}
371371
totalMessages++;
372372
consumer1.acknowledge(msg);
373-
MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
374-
receivedPtns.add(msgId.getPartitionIndex());
373+
receivedPtns.add(((MessageIdAdv) msg.getMessageId()).getPartitionIndex());
375374
}
376375

377376
assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty());
@@ -387,8 +386,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
387386
}
388387
totalMessages++;
389388
consumer2.acknowledge(msg);
390-
MessageIdImpl msgId = MessageIdImpl.convertToMessageIdImpl(msg.getMessageId());
391-
receivedPtns.add(msgId.getPartitionIndex());
389+
receivedPtns.add(((MessageIdAdv) msg.getMessageId()).getPartitionIndex());
392390
}
393391
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
394392
assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty());

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,7 @@ public void testSeekByFunction() throws Exception {
678678
if (message == null) {
679679
break;
680680
}
681-
received.add(MessageIdImpl.convertToMessageIdImpl(message.getMessageId()));
681+
received.add(message.getMessageId());
682682
}
683683
int msgNumFromPartition1 = list.size() / 2;
684684
int msgNumFromPartition2 = 1;
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertFalse;
23+
import static org.testng.Assert.assertNotNull;
24+
import static org.testng.Assert.assertTrue;
25+
import java.util.ArrayList;
26+
import java.util.concurrent.TimeUnit;
27+
import lombok.Cleanup;
28+
import org.testng.annotations.AfterClass;
29+
import org.testng.annotations.BeforeClass;
30+
import org.testng.annotations.DataProvider;
31+
import org.testng.annotations.Test;
32+
33+
@Test(groups = "broker-api")
34+
public class CustomMessageIdTest extends ProducerConsumerBase {
35+
36+
@BeforeClass
37+
@Override
38+
protected void setup() throws Exception {
39+
super.internalSetup();
40+
super.producerBaseSetup();
41+
}
42+
43+
@AfterClass(alwaysRun = true)
44+
@Override
45+
protected void cleanup() throws Exception {
46+
super.internalCleanup();
47+
}
48+
49+
@DataProvider
50+
public static Object[][] enableBatching() {
51+
return new Object[][]{
52+
{ true },
53+
{ false }
54+
};
55+
}
56+
57+
@Test
58+
public void testSeek() throws Exception {
59+
final var topic = "persistent://my-property/my-ns/test-seek-" + System.currentTimeMillis();
60+
@Cleanup final var producer = pulsarClient.newProducer(Schema.INT32).topic(topic).create();
61+
final var msgIds = new ArrayList<SimpleMessageIdImpl>();
62+
for (int i = 0; i < 10; i++) {
63+
msgIds.add(new SimpleMessageIdImpl((MessageIdAdv) producer.send(i)));
64+
}
65+
@Cleanup final var consumer = pulsarClient.newConsumer(Schema.INT32)
66+
.topic(topic).subscriptionName("sub").subscribe();
67+
consumer.seek(msgIds.get(6));
68+
final var msg = consumer.receive(3, TimeUnit.SECONDS);
69+
assertNotNull(msg);
70+
assertEquals(msg.getValue(), 7);
71+
}
72+
73+
@Test(dataProvider = "enableBatching")
74+
public void testAcknowledgment(boolean enableBatching) throws Exception {
75+
final var topic = "persistent://my-property/my-ns/test-ack-"
76+
+ enableBatching + System.currentTimeMillis();
77+
final var producer = pulsarClient.newProducer(Schema.INT32)
78+
.topic(topic)
79+
.enableBatching(enableBatching)
80+
.batchingMaxMessages(10)
81+
.batchingMaxPublishDelay(300, TimeUnit.MILLISECONDS)
82+
.create();
83+
final var consumer = pulsarClient.newConsumer(Schema.INT32)
84+
.topic(topic)
85+
.subscriptionName("sub")
86+
.enableBatchIndexAcknowledgment(true)
87+
.isAckReceiptEnabled(true)
88+
.subscribe();
89+
for (int i = 0; i < 10; i++) {
90+
producer.sendAsync(i);
91+
}
92+
final var msgIds = new ArrayList<SimpleMessageIdImpl>();
93+
for (int i = 0; i < 10; i++) {
94+
final var msg = consumer.receive();
95+
final var msgId = new SimpleMessageIdImpl((MessageIdAdv) msg.getMessageId());
96+
msgIds.add(msgId);
97+
if (enableBatching) {
98+
assertTrue(msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0);
99+
} else {
100+
assertFalse(msgId.getBatchIndex() >= 0 && msgId.getBatchSize() > 0);
101+
}
102+
}
103+
consumer.acknowledgeCumulative(msgIds.get(8));
104+
consumer.redeliverUnacknowledgedMessages();
105+
final var msg = consumer.receive(3, TimeUnit.SECONDS);
106+
assertNotNull(msg);
107+
assertEquals(msg.getValue(), 9);
108+
}
109+
110+
private record SimpleMessageIdImpl(long ledgerId, long entryId, int batchIndex, int batchSize)
111+
implements MessageIdAdv {
112+
113+
public SimpleMessageIdImpl(MessageIdAdv msgId) {
114+
this(msgId.getLedgerId(), msgId.getEntryId(), msgId.getBatchIndex(), msgId.getBatchSize());
115+
}
116+
117+
@Override
118+
public byte[] toByteArray() {
119+
return new byte[0]; // never used
120+
}
121+
122+
@Override
123+
public long getLedgerId() {
124+
return ledgerId;
125+
}
126+
127+
@Override
128+
public long getEntryId() {
129+
return entryId;
130+
}
131+
132+
@Override
133+
public int getBatchIndex() {
134+
return batchIndex;
135+
}
136+
137+
@Override
138+
public int getBatchSize() {
139+
return batchSize;
140+
}
141+
}
142+
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {
767767

768768
for (int i = 0; i < totalMessages; i ++) {
769769
msg = consumer1.receive(5, TimeUnit.SECONDS);
770-
Assert.assertEquals(MessageIdImpl.convertToMessageIdImpl(msg.getMessageId()).getPartitionIndex(), 2);
770+
Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2);
771771
consumer1.acknowledge(msg);
772772
}
773773

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,10 @@ private AckTestData prepareDataForAck(String topic) throws PulsarClientException
176176
messageIds.add(message.getMessageId());
177177
}
178178
MessageId firstEntryMessageId = messageIds.get(0);
179-
MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl();
179+
MessageId secondEntryMessageId = MessageIdAdvUtils.discardBatch(messageIds.get(1));
180180
// Verify messages 2 to N must be in the same entry
181181
for (int i = 2; i < messageIds.size(); i++) {
182-
assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
182+
assertEquals(MessageIdAdvUtils.discardBatch(messageIds.get(i)), secondEntryMessageId);
183183
}
184184

185185
assertTrue(interceptor.individualAckedMessageIdList.isEmpty());

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
118118
Message<byte[]> message = consumer.receive();
119119
assertEquals(new String(message.getData()), messagePrefix + i);
120120
MessageId messageId = message.getMessageId();
121-
if (topicType == TopicType.PARTITIONED) {
122-
messageId = MessageIdImpl.convertToMessageIdImpl(messageId);
123-
}
124121
assertTrue(messageIds.remove(messageId), "Failed to receive message");
125122
}
126123
log.info("Remaining message IDs = {}", messageIds);
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import java.util.BitSet;
22+
23+
/**
24+
* The {@link MessageId} interface provided for advanced users.
25+
* <p>
26+
* All built-in MessageId implementations should be able to be cast to MessageIdAdv.
27+
* </p>
28+
*/
29+
public interface MessageIdAdv extends MessageId {
30+
31+
/**
32+
* Get the ledger ID.
33+
*
34+
* @return the ledger ID
35+
*/
36+
long getLedgerId();
37+
38+
/**
39+
* Get the entry ID.
40+
*
41+
* @return the entry ID
42+
*/
43+
long getEntryId();
44+
45+
/**
46+
* Get the partition index.
47+
*
48+
* @return -1 if the message is from a non-partitioned topic, otherwise the non-negative partition index
49+
*/
50+
default int getPartitionIndex() {
51+
return -1;
52+
}
53+
54+
/**
55+
* Get the batch index.
56+
*
57+
* @return -1 if the message is not in a batch
58+
*/
59+
default int getBatchIndex() {
60+
return -1;
61+
}
62+
63+
/**
64+
* Get the batch size.
65+
*
66+
* @return 0 if the message is not in a batch
67+
*/
68+
default int getBatchSize() {
69+
return 0;
70+
}
71+
72+
/**
73+
* Get the BitSet that indicates which messages in the batch.
74+
*
75+
* @implNote The message IDs of a batch should share a BitSet. For example, given 3 messages in the same batch whose
76+
* size is 3, all message IDs of them should return "111" (i.e. a BitSet whose size is 3 and all bits are 1). If the
77+
* 1st message has been acknowledged, the returned BitSet should become "011" (i.e. the 1st bit become 0).
78+
*
79+
* @return null if the message is a non-batched message
80+
*/
81+
default BitSet getAckSet() {
82+
return null;
83+
}
84+
85+
/**
86+
* Get the message ID of the first chunk if the current message ID represents the position of a chunked message.
87+
*
88+
* @implNote A chunked message is distributed across different BookKeeper entries. The message ID of a chunked
89+
* message is composed of two message IDs that represent positions of the first and the last chunk. The message ID
90+
* itself represents the position of the last chunk.
91+
*
92+
* @return null if the message is not a chunked message
93+
*/
94+
default MessageIdAdv getFirstChunkMessageId() {
95+
return null;
96+
}
97+
98+
/**
99+
* The default implementation of {@link Comparable#compareTo(Object)}.
100+
*/
101+
default int compareTo(MessageId o) {
102+
if (!(o instanceof MessageIdAdv)) {
103+
throw new UnsupportedOperationException("Unknown MessageId type: "
104+
+ ((o != null) ? o.getClass().getName() : "null"));
105+
}
106+
final MessageIdAdv other = (MessageIdAdv) o;
107+
int result = Long.compare(this.getLedgerId(), other.getLedgerId());
108+
if (result != 0) {
109+
return result;
110+
}
111+
result = Long.compare(this.getEntryId(), other.getEntryId());
112+
if (result != 0) {
113+
return result;
114+
}
115+
// TODO: Correct the following compare logics, see https://github.com/apache/pulsar/pull/18981
116+
result = Integer.compare(this.getPartitionIndex(), other.getPartitionIndex());
117+
if (result != 0) {
118+
return result;
119+
}
120+
return Integer.compare(this.getBatchIndex(), other.getBatchIndex());
121+
}
122+
}

0 commit comments

Comments
 (0)