
Commit 7c5a9a3

[SPARK-54094][SQL] Extract common methods to KafkaOffsetReaderBase
### What changes were proposed in this pull request?

This patch extracts the identical `getOffsetRangesFromResolvedOffsets` method from the two `KafkaOffsetReader` implementations into a common base class.

### Why are the changes needed?

While reviewing apache#52729, it was found that `KafkaOffsetReaderConsumer` and `KafkaOffsetReaderAdmin` have exactly the same `getOffsetRangesFromResolvedOffsets` method. The method is fairly long, so it is worth extracting it into a single shared implementation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52788 from viirya/kafkaoffsetreader_refactor.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent 1d575f2 commit 7c5a9a3
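The refactoring follows the standard pull-up pattern: an abstract base class holds the shared method once, and each concrete reader supplies only the member that differs (`rangeCalculator`). A minimal sketch of that shape, with illustrative stand-in names rather than the actual Spark classes:

// Minimal sketch of the extraction pattern; all names are illustrative stand-ins.
trait OffsetReader {
  def getRanges(from: Map[Int, Long], until: Map[Int, Long]): Seq[(Int, Long, Long)]
}

class RangeCalculator {
  def adjust(ranges: Seq[(Int, Long, Long)]): Seq[(Int, Long, Long)] = ranges
}

abstract class OffsetReaderBase extends OffsetReader {
  // Each concrete reader supplies its own calculator (cf. rangeCalculator).
  protected val calculator: RangeCalculator

  // The shared logic now lives once in the base instead of twice in subclasses.
  override def getRanges(
      from: Map[Int, Long],
      until: Map[Int, Long]): Seq[(Int, Long, Long)] = {
    val ranges = until.keys.toSeq.map { p => (p, from.getOrElse(p, 0L), until(p)) }
    calculator.adjust(ranges)
  }
}

// Concrete readers only override what differs.
class AdminReader extends OffsetReaderBase {
  override protected val calculator = new RangeCalculator
}

class ConsumerReader extends OffsetReaderBase {
  override protected val calculator = new RangeCalculator
}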

File tree

3 files changed: 93 additions & 174 deletions

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

Lines changed: 87 additions & 0 deletions
@@ -19,12 +19,17 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.TopicPartition
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKeys.TOPIC_PARTITION_OFFSET
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
+import org.apache.spark.util.ArrayImplicits._
 
 /**
  * Base trait to fetch offsets from Kafka. The implementations are
@@ -167,3 +172,85 @@ private[kafka010] object KafkaOffsetReader extends Logging {
     }
   }
 }
+
+private[kafka010] abstract class KafkaOffsetReaderBase extends KafkaOffsetReader with Logging {
+  protected val rangeCalculator: KafkaOffsetRangeCalculator
+
+  private def getSortedExecutorList: Array[String] = {
+    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
+      if (a.host == b.host) {
+        a.executorId > b.executorId
+      } else {
+        a.host > b.host
+      }
+    }
+
+    val bm = SparkEnv.get.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compare)
+      .map(_.toString)
+  }
+
+  override def getOffsetRangesFromResolvedOffsets(
+      fromPartitionOffsets: PartitionOffsetMap,
+      untilPartitionOffsets: PartitionOffsetMap,
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
+    // Find the new partitions, and get their earliest offsets
+    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
+    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
+      // We cannot get from offsets for some partitions. It means they got deleted.
+      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
+      reportDataLoss(
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+        () => KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
+    }
+    logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+      reportDataLoss(
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
+    }
+
+    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
+    if (deletedPartitions.nonEmpty) {
+      val (message, config) =
+        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            Some(ConsumerConfig.GROUP_ID_CONFIG))
+        } else {
+          (s"$deletedPartitions are gone. Some data may have been missed.", None)
+        }
+
+      reportDataLoss(
+        message,
+        () => KafkaExceptions.partitionsDeleted(deletedPartitions, config))
+    }
+
+    // Use the until partitions to calculate offset ranges to ignore partitions that have
+    // been deleted
+    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
+      // Ignore partitions that we don't know the from offsets.
+      newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
+    }.toSeq
+    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+    val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = untilPartitionOffsets
+    val ranges = topicPartitions.map { tp =>
+      val fromOffset = fromOffsets(tp)
+      val untilOffset = untilOffsets(tp)
+      if (untilOffset < fromOffset) {
+        reportDataLoss(
+          s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
+            "offset is set beyond available offset when starting query, or 2) the kafka " +
+            "topic-partition is deleted and re-created.",
+          () => KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
+      }
+      KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
+    }
+    rangeCalculator.getRanges(ranges, getSortedExecutorList.toImmutableArraySeq)
+  }
+}
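One detail worth noting in the shared `getSortedExecutorList` helper added above: `sortWith` with `>` produces a descending order, and `executorId` is compared lexicographically as a `String`, not numerically. A standalone sketch of that comparator, using a simplified stand-in for `ExecutorCacheTaskLocation` (the real class's `toString` format may differ):

// Standalone sketch of the comparator above; Loc is a simplified stand-in
// for ExecutorCacheTaskLocation, and the toString format is illustrative.
case class Loc(host: String, executorId: String) {
  override def toString: String = s"executor_${host}_$executorId"
}

object SortSketch {
  // Same shape as the compare function in KafkaOffsetReaderBase.
  def compare(a: Loc, b: Loc): Boolean =
    if (a.host == b.host) a.executorId > b.executorId else a.host > b.host

  def main(args: Array[String]): Unit = {
    val peers = Seq(Loc("host2", "1"), Loc("host1", "3"), Loc("host1", "10"))
    // Descending by host, then by executorId as a String ("3" > "10" here).
    println(peers.sortWith(compare).map(_.toString))
    // Prints: List(executor_host2_1, executor_host1_3, executor_host1_10)
  }
}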

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala

Lines changed: 3 additions & 87 deletions
@@ -29,10 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
 import org.apache.kafka.common.requests.OffsetFetchResponse
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET}
-import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
 import org.apache.spark.util.ArrayImplicits._
@@ -52,7 +49,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
     consumerStrategy: ConsumerStrategy,
     override val driverKafkaParams: ju.Map[String, Object],
     readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends KafkaOffsetReader with Logging {
+    driverGroupIdPrefix: String) extends KafkaOffsetReaderBase {
 
   private[kafka010] val maxOffsetFetchAttempts =
     readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt
@@ -102,7 +99,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
   private val maxRecordsPerPartition =
     readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITION_OPTION_KEY).map(_.toLong)
 
-  private val rangeCalculator =
+  override protected val rangeCalculator =
     new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)
 
   /**
@@ -442,87 +439,6 @@ private[kafka010] class KafkaOffsetReaderAdmin(
     }
   }
 
-  private def getSortedExecutorList: Array[String] = {
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
-  override def getOffsetRangesFromResolvedOffsets(
-      fromPartitionOffsets: PartitionOffsetMap,
-      untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
-        () =>
-          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
-    }
-    logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
-        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
-    }
-
-    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      val (message, config) =
-        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-          (s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
-            Some(ConsumerConfig.GROUP_ID_CONFIG))
-        } else {
-          (s"$deletedPartitions are gone. Some data may have been missed.", None)
-        }
-
-      reportDataLoss(
-        message,
-        () =>
-          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
-    }
-
-    // Use the until partitions to calculate offset ranges to ignore partitions that have
-    // been deleted
-    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
-    val untilOffsets = untilPartitionOffsets
-    val ranges = topicPartitions.map { tp =>
-      val fromOffset = fromOffsets(tp)
-      val untilOffset = untilOffsets(tp)
-      if (untilOffset < fromOffset) {
-        reportDataLoss(
-          s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
-            "offset is set beyond available offset when starting query, or 2) the kafka " +
-            "topic-partition is deleted and re-created.",
-          () =>
-            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
-      }
-      KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
-    }
-    rangeCalculator.getRanges(ranges, getSortedExecutorList.toImmutableArraySeq)
-  }
-
   private def partitionsAssignedToAdmin(
       body: ju.Set[TopicPartition] => Map[TopicPartition, Long])
     : Map[TopicPartition, Long] = {
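A note on the `reportDataLoss: (String, () => Throwable) => Unit` signature threaded through the extracted method: the throwable is passed as a thunk, so the exception is only constructed if the handler actually decides to fail. A small standalone sketch of that callback shape (simplified; not the actual Spark handlers):

// Standalone sketch of the lazy-throwable callback shape; not Spark code.
object ReportDataLossSketch {
  // A "fail" handler forces the thunk and throws; a "warn" handler never does.
  val failOnDataLoss: (String, () => Throwable) => Unit =
    (_, mkError) => throw mkError()

  val warnOnDataLoss: (String, () => Throwable) => Unit =
    (message, _) => println(s"WARN: $message")

  def main(args: Array[String]): Unit = {
    warnOnDataLoss("partition 3 is gone", () => new IllegalStateException("boom"))
    // The IllegalStateException above is never constructed, because the
    // warn handler ignores the thunk.
  }
}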

connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala

Lines changed: 3 additions & 87 deletions
@@ -26,10 +26,7 @@ import scala.util.control.NonFatal
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, OffsetAndTimestamp}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET}
-import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset
 import org.apache.spark.util.{UninterruptibleThread, UninterruptibleThreadRunner}
@@ -52,7 +49,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
     consumerStrategy: ConsumerStrategy,
     override val driverKafkaParams: ju.Map[String, Object],
     readerOptions: CaseInsensitiveMap[String],
-    driverGroupIdPrefix: String) extends KafkaOffsetReader with Logging {
+    driverGroupIdPrefix: String) extends KafkaOffsetReaderBase {
 
   /**
    * [[UninterruptibleThreadRunner]] ensures that all
@@ -101,7 +98,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
   private val maxRecordsPerPartition =
     readerOptions.get(KafkaSourceProvider.MAX_RECORDS_PER_PARTITION_OPTION_KEY).map(_.toLong)
 
-  private val rangeCalculator =
+  override protected val rangeCalculator =
     new KafkaOffsetRangeCalculator(minPartitions, maxRecordsPerPartition)
 
   private[kafka010] val offsetFetchAttemptIntervalMs =
@@ -491,87 +488,6 @@ private[kafka010] class KafkaOffsetReaderConsumer(
     }
   }
 
-  private def getSortedExecutorList(): Array[String] = {
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
-  override def getOffsetRangesFromResolvedOffsets(
-      fromPartitionOffsets: PartitionOffsetMap,
-      untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get from offsets for some partitions. It means they got deleted.
-      val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
-        () =>
-          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
-    }
-    logInfo(log"Partitions added: ${MDC(TOPIC_PARTITION_OFFSET, newPartitionInitialOffsets)}")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
-        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
-    }
-
-    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      val (message, config) =
-        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-          (s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
-            Some(ConsumerConfig.GROUP_ID_CONFIG))
-        } else {
-          (s"$deletedPartitions are gone. Some data may have been missed.", None)
-        }
-
-      reportDataLoss(
-        message,
-        () =>
-          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
-    }
-
-    // Use the until partitions to calculate offset ranges to ignore partitions that have
-    // been deleted
-    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions that we don't know the from offsets.
-      newPartitionInitialOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    val fromOffsets = fromPartitionOffsets ++ newPartitionInitialOffsets
-    val untilOffsets = untilPartitionOffsets
-    val ranges = topicPartitions.map { tp =>
-      val fromOffset = fromOffsets(tp)
-      val untilOffset = untilOffsets(tp)
-      if (untilOffset < fromOffset) {
-        reportDataLoss(
-          s"Partition $tp's offset was changed from " +
-            s"$fromOffset to $untilOffset. This could be either 1) a user error that the start " +
-            "offset is set beyond available offset when starting query, or 2) the kafka " +
-            "topic-partition is deleted and re-created.",
-          () =>
-            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
-      }
-      KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
-    }
-    rangeCalculator.getRanges(ranges, getSortedExecutorList().toImmutableArraySeq)
-  }
-
   private def partitionsAssignedToConsumer(
       body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
       fetchingEarliestOffset: Boolean = false)