Commit 6d5f111

micheal-o authored and anishshri-db committed
[SPARK-51972][SS] Introduce State Store file integrity verification using checksum
### What changes were proposed in this pull request?

Introduces file integrity verification for state store files by generating and uploading a checksum file alongside each data file, which lets us verify the file's checksum during read. A new Spark conf enables/disables this (enabled by default). The feature can be enabled and later disabled (and vice versa) on the same checkpoint location, giving users the flexibility to turn it on or off as needed. The implementation is fully backward compatible: it can be enabled on an existing query/checkpoint and disabled later without breaking the query/checkpoint. It is currently used for the following state files: delta, snapshot, changelog, zip. We can later enable this for other checkpoint files too, e.g. operator metadata files and the commit/offset logs.

### Why are the changes needed?

Integrity verification.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52787 from micheal-o/file_checksum.

Authored-by: micheal-o <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent 610bb52 · commit 6d5f111
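For intuition, here is a minimal sketch of the sidecar-checksum write path the description mentions. The CRC32C algorithm, the "<file>.crc" sidecar layout, and all names below are illustrative assumptions, not the PR's actual implementation:

import java.nio.charset.StandardCharsets
import java.util.zip.CRC32C
import org.apache.hadoop.fs.{FileSystem, Path}

object ChecksumSidecar {
  // Stream the file once, tracking both its size and a CRC32C checksum.
  def compute(fs: FileSystem, file: Path): (Long, Int) = {
    val in = fs.open(file)
    val crc = new CRC32C()
    val buf = new Array[Byte](8192)
    var size = 0L
    try {
      var n = in.read(buf)
      while (n != -1) {
        crc.update(buf, 0, n)
        size += n
        n = in.read(buf)
      }
    } finally in.close()
    (size, crc.getValue.toInt)
  }

  // Upload "<file>.crc" next to the data file so readers can verify it later.
  def writeSidecar(fs: FileSystem, file: Path): Unit = {
    val (size, checksum) = compute(fs, file)
    val out = fs.create(new Path(file.toString + ".crc"), true)
    try out.write(s"$size $checksum".getBytes(StandardCharsets.UTF_8))
    finally out.close()
  }
}

On read, the same computation would be repeated and compared against the sidecar's recorded size and checksum, failing the query with the new error class on mismatch (see the error helper added below).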

File tree

20 files changed (+1272, -26 lines)

common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java

Lines changed: 1 addition & 0 deletions
@@ -93,6 +93,7 @@ public enum LogKeys implements LogKey {
   CHECKPOINT_PATH,
   CHECKPOINT_ROOT,
   CHECKPOINT_TIME,
+  CHECKSUM,
   CHOSEN_WATERMARK,
   CLASSIFIER,
   CLASS_LOADER,
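The new key slots into Spark's structured-logging MDC framework. A hypothetical call site (the surrounding class and message are assumptions, not from this diff):

import org.apache.spark.internal.{Logging, LogKeys, MDC}

class ChecksumLogger extends Logging {
  def logVerified(checksum: Int): Unit = {
    // MDC tags the value with the CHECKSUM key for structured log output.
    logInfo(log"Verified state file with checksum ${MDC(LogKeys.CHECKSUM, checksum)}")
  }
}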

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 7 additions & 0 deletions
@@ -626,6 +626,13 @@
     ],
     "sqlState" : "42P08"
   },
+  "CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED" : {
+    "message" : [
+      "Checksum verification failed, the file may be corrupted. File: <fileName>",
+      "Expected (file size: <expectedSize>, checksum: <expectedChecksum>), Computed (file size: <computedSize>, checksum: <computedChecksum>)."
+    ],
+    "sqlState" : "XX000"
+  },
   "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" : {
     "message" : [
       "Checkpoint block <rddBlockId> not found!",

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 17 additions & 0 deletions
@@ -2665,6 +2665,23 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       cause = null)
   }
 
+  def checkpointFileChecksumVerificationFailed(
+      file: Path,
+      expectedSize: Long,
+      expectedChecksum: Int,
+      computedSize: Long,
+      computedChecksum: Int): Throwable = {
+    new SparkException(
+      errorClass = "CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED",
+      messageParameters = Map(
+        "fileName" -> file.toString,
+        "expectedSize" -> expectedSize.toString,
+        "expectedChecksum" -> expectedChecksum.toString,
+        "computedSize" -> computedSize.toString,
+        "computedChecksum" -> computedChecksum.toString),
+      cause = null)
+  }
+
   def cannotReadCheckpoint(expectedVersion: String, actualVersion: String): Throwable = {
     new SparkException (
       errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 11 additions & 0 deletions
@@ -3430,6 +3430,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED =
+    buildConf("spark.sql.streaming.checkpoint.fileChecksum.enabled")
+      .internal()
+      .doc("When true, checksum would be generated and verified for checkpoint files. " +
+        "This is used to detect file corruption.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val PARALLEL_FILE_LISTING_IN_STATS_COMPUTATION =
     buildConf("spark.sql.statistics.parallelFileListingInStatsComputation.enabled")
       .internal()
@@ -6728,6 +6737,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
+  def checkpointFileChecksumEnabled: Boolean = getConf(STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED)
+
   def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
 
   def useDeprecatedKafkaOffsetFetching: Boolean = getConf(USE_DEPRECATED_KAFKA_OFFSET_FETCHING)
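Although marked internal, the flag is settable like any other conf; for example, to turn verification off for a session (spark here is an assumed SparkSession):

// Disable checksum generation/verification (default is true):
spark.conf.set("spark.sql.streaming.checkpoint.fileChecksum.enabled", "false")
// Re-enable later; per the commit message this is safe on an existing checkpoint:
spark.conf.set("spark.sql.streaming.checkpoint.fileChecksum.enabled", "true")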

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CheckpointFileManager.scala

Lines changed: 2 additions & 0 deletions
@@ -93,6 +93,8 @@ trait CheckpointFileManager {
    * checkpoint path.
    */
  def createCheckpointDirectory(): Path
+
+  def close(): Unit = {}
 }
 
 object CheckpointFileManager extends Logging {
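The new no-op close() gives CheckpointFileManager implementations a lifecycle hook, presumably so checksum-related resources can be released when a manager is discarded. A caller-side sketch (the helper below is an assumption, not part of this PR):

// Run a body against a manager and always release its resources afterwards.
def withFileManager[T](manager: CheckpointFileManager)(body: CheckpointFileManager => T): T = {
  try body(manager) finally manager.close()
}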
