-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Spark: Handle complex type in expression in RemoteScanPlanning #14882
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,6 +64,7 @@ | |
| import org.apache.iceberg.exceptions.NoSuchNamespaceException; | ||
| import org.apache.iceberg.exceptions.NoSuchTableException; | ||
| import org.apache.iceberg.exceptions.NoSuchViewException; | ||
| import org.apache.iceberg.expressions.Expression; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Iterables; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
|
|
@@ -657,7 +658,7 @@ public static PlanTableScanResponse planTableScan( | |
| .fromSnapshotInclusive(request.startSnapshotId()) | ||
| .toSnapshot(request.endSnapshotId()); | ||
|
|
||
| configuredScan = configureScan(incrementalScan, request); | ||
| configuredScan = configureScan(incrementalScan, request, incrementalScan.schema()); | ||
| } else { | ||
| // Regular table scan at a specific snapshot | ||
| TableScan tableScan = table.newScan(); | ||
|
|
@@ -667,7 +668,7 @@ public static PlanTableScanResponse planTableScan( | |
| } | ||
|
|
||
| // Apply filters and projections using common method | ||
| configuredScan = configureScan(tableScan, request); | ||
| configuredScan = configureScan(tableScan, request, tableScan.schema()); | ||
| } | ||
|
|
||
| if (shouldPlanAsync.test(configuredScan)) { | ||
|
|
@@ -773,18 +774,20 @@ static void clearPlanningState() { | |
| * | ||
| * @param scan the scan to configure | ||
| * @param request the plan table scan request containing filters and projections | ||
| * @param schema the table schema to use for type-aware filter deserialization | ||
| * @param <T> the specific scan type (TableScan, IncrementalAppendScan, etc.) | ||
| * @return the configured scan with filters and projections applied | ||
| */ | ||
| private static <T extends Scan<T, FileScanTask, ?>> T configureScan( | ||
| T scan, PlanTableScanRequest request) { | ||
| T scan, PlanTableScanRequest request, Schema schema) { | ||
| T configuredScan = scan; | ||
|
|
||
| if (request.select() != null) { | ||
| configuredScan = configuredScan.select(request.select()); | ||
| } | ||
| if (request.filter() != null) { | ||
| configuredScan = configuredScan.filter(request.filter()); | ||
| Expression filter = request.filter(schema); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: this change is probably not needed. |
||
| if (filter != null) { | ||
| configuredScan = configuredScan.filter(filter); | ||
| } | ||
| if (request.statsFields() != null) { | ||
| configuredScan = configuredScan.includeColumnStats(request.statsFields()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,6 +82,7 @@ public static FileScanTask fromJson( | |
| DataFile dataFile = | ||
| (DataFile) ContentFileParser.fromJson(JsonUtil.get(DATA_FILE, jsonNode), specsById); | ||
| int specId = dataFile.specId(); | ||
| PartitionSpec spec = specsById.get(specId); | ||
|
|
||
| DeleteFile[] deleteFiles = null; | ||
| if (jsonNode.has(DELETE_FILE_REFERENCES)) { | ||
|
|
@@ -96,13 +97,12 @@ public static FileScanTask fromJson( | |
|
|
||
| Expression filter = null; | ||
| if (jsonNode.has(RESIDUAL_FILTER)) { | ||
| filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL_FILTER)); | ||
| filter = ExpressionParser.fromJson(jsonNode.get(RESIDUAL_FILTER), spec.schema()); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was caught during the execution phase of Spark; we need to pass the schema when parsing the residual filter. |
||
| } | ||
|
|
||
| String schemaString = SchemaParser.toJson(specsById.get(specId).schema()); | ||
| String specString = PartitionSpecParser.toJson(specsById.get(specId)); | ||
| ResidualEvaluator boundResidual = | ||
| ResidualEvaluator.of(specsById.get(specId), filter, isCaseSensitive); | ||
| String schemaString = SchemaParser.toJson(spec.schema()); | ||
| String specString = PartitionSpecParser.toJson(spec); | ||
| ResidualEvaluator boundResidual = ResidualEvaluator.of(spec, filter, isCaseSensitive); | ||
|
|
||
| return new BaseFileScanTask(dataFile, deleteFiles, schemaString, specString, boundResidual); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,16 +18,22 @@ | |
| */ | ||
| package org.apache.iceberg.rest.requests; | ||
|
|
||
| import com.fasterxml.jackson.core.JsonProcessingException; | ||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import java.io.UncheckedIOException; | ||
| import java.util.List; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.expressions.Expression; | ||
| import org.apache.iceberg.expressions.ExpressionParser; | ||
| import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.rest.RESTRequest; | ||
| import org.apache.iceberg.util.JsonUtil; | ||
|
|
||
| public class PlanTableScanRequest implements RESTRequest { | ||
| private final Long snapshotId; | ||
| private final List<String> select; | ||
| private final Expression filter; | ||
| private final JsonNode filterJson; | ||
| private final boolean caseSensitive; | ||
| private final boolean useSnapshotSchema; | ||
| private final Long startSnapshotId; | ||
|
|
@@ -43,8 +49,47 @@ public List<String> select() { | |
| return select; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the filter expression, deserializing it without schema context. | ||
| * | ||
| * <p>Note: This method does not perform type-aware deserialization and may not work correctly for | ||
| * BINARY, FIXED, and DECIMAL types. Use {@link #filter(Schema)} instead for proper type handling. | ||
| * | ||
| * @return the filter expression, or null if no filter was specified | ||
| * @deprecated since 1.11.0, will be removed in 1.12.0; use {@link #filter(Schema)} instead for | ||
| * proper type-aware deserialization | ||
| */ | ||
| @Deprecated | ||
| public Expression filter() { | ||
| return filter; | ||
| if (filterJson == null) { | ||
| return null; | ||
| } | ||
| return ExpressionParser.fromJson(filterJson); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the filter expression, deserializing it with the provided schema for type inference. | ||
| * | ||
| * <p>This method should be preferred over {@link #filter()} as it properly handles BINARY, FIXED, | ||
| * and DECIMAL types by using schema information for type-aware deserialization. | ||
| * | ||
| * @param schema the table schema to use for type-aware deserialization of filter values | ||
| * @return the filter expression, or null if no filter was specified | ||
| */ | ||
| public Expression filter(Schema schema) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let me think about this a bit more. I also think we have a few more cases across the codebase where we also ser/de
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The other thing we might need to consider is how we would lazily bind this in other client implementations. @Fokko, does PyIceberg have examples of how it does late binding similar to this? |
||
| if (filterJson == null) { | ||
| return null; | ||
| } | ||
| return ExpressionParser.fromJson(filterJson, schema); | ||
| } | ||
|
|
||
| /** | ||
| * Returns the raw filter JSON node, if available. Package-private for use by parser. | ||
| * | ||
| * @return the raw filter JSON, or null if no filter JSON was stored | ||
| */ | ||
| JsonNode filterJson() { | ||
| return filterJson; | ||
| } | ||
|
|
||
| public boolean caseSensitive() { | ||
|
|
@@ -74,7 +119,7 @@ public Long minRowsRequested() { | |
| private PlanTableScanRequest( | ||
| Long snapshotId, | ||
| List<String> select, | ||
| Expression filter, | ||
| JsonNode filterJson, | ||
| boolean caseSensitive, | ||
| boolean useSnapshotSchema, | ||
| Long startSnapshotId, | ||
|
|
@@ -83,7 +128,7 @@ private PlanTableScanRequest( | |
| Long minRowsRequested) { | ||
| this.snapshotId = snapshotId; | ||
| this.select = select; | ||
| this.filter = filter; | ||
| this.filterJson = filterJson; | ||
| this.caseSensitive = caseSensitive; | ||
| this.useSnapshotSchema = useSnapshotSchema; | ||
| this.startSnapshotId = startSnapshotId; | ||
|
|
@@ -111,14 +156,21 @@ public void validate() { | |
| Preconditions.checkArgument( | ||
| minRowsRequested >= 0L, "Invalid scan: minRowsRequested is negative"); | ||
| } | ||
|
|
||
| if (null != filterJson) { | ||
| Preconditions.checkArgument( | ||
| filterJson.isBoolean() || filterJson.isObject(), | ||
| "Cannot parse expression from non-object: %s", | ||
| filterJson); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return MoreObjects.toStringHelper(this) | ||
| .add("snapshotId", snapshotId) | ||
| .add("select", select) | ||
| .add("filter", filter) | ||
| .add("filter", filterJson) | ||
| .add("caseSensitive", caseSensitive) | ||
| .add("useSnapshotSchema", useSnapshotSchema) | ||
| .add("startSnapshotId", startSnapshotId) | ||
|
|
@@ -135,7 +187,7 @@ public static Builder builder() { | |
| public static class Builder { | ||
| private Long snapshotId; | ||
| private List<String> select; | ||
| private Expression filter; | ||
| private JsonNode filterJson; | ||
| private boolean caseSensitive = true; | ||
| private boolean useSnapshotSchema = false; | ||
| private Long startSnapshotId; | ||
|
|
@@ -160,8 +212,38 @@ public Builder withSelect(List<String> projection) { | |
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the filter expression for the scan. | ||
| * | ||
| * @param expression the filter expression | ||
| * @return this builder | ||
| * @deprecated since 1.11.0, will be removed in 1.12.0; this method serializes the expression to | ||
| * JSON immediately, which may lose type information for BINARY, FIXED, and DECIMAL types | ||
| */ | ||
| @Deprecated | ||
| public Builder withFilter(Expression expression) { | ||
| this.filter = expression; | ||
| if (expression != null) { | ||
| try { | ||
| // Serialize expression to JSON immediately for deferred type-aware deserialization | ||
| String jsonString = ExpressionParser.toJson(expression); | ||
| this.filterJson = JsonUtil.mapper().readTree(jsonString); | ||
| } catch (JsonProcessingException e) { | ||
| throw new UncheckedIOException("Failed to serialize filter expression to JSON", e); | ||
| } | ||
| } else { | ||
| this.filterJson = null; | ||
| } | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the filter JSON node directly. Package-private for use by parser. | ||
| * | ||
| * @param filterJsonNode the filter as a JSON node | ||
| * @return this builder | ||
| */ | ||
| Builder withFilterJson(JsonNode filterJsonNode) { | ||
| this.filterJson = filterJsonNode; | ||
| return this; | ||
| } | ||
|
|
||
|
|
@@ -199,7 +281,7 @@ public PlanTableScanRequest build() { | |
| return new PlanTableScanRequest( | ||
| snapshotId, | ||
| select, | ||
| filter, | ||
| filterJson, | ||
| caseSensitive, | ||
| useSnapshotSchema, | ||
| startSnapshotId, | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these changes are probably not required to fix the underlying issue, so we might want to separate them out and test them individually