Commit d274906

Implement input file expressions for Iceberg tables
- Support input_file_name(), input_file_block_start(), and input_file_block_length() for Iceberg tables, which were missing in the OSS version.
1 parent 6d5d34d commit d274906
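For context, a minimal sketch of what the change enables (table name and data are illustrative and not part of this commit; it assumes a Spark session with an Iceberg catalog configured):

// Hypothetical usage: these three expressions can now be evaluated by the
// native Iceberg scan instead of requiring a fallback to vanilla Spark.
spark.sql("CREATE TABLE demo_iceberg_tbl (id INT, name STRING) USING iceberg")
spark.sql("INSERT INTO demo_iceberg_tbl VALUES (1, 'a'), (2, 'b')")
spark
  .sql("""
         |SELECT input_file_name(),
         |       input_file_block_start(),
         |       input_file_block_length()
         |FROM demo_iceberg_tbl
         |""".stripMargin)
  .show(truncate = false)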

10 files changed: +276, -47 lines

cpp/velox/compute/WholeStageResultIterator.cc

Lines changed: 1 addition & 1 deletion
@@ -163,7 +163,7 @@ WholeStageResultIterator::WholeStageResultIterator(
           nullptr,
           true,
           deleteFiles,
-          std::unordered_map<std::string, std::string>(),
+          metadataColumn,
           properties[idx]);
     } else {
       auto connectorId = kHiveConnectorId;

gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesBuilder.java

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,7 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
       List<Long> starts,
       List<Long> lengths,
       List<Map<String, String>> partitionColumns,
+      List<Map<String, String>> metadataColumns,
       LocalFilesNode.ReadFileFormat fileFormat,
       List<String> preferredLocations,
       List<List<DeleteFile>> deleteFilesList) {
@@ -37,6 +38,7 @@ public static IcebergLocalFilesNode makeIcebergLocalFiles(
         starts,
         lengths,
         partitionColumns,
+        metadataColumns,
         fileFormat,
         preferredLocations,
         deleteFilesList);

gluten-iceberg/src/main/java/org/apache/gluten/substrait/rel/IcebergLocalFilesNode.java

Lines changed: 2 additions & 1 deletion
@@ -35,6 +35,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
       List<Long> starts,
       List<Long> lengths,
       List<Map<String, String>> partitionColumns,
+      List<Map<String, String>> metadataColumns,
       ReadFileFormat fileFormat,
       List<String> preferredLocations,
       List<List<DeleteFile>> deleteFilesList) {
@@ -46,7 +47,7 @@ public class IcebergLocalFilesNode extends LocalFilesNode {
         new ArrayList<>(),
         new ArrayList<>(),
         partitionColumns,
-        new ArrayList<>(),
+        metadataColumns,
         fileFormat,
         preferredLocations,
         new HashMap<>(),

gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala

Lines changed: 4 additions & 0 deletions
@@ -64,6 +64,10 @@ case class IcebergScanTransformer(
     IcebergScanTransformer.supportsBatchScan(scan)
   }
 
+  override def withNewOutput(newOutput: Seq[AttributeReference]): BatchScanExecTransformerBase = {
+    this.copy(output = newOutput)
+  }
+
   override def doValidateInternal(): ValidationResult = {
     val validationResult = super.doValidateInternal();
     if (!validationResult.ok()) {

gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala

Lines changed: 15 additions & 42 deletions
@@ -23,6 +23,7 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.softaffinity.SoftAffinity
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.{InputFileBlockLength, InputFileBlockStart, InputFileName}
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.types.StructType

@@ -41,45 +42,7 @@ object GlutenIcebergSourceUtil {
       index: Int,
       readPartitionSchema: StructType): SplitInfo = inputPartition match {
     case partition: SparkInputPartition =>
-      val paths = new JArrayList[String]()
-      val starts = new JArrayList[JLong]()
-      val lengths = new JArrayList[JLong]()
-      val partitionColumns = new JArrayList[JMap[String, String]]()
-      val deleteFilesList = new JArrayList[JList[DeleteFile]]()
-      var fileFormat = ReadFileFormat.UnknownFormat
-
-      val tasks = partition.taskGroup[ScanTask]().tasks().asScala
-      asFileScanTask(tasks.toList).foreach {
-        task =>
-          paths.add(
-            BackendsApiManager.getTransformerApiInstance
-              .encodeFilePathIfNeed(task.file().path().toString))
-          starts.add(task.start())
-          lengths.add(task.length())
-          partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
-          deleteFilesList.add(task.deletes())
-          val currentFileFormat = convertFileFormat(task.file().format())
-          if (fileFormat == ReadFileFormat.UnknownFormat) {
-            fileFormat = currentFileFormat
-          } else if (fileFormat != currentFileFormat) {
-            throw new UnsupportedOperationException(
-              s"Only one file format is supported, " +
-                s"find different file format $fileFormat and $currentFileFormat")
-          }
-      }
-      val preferredLoc = SoftAffinity.getFilePartitionLocations(
-        paths.asScala.toArray,
-        inputPartition.preferredLocations())
-      IcebergLocalFilesBuilder.makeIcebergLocalFiles(
-        index,
-        paths,
-        starts,
-        lengths,
-        partitionColumns,
-        fileFormat,
-        preferredLoc.toList.asJava,
-        deleteFilesList
-      )
+      genSplitInfo(Seq(partition), index, readPartitionSchema)
     case _ =>
       throw new UnsupportedOperationException("Only support iceberg SparkInputPartition.")
   }
@@ -93,6 +56,7 @@ object GlutenIcebergSourceUtil {
     val lengths = new JArrayList[JLong]()
     val partitionColumns = new JArrayList[JMap[String, String]]()
     val deleteFilesList = new JArrayList[JList[DeleteFile]]()
+    val metadataColumns = new JArrayList[JMap[String, String]]
     val preferredLocs = new JArrayList[String]()
     var fileFormat = ReadFileFormat.UnknownFormat

@@ -101,11 +65,19 @@ object GlutenIcebergSourceUtil {
       val tasks = partition.taskGroup[ScanTask]().tasks().asScala
       asFileScanTask(tasks.toList).foreach {
         task =>
+          val path = task.file().location()
+          val start = task.start()
+          val length = task.length()
           paths.add(
             BackendsApiManager.getTransformerApiInstance
-              .encodeFilePathIfNeed(task.file().path().toString))
-          starts.add(task.start())
-          lengths.add(task.length())
+              .encodeFilePathIfNeed(path))
+          starts.add(start)
+          lengths.add(length)
+          val metadataColumn = new JHashMap[String, String]()
+          metadataColumn.put(InputFileName().prettyName, path)
+          metadataColumn.put(InputFileBlockStart().prettyName, start.toString)
+          metadataColumn.put(InputFileBlockLength().prettyName, length.toString)
+          metadataColumns.add(metadataColumn)
           partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
           deleteFilesList.add(task.deletes())
           val currentFileFormat = convertFileFormat(task.file().format())
@@ -125,6 +97,7 @@ object GlutenIcebergSourceUtil {
       starts,
       lengths,
       partitionColumns,
+      metadataColumns,
       fileFormat,
       SoftAffinity
         .getFilePartitionLocations(paths.asScala.toArray, preferredLocs.asScala.toArray)
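A note on the hunk above: each file scan task now contributes one small string map keyed by the Spark expressions' pretty names, and that map is what eventually reaches the native reader as the split's metadata columns. Below is a minimal standalone sketch of that keying, assuming only spark-catalyst on the classpath; the path, start, and length values are illustrative placeholders, not values from this commit.

import java.util.{HashMap => JHashMap}

import org.apache.spark.sql.catalyst.expressions.{InputFileBlockLength, InputFileBlockStart, InputFileName}

object MetadataColumnSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative stand-ins for task.file().location(), task.start(), task.length().
    val path = "file:/warehouse/db/tbl/data/part-00000.parquet"
    val start = 0L
    val length = 1024L

    // Same keying scheme as the diff above: the expression pretty names
    // ("input_file_name", "input_file_block_start", "input_file_block_length")
    // point at the per-file values for this split.
    val metadataColumn = new JHashMap[String, String]()
    metadataColumn.put(InputFileName().prettyName, path)
    metadataColumn.put(InputFileBlockStart().prettyName, start.toString)
    metadataColumn.put(InputFileBlockLength().prettyName, length.toString)
    println(metadataColumn)
  }
}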

gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala

Lines changed: 235 additions & 0 deletions
@@ -17,7 +17,15 @@
 package org.apache.gluten.execution
 
 import org.apache.spark.SparkConf
+import org.apache.spark.TaskContext
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.ProjectExec
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.functions.{countDistinct, udf}
+
+import org.scalatest.prop.TableDrivenPropertyChecks._
+
+import java.nio.file.{Files, Paths}
 
 abstract class IcebergSuite extends WholeStageTransformerSuite {
   protected val rootPath: String = getClass.getResource("/").getPath
@@ -664,4 +672,231 @@ abstract class IcebergSuite extends WholeStageTransformerSuite {
       assert(result.head.getString(1) == "test_data")
     }
   }
+
+  test("test iceberg input_file_name, input_file_block_start, input_file_block_length") {
+    // Test both partitioned and non-partitioned tables
+    val tables =
+      org.scalatest.prop.TableDrivenPropertyChecks.Table(
+        ("table_name", "is_partitioned"),
+        ("partitioned_table", true),
+        ("non_partitioned_table", false)
+      )
+    forAll(tables) {
+      (tableName: String, isPartitioned: Boolean) =>
+        withTable(tableName) {
+          // Create table with or without partitioning
+          if (isPartitioned) {
+            spark.sql(s"""
+                         |create table $tableName(id int, name string)
+                         |using iceberg
+                         |partitioned by (p string);
+                         |""".stripMargin)
+          } else {
+            spark.sql(s"""
+                         |create table $tableName(id int, name string, p string)
+                         |using iceberg;
+                         |""".stripMargin)
+          }
+
+          // Same insert for both tables
+          spark.sql(
+            s"""
+               |insert into table $tableName values
+               |(4, 'a5', 'p4'),
+               |(1, 'a1', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(2, 'a3', 'p2'),
+               |(1, 'a2', 'p1'),
+               |(3, 'a4', 'p3'),
+               |(10, 'a4', 'p3');
+               |""".stripMargin
+          )
+
+          // Same query and validation for both tables
+          runQueryAndCompare(s"""
+                                |select input_file_name() as file_name,
+                                |input_file_block_start(),
+                                |input_file_block_length()
+                                |from $tableName
+                                |order by 1, 2, 3;
+                                |""".stripMargin) {
+            df =>
+              {
+                val plan = df.queryExecution.executedPlan
+                assert(
+                  getExecutedPlan(df).count(
+                    plan => {
+                      plan.isInstanceOf[IcebergScanTransformer]
+                    }) == 1)
+                assert(
+                  getExecutedPlan(df).count(
+                    plan => {
+                      plan.isInstanceOf[ProjectExecTransformer]
+                    }) == 1)
+                assert(
+                  getExecutedPlan(df).count(
+                    plan => {
+                      plan.isInstanceOf[ProjectExec]
+                    }) == 0)
+                assert(
+                  getExecutedPlan(df).count(
+                    plan => {
+                      plan.isInstanceOf[BatchScanExec]
+                    }) == 0)
+                plan.foreach {
+                  case plan: IcebergScanTransformer =>
+                    assert(plan.output.head.name == "input_file_name")
+                    assert(plan.output.apply(1).name == "input_file_block_start")
+                    assert(plan.output.last.name == "input_file_block_length")
+                  case _ => // do nothing
+                }
+                val getTaskId = udf(
+                  () => {
+                    val context = TaskContext.get()
+                    assert(context != null)
+                    context.taskAttemptId()
+                  })
+                val dfWithTaskInfo = df.withColumn("task_id", getTaskId())
+                val filesPerTask = dfWithTaskInfo
+                  .groupBy("task_id")
+                  .agg(
+                    countDistinct("file_name").alias("distinct_file_count")
+                  )
+                  .collect()
+                assert(filesPerTask.length == 1)
+                if (isPartitioned) {
+                  assert(filesPerTask.head.getAs[Long]("distinct_file_count") > 1)
+                } else {
+                  assert(filesPerTask.head.getAs[Long]("distinct_file_count") == 1)
+                }
+              }
+          }
+        }
+    }
+  }
+
+  test("test iceberg input file name with non-ascii characters") {
+    // Test both partitioned and non-partitioned tables
+    val tables =
+      org.scalatest.prop.TableDrivenPropertyChecks.Table(
+        ("table_name", "is_partitioned"),
+        ("partitioned_table", true),
+        ("non_partitioned_table", false)
+      )
+    forAll(tables) {
+      (tableName: String, isPartitioned: Boolean) =>
+        withTable(tableName) {
+          // Create table with or without partitioning
+          if (isPartitioned) {
+            spark.sql(s"""
+                         |create table $tableName(id int, name string)
+                         |using iceberg
+                         |partitioned by (p string);
+                         |""".stripMargin)
+          } else {
+            spark.sql(s"""
+                         |create table $tableName(id int, name string, p string)
+                         |using iceberg;
+                         |""".stripMargin)
+          }
+
+          // Same insert for both tables
+          spark.sql(
+            s"""
+               |insert into table $tableName values
+               |(4, 'a5', 'p4'),
+               |(1, 'a1', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(1, 'a2', 'p1'),
+               |(2, 'a3', 'p2'),
+               |(1, 'a2', 'p1'),
+               |(3, 'a4', 'p3'),
+               |(10, 'a4', 'p3');
+               |""".stripMargin
+          )
+
+          // Verify input_file_name() returns non-ascii file names
+          val newTableName = s"${tableName}_with_non_ascii"
+          withTable(newTableName) {
+            if (isPartitioned) {
+              spark.sql(s"""
+                CREATE TABLE $newTableName (id int, name string)
+                USING iceberg
+                PARTITIONED BY (p string)
+              """)
+            } else {
+              spark.sql(s"""
+                CREATE TABLE $newTableName (id int, name string, p string)
+                USING iceberg
+              """)
+            }
+
+            // Get a parent directory of partition directories
+            val filePaths = spark
+              .sql(s"""
+                      |select distinct input_file_name()
+                      |from $tableName
+                      |""".stripMargin)
+              .collect()
+              .map(_.getString(0))
+            // this is a data directory for non-partitioned table
+            val partitionDir = new java.io.File(filePaths.head).getParent
+            val parentOfPartitionDirs = new java.io.File(partitionDir).getParent
+            val nonAsciiPrefix = "논아스키코드" // scalastyle:ignore nonascii
+            // Process each file path, copy it, and add to the new table
+            filePaths.foreach {
+              filePath =>
+                // Convert file:/ URI to actual path
+                val path = Paths.get(new java.net.URI(filePath))
+                val directory = path.getParent
+                val originalFileName = path.getFileName.toString
+
+                // Create new filename with non-ascii prefix
+                val newFileName = nonAsciiPrefix + originalFileName
+                val newPath = directory.resolve(newFileName)
+
+                // Copy the file
+                Files.copy(path, newPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
+            }
+
+            spark.sql(s"""
+              CALL spark_catalog.system.add_files(
+                table => '$newTableName',
+                source_table => '`parquet`.`${if (isPartitioned) parentOfPartitionDirs
+              else partitionDir}`'
+              )
+            """)
+
+            val fileNamesFromNewTable = spark
+              .sql(s"""
+                SELECT DISTINCT input_file_name() as file_name
+                FROM $newTableName
+              """)
+              .collect()
+            if (isPartitioned) {
+              assert(fileNamesFromNewTable.length == 8)
+            } else {
+              assert(fileNamesFromNewTable.length == 2)
+            }
+            val (nonAsciiFiles, asciiFiles) = fileNamesFromNewTable.partition(
+              _.getString(0)
+                .contains(nonAsciiPrefix))
+            assert(nonAsciiFiles.length == asciiFiles.length)
}
899+
}
900+
}
901+
}
667902
}
