From c966caf27cc507c22cdd167f618ef5245f8382c3 Mon Sep 17 00:00:00 2001 From: zouyunhe Date: Tue, 25 Nov 2025 12:08:16 +0000 Subject: [PATCH] Fix functions that cannot be mapped to Velox --- .github/workflows/flink.yml | 3 +- .../workflows/util/install-flink-resources.sh | 101 ++++++++++++++++++ gluten-flink/docs/Flink.md | 2 +- .../gluten/rexnode/RexNodeConverter.java | 16 +-- .../org/apache/gluten/rexnode/TypeUtils.java | 65 ++++++++--- .../functions/DecimalRexCallConverters.java | 18 +++- .../functions/RexCallConverterFactory.java | 6 +- .../functions/SearchRexCallConverter.java | 2 +- .../stream/custom/ScalarFunctionsTest.java | 40 +++++++ 9 files changed, 220 insertions(+), 33 deletions(-) create mode 100755 .github/workflows/util/install-flink-resources.sh diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index d6de6118da44..4c0f1fdd4e8e 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -59,8 +59,9 @@ jobs: source /opt/rh/gcc-toolset-11/enable sudo dnf install -y patchelf sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y + sudo .github/workflows/util/install-flink-resources.sh git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9 + cd velox4j && git reset --hard 14eea127c5088f972cdf1ca0987fd95429485a0e git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/.github/workflows/util/install-flink-resources.sh b/.github/workflows/util/install-flink-resources.sh new file mode 100755 index 000000000000..23c638cef823 --- /dev/null +++ b/.github/workflows/util/install-flink-resources.sh @@ -0,0 +1,101 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +LIBRDKAFKA_VERSION="v2.10.0" +CPPKAFKA_VERSION="v0.4.1" + +function wget_and_untar { + local URL=$1 + local DIR=$2 + mkdir -p "${DEPENDENCY_DIR}" + pushd "${DEPENDENCY_DIR}" + SUDO="${SUDO:-""}" + if [ -d "${DIR}" ]; then + if prompt "${DIR} already exists. Delete?"; then + ${SUDO} rm -rf "${DIR}" + else + popd + return + fi + fi + mkdir -p "${DIR}" + pushd "${DIR}" + curl ${CURL_OPTIONS} -L "${URL}" > $2.tar.gz + tar -xz --strip-components=1 -f $2.tar.gz + popd + popd +} + +function cmake_install_dir { + pushd "./${DEPENDENCY_DIR}/$1" + # remove the directory argument + shift + cmake_install $@ + popd +} + +function cmake_install { + local NAME=$(basename "$(pwd)") + local BINARY_DIR=_build + SUDO="${SUDO:-""}" + if [ -d "${BINARY_DIR}" ]; then + if prompt "Do you want to rebuild ${NAME}?"; then + ${SUDO} rm -rf "${BINARY_DIR}" + else + return 0 + fi + fi + + mkdir -p "${BINARY_DIR}" + COMPILER_FLAGS=$(get_cxx_flags) + # Add platform specific CXX flags if any + COMPILER_FLAGS+=${OS_CXXFLAGS} + + # CMAKE_POSITION_INDEPENDENT_CODE is required so that Velox can be built into dynamic libraries \ + cmake -Wno-dev ${CMAKE_OPTIONS} -B"${BINARY_DIR}" \ + -GNinja \ + -DCMAKE_POSITION_INDEPENDENT_CODE=ON \ + "${INSTALL_PREFIX+-DCMAKE_PREFIX_PATH=}${INSTALL_PREFIX-}" \ + 
"${INSTALL_PREFIX+-DCMAKE_INSTALL_PREFIX=}${INSTALL_PREFIX-}" \ + -DCMAKE_CXX_FLAGS="$COMPILER_FLAGS" \ + -DBUILD_TESTING=OFF \ + "$@" + # Exit if the build fails. + cmake --build "${BINARY_DIR}" "-j ${NPROC}" || { echo 'build failed' ; exit 1; } + ${SUDO} cmake --install "${BINARY_DIR}" +} + +function run_and_time { + time "$@" || (echo "Failed to run $* ." ; exit 1 ) + { echo "+ Finished running $*"; } 2> /dev/null +} + +function install_librdkafka { + wget_and_untar https://github.com/confluentinc/librdkafka/archive/refs/tags/${LIBRDKAFKA_VERSION}.tar.gz librdkafka + cmake_install_dir librdkafka -DBUILD_TESTS=OFF +} + +function install_cppkafka { + wget_and_untar https://github.com/mfontanini/cppkafka/archive/refs/tags/${CPPKAFKA_VERSION}.tar.gz cppkafka + cmake_install_dir cppkafka -DBUILD_TESTS=OFF +} + +function install_velox_deps { + run_and_time install_librdkafka + run_and_time install_cppkafka +} + +install_velox_deps diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md index 92dc6fbc373d..bc3e2ed09240 100644 --- a/gluten-flink/docs/Flink.md +++ b/gluten-flink/docs/Flink.md @@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow ## fetch velox4j code git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git cd velox4j -git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9 +git reset --hard 14eea127c5088f972cdf1ca0987fd95429485a0e mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true ``` **Get gluten** diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java index 4475114dc26d..337300c59270 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java @@ -24,6 +24,7 @@ import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr; 
import io.github.zhztheplayer.velox4j.expression.TypedExpr; import io.github.zhztheplayer.velox4j.type.Type; +import io.github.zhztheplayer.velox4j.variant.ArrayValue; import io.github.zhztheplayer.velox4j.variant.BigIntValue; import io.github.zhztheplayer.velox4j.variant.BooleanValue; import io.github.zhztheplayer.velox4j.variant.DoubleValue; @@ -46,6 +47,7 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.NlsString; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; @@ -133,18 +135,16 @@ public static Variant toVariant(RexLiteral literal) { } } - public static List toTypedExpr(Set ranges, RelDataType relDataType) { - List results = new ArrayList<>(ranges.size()); - Type resType = toType(relDataType); + public static TypedExpr toTypedExpr(Set ranges, RelDataType relDataType) { + List values = new ArrayList<>(ranges.size()); for (Range range : ranges) { if (range.lowerEndpoint() != range.upperEndpoint()) { throw new RuntimeException("Not support multi ranges " + range); } - results.add( - new ConstantTypedExpr( - resType, toVariant(range.lowerEndpoint(), relDataType.getSqlTypeName()), null)); + values.add(toVariant(range.lowerEndpoint(), relDataType.getSqlTypeName())); } - return results; + Variant arrayValue = new ArrayValue(values); + return ConstantTypedExpr.create(arrayValue); } public static List toTypedExpr(Range range, RelDataType relDataType) { @@ -165,6 +165,8 @@ public static Variant toVariant(Comparable comparable, SqlTypeName typeName) { return new IntegerValue(((BigDecimal) comparable).intValue()); case VARCHAR: return new VarCharValue(comparable.toString()); + case CHAR: + return new VarCharValue(((NlsString) comparable).getValue()); default: throw new RuntimeException("Unsupported range type: " + typeName); } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java index 156174129d8a..c16d563d673c 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java @@ -20,6 +20,9 @@ import io.github.zhztheplayer.velox4j.expression.TypedExpr; import io.github.zhztheplayer.velox4j.type.*; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -54,24 +57,52 @@ public static boolean isIntegerType(Type type) { return type instanceof IntegerType; } + public static List promoteTypes(List exprs) { + Type targetType = null; + int targetPriority = -1, targetDecimalPrecision = -1, targetDecimalScale = -1; + boolean allDecimalType = true; + for (TypedExpr expr : exprs) { + Type returnType = expr.getReturnType(); + if (returnType instanceof DecimalType) { + int precision = ((DecimalType) returnType).getPrecision(); + int scale = ((DecimalType) returnType).getScale(); + if (precision > targetDecimalPrecision) { + targetDecimalPrecision = precision; + } + if (scale > targetDecimalScale) { + targetDecimalScale = scale; + } + } else { + allDecimalType = false; + int priority = getNumericTypePriority(returnType); + if (priority > targetPriority) { + targetPriority = priority; + targetType = returnType; + } + } + } + if (targetType == null && allDecimalType) { + targetType = new DecimalType(targetDecimalPrecision, targetDecimalScale); + } else if (targetType == null) { + throw new FlinkRuntimeException("Logical error, target type can not be null."); + } + List res = new ArrayList<>(); + for (TypedExpr expr : exprs) { + Type returnType = expr.getReturnType(); + TypedExpr promotedExpr = + returnType.equals(targetType) + ? expr + : CastTypedExpr.create( + targetType, + expr instanceof CastTypedExpr ? ((CastTypedExpr) expr).getInputs().get(0) : expr, + expr instanceof CastTypedExpr ? 
((CastTypedExpr) expr).isNullOnFailure() : false); + res.add(promotedExpr); + } + return res; + } + public static List promoteTypeForArithmeticExpressions( TypedExpr leftExpr, TypedExpr rightExpr) { - Type leftType = leftExpr.getReturnType(); - Type rightType = rightExpr.getReturnType(); - - int leftPriority = getNumericTypePriority(leftType); - int rightPriority = getNumericTypePriority(rightType); - - Type targetType = leftPriority >= rightPriority ? leftType : rightType; - - TypedExpr promotedLeft = - leftType.equals(targetType) ? leftExpr : CastTypedExpr.create(targetType, leftExpr, false); - - TypedExpr promotedRight = - rightType.equals(targetType) - ? rightExpr - : CastTypedExpr.create(targetType, rightExpr, false); - - return Arrays.asList(promotedLeft, promotedRight); + return promoteTypes(Arrays.asList(leftExpr, rightExpr)); } } diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DecimalRexCallConverters.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DecimalRexCallConverters.java index 7e41dc13423c..a00abeac6c81 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DecimalRexCallConverters.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DecimalRexCallConverters.java @@ -18,6 +18,7 @@ import org.apache.gluten.rexnode.RexConversionContext; import org.apache.gluten.rexnode.RexNodeConverter; +import org.apache.gluten.rexnode.TypeUtils; import org.apache.gluten.rexnode.ValidationResult; import io.github.zhztheplayer.velox4j.expression.CallTypedExpr; @@ -40,10 +41,18 @@ class DecimalArithmeticOperatorRexCallConverters extends BaseRexCallConverter { + private boolean needAlignParameterTypes = false; + public DecimalArithmeticOperatorRexCallConverters(String functionName) { super(functionName); } + public DecimalArithmeticOperatorRexCallConverters( + String functionName, boolean needAlignParameterTypes) { + this(functionName); + 
this.needAlignParameterTypes = needAlignParameterTypes; + } + @Override public ValidationResult isSuitable(RexCall callNode, RexConversionContext context) { Type resultType = getResultType(callNode); @@ -67,17 +76,20 @@ public ValidationResult isSuitable(RexCall callNode, RexConversionContext contex public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context) { List params = getParams(callNode, context); Type resultType = getResultType(callNode); + List castedParams = params.stream() .map(param -> castExprToDecimalType(param, resultType)) .collect(Collectors.toList()); - return new CallTypedExpr(resultType, castedParams, functionName); + if (needAlignParameterTypes) { + return new CallTypedExpr(resultType, TypeUtils.promoteTypes(castedParams), functionName); + } else { + return new CallTypedExpr(resultType, castedParams, functionName); + } } - // If the type is not decimal, convert it to decimal type. private TypedExpr castExprToDecimalType(TypedExpr expr, Type functionResultType) { Type returnType = expr.getReturnType(); - if (returnType instanceof IntegerType) { // Cast BigInt to DecimalType. 
return CastTypedExpr.create(new DecimalType(10, 0), expr, false); diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java index 1353ff820051..bd27816700bb 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java @@ -44,14 +44,14 @@ public class RexCallConverterFactory { () -> new BasicArithmeticOperatorRexCallConverter("greaterthan"), () -> new StringCompareRexCallConverter("greaterthan"), () -> new StringNumberCompareRexCallConverter("greaterthan"), - () -> new DecimalArithmeticOperatorRexCallConverters("greaterthan"))), + () -> new DecimalArithmeticOperatorRexCallConverters("greaterthan", true))), Map.entry( "<", Arrays.asList( () -> new BasicArithmeticOperatorRexCallConverter("lessthan"), () -> new StringCompareRexCallConverter("lessthan"), () -> new StringNumberCompareRexCallConverter("lessthan"), - () -> new DecimalArithmeticOperatorRexCallConverters("lessthan"))), + () -> new DecimalArithmeticOperatorRexCallConverters("lessthan", true))), Map.entry( "=", Arrays.asList( @@ -103,7 +103,7 @@ public class RexCallConverterFactory { () -> new TimestampIntervalRexCallConverter("lessthanorequal"))), Map.entry("PROCTIME", Arrays.asList(() -> new DefaultRexCallConverter("unix_timestamp"))), Map.entry("OR", Arrays.asList(() -> new DefaultRexCallConverter("or"))), - Map.entry("IS NOT NULL", Arrays.asList(() -> new DefaultRexCallConverter("is_not_null"))), + Map.entry("IS NOT NULL", Arrays.asList(() -> new DefaultRexCallConverter("isnotnull"))), Map.entry( "REGEXP_EXTRACT", Arrays.asList(() -> new DefaultRexCallConverter("regexp_extract"))), Map.entry("LOWER", Arrays.asList(() -> new DefaultRexCallConverter("lower"))), diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SearchRexCallConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SearchRexCallConverter.java index 625183669be4..7fb51ee48346 100644 --- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SearchRexCallConverter.java +++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SearchRexCallConverter.java @@ -61,7 +61,7 @@ public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context) { Sarg sarg = (Sarg) rexLiteral.getValue(); Set ranges = sarg.rangeSet.asRanges(); if (ranges.size() > 1) { - params.addAll(RexNodeConverter.toTypedExpr(ranges, rexLiteral.getType())); + params.add(RexNodeConverter.toTypedExpr(ranges, rexLiteral.getType())); TypedExpr ignore = new ConstantTypedExpr(new BooleanType(), new BooleanValue(true), null); return new CallTypedExpr(resultType, params, "in"); diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java index 68d04f53d5c4..53aac9405399 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java @@ -229,6 +229,24 @@ void testDecimal() { query = "select b + e as x from tblDecimal where a > 0"; runAndCheck(query, Arrays.asList("+I[2.0]", "+I[5.0]", "+I[7.0]")); + + query = "select a from tblDecimal where b > 2"; + runAndCheck(query, Arrays.asList("+I[3]")); + + query = "select a from tblDecimal where b > 2.0"; + runAndCheck(query, Arrays.asList("+I[3]")); + + query = "select a from tblDecimal where b > cast(2.0 as decimal(5,1))"; + runAndCheck(query, Arrays.asList("+I[3]")); + + query = "select a from tblDecimal where b < 2"; + runAndCheck(query, 
Arrays.asList("+I[1]")); + + query = "select a from tblDecimal where b < 2.0"; + runAndCheck(query, Arrays.asList("+I[1]")); + + query = "select a from tblDecimal where b < cast(2.0 as decimal(5,1))"; + runAndCheck(query, Arrays.asList("+I[1]")); } @Test @@ -306,4 +324,26 @@ void testNotEqual() { String query = "select a <> 2.20 as x from tblLess where a > 0"; runAndCheck(query, Arrays.asList("+I[true]", "+I[false]", "+I[true]")); } + + @Test + void testIn() { + List rows = + Arrays.asList( + Row.of(1, 1L, "2025-06-24 10:00:01", "1991-01-01 00:00:01"), + Row.of(2, 2L, "2025-06-24 10:00:02", "1991-01-01 00:00:02"), + Row.of(3, 3L, "2025-06-24 10:00:03", "1991-01-01 00:00:03")); + createSimpleBoundedValuesTable("tblIn", "a int, b bigint, c string, d string", rows); + String query = "select d from tblIn where a in(1,2)"; + runAndCheck(query, Arrays.asList("+I[1991-01-01 00:00:01]", "+I[1991-01-01 00:00:02]")); + query = "select b from tblIn where c in('2025-06-24 10:00:02', '2025-06-24 10:00:03')"; + runAndCheck(query, Arrays.asList("+I[2]", "+I[3]")); + } + + @Test + void testIsNotNull() { + List rows = Arrays.asList(Row.of(1, 1L, "abc"), Row.of(2, 2L, null)); + createSimpleBoundedValuesTable("tblIsNotNull", "a int, b bigint, c string", rows); + String query = "select a from tblIsNotNull where c is not null"; + runAndCheck(query, Arrays.asList("+I[1]")); + } }