Skip to content

Conversation

@chirag-wadhwa5
Copy link
Collaborator

@chirag-wadhwa5 chirag-wadhwa5 commented Oct 30, 2025

This PR is part of
KIP-1226.

It introduces a new field called deliveryCompleteCount in
writeShareGroupState RPC and ShareUpdate and ShareSnapshot. The
share partition now maintains inFlightTerminalCount (introduced
here) and it uses this
exact value to pass to the writeState request for
deliveryCompleteCount.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka clients labels Oct 30, 2025
@apoorvmittal10 apoorvmittal10 added ci-approved and removed triage PRs from the community labels Oct 31, 2025
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, LGTM! I will wait for @AndrewJSchofield's review on this.

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Just a few minor comments.

"type": "request",
"listeners": ["broker"],
"name": "WriteShareGroupStateRequest",
"validVersions": "0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can we have a comment to describe the versions. Here's my suggestion:

  // Version 0 is the initial version (KIP-932).
  //
  // Version 1 introduces DeliveryCompleteCount (KIP-1226).

"apiKey": 85,
"type": "response",
"name": "WriteShareGroupStateResponse",
"validVersions": "0",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, a version comment such as:

  // Version 0 is the initial version (KIP-932).
  //
  // Version 1 introduces DeliveryCompleteCount in the request (KIP-1226).

.setTopicsData(List.of(new TopicData<>(topicIdPartition.topicId(),
List.of(PartitionFactory.newPartitionStateBatchData(
topicIdPartition.partition(), stateEpoch, startOffset(), leaderEpoch, stateBatches))))
topicIdPartition.partition(), stateEpoch, startOffset(), inFlightTerminalRecords(), leaderEpoch, stateBatches))))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that inFlightTerminalRecords was the previous name before the KIP was approved, but it's changed now. Please can we expunge the old name for the sake of the future maintainers.

verifyNoErr: Boolean = true, authorizer: Authorizer = null,
writeStateResult: util.List[WriteShareGroupStateResponseData.WriteStateResult]): WriteShareGroupStateResponse = {
val requestChannelRequest = buildRequest(new WriteShareGroupStateRequest.Builder(requestData).build())
val requestChannelRequest = buildRequest(new WriteShareGroupStateRequest.Builder(requestData).build(0))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you hardcoding version 0 here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. That was a typo, will remove the 0.


public static ShareGroupOffset fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int snapshotEpoch, long timestamp) {
// Since initialization changes the start offset, and hence the in flight state is forgotten, the end offset is set
// to be the same as the start offset, and the in flight record count is set to 0.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems inaccurate. I don't understand how the in-flight record count is set to zero here. I do think that the delivery complete count is unknown at this point and is set to the uninitialised value.

"about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset." },
{ "name": "DeliveryCompleteCount", "type": "int32", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": "-1",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at ConsumerGroupMetadataValue, you'll see an example of how a new tagged field was commented. I suggest a similar thing here such as

  // DeliveryCompleteCount was added in Apache Kafka 4.2 (KIP-1226).

"about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset, or -1 if the start offset is not being updated." },
{ "name": "DeliveryCompleteCount", "type": "int32", "versions": "0+", "taggedVersions": "0+", "tag": 0, "default": "-1",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved clients core Kafka Broker KIP-932 Queues for Kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants