
Commit cd292b4

#805 Fix PR suggestions (Thanks @coderabbitai).
1 parent 45a5e01 commit cd292b4

6 files changed: 46 additions, 22 deletions


cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala

Lines changed: 8 additions & 0 deletions
@@ -832,6 +832,10 @@ object CobolParametersParser extends Logging {
       }
     }
 
+    if (!params.contains(PARAM_RECORD_LENGTH_FIELD) && params.contains(PARAM_RECORD_LENGTH_MAP)) {
+      throw new IllegalArgumentException(s"Option '$PARAM_RECORD_LENGTH_MAP' requires '$PARAM_RECORD_LENGTH_FIELD' to be specified.")
+    }
+
     if (params.contains(PARAM_RECORD_LENGTH)) {
       val incorrectParameters = new ListBuffer[String]
       if (isText) {
@@ -950,6 +954,10 @@ object CobolParametersParser extends Logging {
       params.contains(PARAM_STRICT_INTEGRAL_PRECISION) && params(PARAM_STRICT_INTEGRAL_PRECISION).toBoolean)
       throw new IllegalArgumentException(s"Options '$PARAM_DISPLAY_PIC_ALWAYS_STRING' and '$PARAM_STRICT_INTEGRAL_PRECISION' cannot be used together.")
 
+    if (params.contains(PARAM_ENABLE_INDEXES) && !params(PARAM_ENABLE_INDEXES).toBoolean &&
+      params.contains(PARAM_ENABLE_INDEX_CACHE) && params(PARAM_ENABLE_INDEX_CACHE).toBoolean)
+      throw new IllegalArgumentException(s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")
+
     if (validateRedundantOptions && unusedKeys.nonEmpty) {
       val unusedKeyStr = unusedKeys.mkString(",")
       val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
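
For context, both new checks in this file fail fast during option parsing, before any data is read. A minimal sketch of how a caller would hit the first one; the copybook value and input path here are hypothetical and not part of this commit:

    // Hypothetical usage: 'record_length_map' without 'record_length_field'
    // is now rejected with an IllegalArgumentException while options are parsed.
    val df = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)              // assumes a copybook string is in scope
      .option("record_format", "F")
      .option("record_length_map", """{"A":4,"B":7}""")   // no 'record_length_field' => error
      .load("/hypothetical/path/data.dat")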

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala

Lines changed: 2 additions & 3 deletions
@@ -65,16 +65,15 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser
 class CobolRelation(sourceDirs: Seq[String],
                     cobolReader: Reader,
                     localityParams: LocalityParameters,
-                    debugIgnoreFileSize: Boolean,
-                    indexCachingAllowed: Boolean)
+                    debugIgnoreFileSize: Boolean)
                    (@transient val sqlContext: SQLContext)
   extends BaseRelation
     with Serializable
     with TableScan {
 
   private val filesList = CobolRelation.getListFilesWithOrder(sourceDirs, sqlContext, isRecursiveRetrieval)
 
-  private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, indexCachingAllowed)(localityParams)
+  private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, cobolReader.getReaderProperties.isIndexCachingAllowed)(localityParams)
 
   override def schema: StructType = {
     cobolReader.getSparkSchema

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 1 addition & 6 deletions
@@ -58,16 +58,11 @@ class DefaultSource
 
     val cobolParameters = CobolParametersParser.parse(new Parameters(parameters))
     CobolParametersValidator.checkSanity(cobolParameters)
-    val indexCachingAllowed = cobolParameters.variableLengthParams match {
-      case Some(varLenParams) => varLenParams.isIndexCachingAllowed
-      case None => false
-    }
 
     new CobolRelation(cobolParameters.sourcePaths,
       buildEitherReader(sqlContext.sparkSession, cobolParameters),
       LocalityParameters.extract(cobolParameters),
-      cobolParameters.debugIgnoreFileSize,
-      indexCachingAllowed)(sqlContext)
+      cobolParameters.debugIgnoreFileSize)(sqlContext)
   }
 
   /** Writer relation */
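
The CobolRelation and DefaultSource changes are two halves of one refactoring: the index-caching flag is no longer computed in DefaultSource and threaded through the relation's constructor; instead, the relation reads it from the reader's own properties when it builds the index. A rough sketch of the resulting call, using only names visible in this diff:

    // Sketch: the flag now travels with the reader instead of as a constructor argument.
    val cachingAllowed = cobolReader.getReaderProperties.isIndexCachingAllowed
    val indexes = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext, cachingAllowed)(localityParams)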

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala

Lines changed: 3 additions & 3 deletions
@@ -46,7 +46,7 @@ import scala.collection.mutable.ArrayBuffer
   * In a nutshell, ideally, there will be as many partitions as are there are indexes.
   */
 private[cobol] object IndexBuilder extends Logging {
-  private val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()
+  private[cobol] val indexCache = new ConcurrentHashMap[String, Array[SparseIndexEntry]]()
 
   def buildIndex(filesList: Array[FileWithOrder],
                  cobolReader: Reader,
@@ -124,7 +124,7 @@ private[cobol] object IndexBuilder extends Logging {
     val conf = sqlContext.sparkContext.hadoopConfiguration
     val sconf = new SerializableConfiguration(conf)
 
-    // Splitting between files for which indexes are chached and teh list of files for which indexes are not cached
+    // Splitting between files for which indexes are cached and the list of files for which indexes are not cached
     val cachedFiles = if (cachingAllowed) {
       filesList.filter(f => indexCache.containsKey(f.filePath))
     } else {
@@ -381,7 +381,7 @@ private[cobol] object IndexBuilder extends Logging {
   private def createIndexRDD(indexes: Array[SparseIndexEntry], sqlContext: SQLContext): RDD[SparseIndexEntry] = {
     val indexCount = indexes.length
 
-    val numPartitions = Math.min(indexCount, Constants.maxNumPartitions)
+    val numPartitions = Math.max(1, Math.min(indexCount, Constants.maxNumPartitions))
     logger.info(s"Index elements count: ${indexes.length}, number of partitions = $numPartitions")
 
     sqlContext.sparkContext.parallelize(indexes, numPartitions)
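
Two of the IndexBuilder changes are self-explanatory (widened visibility for tests, a comment typo), while the numPartitions clamp guards the empty-input case: SparkContext.parallelize needs a positive number of partitions, so an index array of length zero would previously have produced a zero-partition RDD that fails once it is evaluated. A small illustration, assuming Constants.maxNumPartitions is a positive limit:

    // Before: indexCount == 0  =>  numPartitions == 0  =>  parallelize fails when the RDD is computed.
    // After:  indexCount == 0  =>  numPartitions == 1  =>  an empty, single-partition RDD.
    val numPartitions = Math.max(1, Math.min(indexCount, Constants.maxNumPartitions))
    val rdd = sqlContext.sparkContext.parallelize(indexes, numPartitions)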

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala

Lines changed: 3 additions & 6 deletions
@@ -64,8 +64,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false,
-      indexCachingAllowed = false)(sqlContext)
+      debugIgnoreFileSize = false)(sqlContext)
     val cobolData: RDD[Row] = relation.parseRecords(testReader, oneRowRDD)
 
     val cobolDataFrame = sqlContext.createDataFrame(cobolData, sparkSchema)
@@ -89,8 +88,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false,
-      indexCachingAllowed = false)(sqlContext)
+      debugIgnoreFileSize = false)(sqlContext)
 
     val caught = intercept[Exception] {
       relation.parseRecords(testReader, oneRowRDD).collect()
@@ -105,8 +103,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {
     val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
       testReader,
       localityParams = localityParams,
-      debugIgnoreFileSize = false,
-      indexCachingAllowed = false)(sqlContext)
+      debugIgnoreFileSize = false)(sqlContext)
 
     val caught = intercept[SparkException] {
       relation.parseRecords(testReader, oneRowRDD).collect()

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test37RecordLengthMappingSpec.scala

Lines changed: 29 additions & 4 deletions
@@ -16,10 +16,13 @@
 
 package za.co.absa.cobrix.spark.cobol.source.integration
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.{PARAM_ENABLE_INDEXES, PARAM_ENABLE_INDEX_CACHE}
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+import za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder
 
 class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
   private val copybook =
@@ -171,17 +174,15 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with
     }
 
     "work for data with offsets and indexes and index cache" in {
-      withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
+      withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile =>
         val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""
 
         val df = spark.read
           .format("cobol")
           .option("copybook_contents", copybook)
           .option("record_format", "F")
           .option("record_length_field", "SEG-ID")
-          .option("file_start_offset", 1)
-          .option("file_end_offset", 2)
-          .option("input_split_records", "2")
+          .option("input_split_records", "1")
          .option("enable_index_cache", "true")
           .option("pedantic", "true")
           .option("record_length_map", """{"A":4,"B":7,"C":8}""")
@@ -190,11 +191,35 @@ class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with
         val actualInitial = df.orderBy("SEG_ID").toJSON.collect().mkString(",")
         val actualCached = df.orderBy("SEG_ID").toJSON.collect().mkString(",")
 
+        val pathNameAsCached = s"file:$tempFile"
+
+        assert(IndexBuilder.indexCache.get(pathNameAsCached) != null)
+        assert(IndexBuilder.indexCache.get(pathNameAsCached).length == 2)
+
         assert(actualInitial == expected)
         assert(actualCached == expected)
       }
     }
 
+    "throw an exception when index caching is requested while indexes are turned off" in {
+      withTempBinFile("record_length_mapping", ".tmp", dataWithFileOffsets) { tempFile =>
+        val ex = intercept[IllegalArgumentException] {
+          spark.read
+            .format("cobol")
+            .option("copybook_contents", copybook)
+            .option("record_format", "F")
+            .option("record_length_field", "SEG-ID")
+            .option("enable_indexes", "false")
+            .option("enable_index_cache", "true")
+            .option("pedantic", "true")
+            .option("record_length_map", """{"A":4,"B":7,"C":8}""")
+            .load(tempFile)
+        }
+
+        assert(ex.getMessage == s"When '$PARAM_ENABLE_INDEXES' = false, '$PARAM_ENABLE_INDEX_CACHE' cannot be true.")
+      }
+    }
+
     "throw an exception for unknown mapping" in {
       withTempBinFile("record_length_mapping", ".tmp", dataSimple) { tempFile =>
         val df = spark.read
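
A note on the new cache assertions: indexCache is keyed by the file path as the reader sees it, which for a local temp file includes the "file:" scheme, hence pathNameAsCached. Because the cache is a single ConcurrentHashMap shared across the JVM, a suite that needs isolation between cases could clear it explicitly; this is a hypothetical cleanup, not something this commit adds:

    // Hypothetical cleanup between test cases; IndexBuilder.indexCache is JVM-wide shared state.
    IndexBuilder.indexCache.clear()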
