Skip to content

Conversation

@BewareMyPower
Copy link
Contributor

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@BewareMyPower BewareMyPower added this to the 4.1.0 milestone Jun 21, 2025
@github-actions github-actions bot added the PIP label Jun 21, 2025
@BewareMyPower BewareMyPower self-assigned this Jun 21, 2025
@github-actions github-actions bot added the doc Your PR contains doc changes, no matter whether the changes are in markdown or code files. label Jun 21, 2025
@BewareMyPower BewareMyPower changed the title [pip] PIP-429: Client-Side Computation of Last Compacted Message ID [improve][pip] PIP-429: Client-Side Computation of Last Compacted Message ID Jun 21, 2025
Copy link
Contributor

@gaoran10 gaoran10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to make changes on the server side for several reasons.

  • Broker can handle this issue well and provide a compatibility solution.
  • Add a flag for the valid compacted position can avoid deserializing and uncompressing the entire batch messages metadata header.
  • If making this change at the client-side, it increases the complexity for the client, and all language clients need to change.
  • It's weird to return EntryBuffer when getting the last message ID.
  • Actually, Pulsar only needs to retain valid message data in a compacted entry, but it retains all compacted messages with "empty header" and "empty payload".

@BewareMyPower
Copy link
Contributor Author

If making this change at the client-side, it increases the complexity for the client, and all language clients need to change.

Please check the Backward & Forward Compatibility section

When an older client sends a GetLastMessageId request to a newer broker, the broker computes the last message ID using its standard process. To ensure backward compatibility, the broker excludes the payload buffer from the response, aligning with the expectations of older clients.

@BewareMyPower
Copy link
Contributor Author

Actually, Pulsar only needs to retain valid message data in a compacted entry, but it retains all compacted messages with "empty header" and "empty payload".

Yes. Unfortunately, it's limited by client side's logic that the default logic to parse the payload buffer looks like:

int batchSize = msgMetadata.getNumMessagesInBatch();
for (int i = 0; i < batchSize; i++) {
    int batchIndex = i;
    final var singleMessageMetadata = parse(payload);
    if (singleMessageMetadata.isCompactedOut()) {
        break;
    }
    // Create a message, whose batch index is i, from the payload buffer
}

Add a flag for the valid compacted position can avoid deserializing and uncompressing the entire batch messages metadata header.

This makes sense to me. Adding a new field to MessageMetadata might be more simple. But we need to take care of the compatibility issue. We can simplify the payload format when this field is set. Let me try a demo first.

@BewareMyPower BewareMyPower marked this pull request as draft June 23, 2025 02:27
@coderzc
Copy link
Member

coderzc commented Jun 23, 2025

I prefer to make changes on the server side for several reasons.

  • Broker can handle this issue well and provide a compatibility solution.
  • Add a flag for the valid compacted position can avoid deserializing and uncompressing the entire batch messages metadata header.
  • If making this change at the client-side, it increases the complexity for the client, and all language clients need to change.
  • It's weird to return EntryBuffer when getting the last message ID.
  • Actually, Pulsar only needs to retain valid message data in a compacted entry, but it retains all compacted messages with "empty header" and "empty payload".

If we purpose are to ensure that the compaction task is successful, we only need to check lastCompactedPosition and lastDispatchablePosition before trigger compaction task. If lastDispatchablePosition <= lastCompactedPosition, then I just need to skip the compact task because there is no new data need to be compacted.

If we need make the Pulsar reader to read Kafka format data, then we need this change.

@BewareMyPower BewareMyPower changed the title [improve][pip] PIP-429: Client-Side Computation of Last Compacted Message ID [improve][pip] PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload Buffer Parsing Jun 23, 2025
@BewareMyPower
Copy link
Contributor Author

If we need make the Pulsar reader to read Kafka format data, then we need this change.

No. We need to ensure the GetLastMessageId RPC works even if the last entry is compacted with a non-default payload format.

The consumer is able to configure a MessagePayloadProcessor to parse the message of different format. The reader does not have the configuration and I will use another PR to add it.

@BewareMyPower BewareMyPower marked this pull request as ready for review June 24, 2025 05:01
Copy link
Member

@dao-jun dao-jun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, good change!

@BewareMyPower BewareMyPower merged commit 516ce04 into apache:master Jul 16, 2025
20 checks passed
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc Your PR contains doc changes, no matter whether the changes are in markdown or code files. PIP

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants