From 0c33b5d7843a572272f895f6fd3520e66a9619fb Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 18:34:21 +0800 Subject: [PATCH 1/9] feat: improve error message and sql generation for udfs --- python/databend_udf/udf.py | 17 +++++---- python/tests/test_sql_generation.py | 54 +++++++++++++++++++++++++++++ python/tests/test_stage_location.py | 2 +- 3 files changed, 66 insertions(+), 7 deletions(-) create mode 100644 python/tests/test_sql_generation.py diff --git a/python/databend_udf/udf.py b/python/databend_udf/udf.py index 0bce1eb..bb353df 100644 --- a/python/databend_udf/udf.py +++ b/python/databend_udf/udf.py @@ -358,9 +358,14 @@ def require_stage_locations(self, names: List[str]) -> Dict[str, StageLocation]: mapping = self._stage_mapping() missing = [name for name in names if name not in mapping] if missing: - raise ValueError( - "Missing stage mapping for parameter(s): " + ", ".join(sorted(missing)) + msg = ( + "Missing stage mapping for parameter(s): " + + ", ".join(sorted(missing)) + + ".\n" + "Please check your CREATE FUNCTION statement to ensure that the stage location is correctly specified.\n" + "For example: CREATE FUNCTION ... (stage_param STAGE_LOCATION) ...\n" ) + raise ValueError(msg) return {name: mapping[name] for name in names} @@ -493,7 +498,7 @@ def __init__( for kind, identifier in self._call_arg_layout: if kind == "stage": stage_ref_name = self._stage_param_to_ref.get(identifier, identifier) - self._sql_parameter_defs.append(f"STAGE_LOCATION {stage_ref_name}") + self._sql_parameter_defs.append(f"{stage_ref_name} STAGE_LOCATION") elif kind == "data": field = data_field_map[identifier] self._sql_parameter_defs.append( @@ -1144,11 +1149,11 @@ def add_function(self, udf: UserDefinedFunction): f"{field.name} {_inner_field_to_string(field)}" for field in udf._result_schema ) - output_type = f"({column_defs})" + output_type = f"TABLE ({column_defs})" else: output_type = _arrow_field_to_string(udf._result_schema[0]) sql = ( - f"CREATE FUNCTION {name} ({input_types}) " + f"CREATE OR REPLACE FUNCTION {name} ({input_types}) " f"RETURNS {output_type} LANGUAGE python " f"HANDLER = '{name}' ADDRESS = 'http://{self._location}';" ) @@ -1412,7 +1417,7 @@ def _arrow_field_to_string(field: pa.Field) -> str: def _inner_field_to_string(field: pa.Field) -> str: # inner field default is NOT NULL in databend type_str = _field_type_to_string(field) - return f"{type_str} NULL" if field.nullable else type_str + return f"{type_str} NOT NULL" if not field.nullable else type_str def _field_type_to_string(field: pa.Field) -> str: diff --git a/python/tests/test_sql_generation.py b/python/tests/test_sql_generation.py new file mode 100644 index 0000000..7f5c82f --- /dev/null +++ b/python/tests/test_sql_generation.py @@ -0,0 +1,54 @@ +import logging +from unittest.mock import MagicMock, patch +import pyarrow as pa +from databend_udf import udf, StageLocation, UDFServer + +@udf(input_types=["INT"], result_type="INT") +def scalar_func(x: int) -> int: + return x + +@udf(stage_refs=["stage_loc"], input_types=["INT"], result_type="INT") +def stage_func(stage_loc: StageLocation, x: int) -> int: + return x + +@udf(input_types=["INT"], result_type=["INT"], batch_mode=True) +def table_func(x: int): + yield pa.RecordBatch.from_arrays([pa.array([x])], names=["res"]) + +from prometheus_client import REGISTRY + +def setup_function(): + for collector in list(REGISTRY._collector_to_names): + REGISTRY.unregister(collector) + + +def test_scalar_sql(): + with patch("databend_udf.udf.logger") as mock_logger: + server = UDFServer("0.0.0.0:0") + server.add_function(scalar_func) + + args, _ = mock_logger.info.call_args + log_msg = args[0] + assert "CREATE OR REPLACE FUNCTION scalar_func (x INT)" in log_msg + assert "RETURNS INT LANGUAGE python" in log_msg + +def test_stage_sql(): + with patch("databend_udf.udf.logger") as mock_logger: + server = UDFServer("0.0.0.0:0") + server.add_function(stage_func) + + args, _ = mock_logger.info.call_args + log_msg = args[0] + assert "CREATE OR REPLACE FUNCTION stage_func (stage_loc STAGE_LOCATION, x INT)" in log_msg + assert "RETURNS INT LANGUAGE python" in log_msg + +def test_table_sql(): + with patch("databend_udf.udf.logger") as mock_logger: + server = UDFServer("0.0.0.0:0") + server.add_function(table_func) + + args, _ = mock_logger.info.call_args + log_msg = args[0] + assert "CREATE OR REPLACE FUNCTION table_func (x INT)" in log_msg + assert "RETURNS TABLE (col0 INT) LANGUAGE python" in log_msg + diff --git a/python/tests/test_stage_location.py b/python/tests/test_stage_location.py index 9ae93ef..4ccc988 100644 --- a/python/tests/test_stage_location.py +++ b/python/tests/test_stage_location.py @@ -150,7 +150,7 @@ def test_multiple_stage_entries(): def test_missing_stage_mapping(): - with pytest.raises(ValueError, match="Missing stage mapping"): + with pytest.raises(ValueError, match=r"Missing stage mapping(.|\n)*CREATE FUNCTION"): _collect(describe_stage, _make_batch([1]), Headers()) From 6528c6d7460a1940664481929e5e4dc6e6e94a42 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 18:59:13 +0800 Subject: [PATCH 2/9] fix: resolve lint errors in test_sql_generation.py --- python/tests/test_sql_generation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_sql_generation.py b/python/tests/test_sql_generation.py index 7f5c82f..ba5f7a2 100644 --- a/python/tests/test_sql_generation.py +++ b/python/tests/test_sql_generation.py @@ -1,6 +1,7 @@ import logging from unittest.mock import MagicMock, patch import pyarrow as pa +from prometheus_client import REGISTRY from databend_udf import udf, StageLocation, UDFServer @udf(input_types=["INT"], result_type="INT") @@ -15,7 +16,6 @@ def stage_func(stage_loc: StageLocation, x: int) -> int: def table_func(x: int): yield pa.RecordBatch.from_arrays([pa.array([x])], names=["res"]) -from prometheus_client import REGISTRY def setup_function(): for collector in list(REGISTRY._collector_to_names): From 52f30ee47e7fc340a9cd2215662aefb1a7f79272 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 19:03:13 +0800 Subject: [PATCH 3/9] fix: remove unused import in test_sql_generation.py --- python/tests/test_sql_generation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_sql_generation.py b/python/tests/test_sql_generation.py index ba5f7a2..fb2e766 100644 --- a/python/tests/test_sql_generation.py +++ b/python/tests/test_sql_generation.py @@ -1,5 +1,5 @@ import logging -from unittest.mock import MagicMock, patch +from unittest.mock import patch import pyarrow as pa from prometheus_client import REGISTRY from databend_udf import udf, StageLocation, UDFServer From 0ea59aef887d566e18c2b7261caa861f21cd2231 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 19:04:04 +0800 Subject: [PATCH 4/9] fix: remove unused logging import --- python/tests/test_sql_generation.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/tests/test_sql_generation.py b/python/tests/test_sql_generation.py index fb2e766..a212aae 100644 --- a/python/tests/test_sql_generation.py +++ b/python/tests/test_sql_generation.py @@ -1,4 +1,3 @@ -import logging from unittest.mock import patch import pyarrow as pa from prometheus_client import REGISTRY From 7acede89b79e8f9f362cf0ae9affc7baeeecf647 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 19:06:40 +0800 Subject: [PATCH 5/9] style: format code with ruff --- python/tests/test_sql_generation.py | 17 ++++++++++++----- python/tests/test_stage_location.py | 4 +++- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/python/tests/test_sql_generation.py b/python/tests/test_sql_generation.py index a212aae..02e3399 100644 --- a/python/tests/test_sql_generation.py +++ b/python/tests/test_sql_generation.py @@ -3,14 +3,17 @@ from prometheus_client import REGISTRY from databend_udf import udf, StageLocation, UDFServer + @udf(input_types=["INT"], result_type="INT") def scalar_func(x: int) -> int: return x + @udf(stage_refs=["stage_loc"], input_types=["INT"], result_type="INT") def stage_func(stage_loc: StageLocation, x: int) -> int: return x + @udf(input_types=["INT"], result_type=["INT"], batch_mode=True) def table_func(x: int): yield pa.RecordBatch.from_arrays([pa.array([x])], names=["res"]) @@ -25,29 +28,33 @@ def test_scalar_sql(): with patch("databend_udf.udf.logger") as mock_logger: server = UDFServer("0.0.0.0:0") server.add_function(scalar_func) - + args, _ = mock_logger.info.call_args log_msg = args[0] assert "CREATE OR REPLACE FUNCTION scalar_func (x INT)" in log_msg assert "RETURNS INT LANGUAGE python" in log_msg + def test_stage_sql(): with patch("databend_udf.udf.logger") as mock_logger: server = UDFServer("0.0.0.0:0") server.add_function(stage_func) - + args, _ = mock_logger.info.call_args log_msg = args[0] - assert "CREATE OR REPLACE FUNCTION stage_func (stage_loc STAGE_LOCATION, x INT)" in log_msg + assert ( + "CREATE OR REPLACE FUNCTION stage_func (stage_loc STAGE_LOCATION, x INT)" + in log_msg + ) assert "RETURNS INT LANGUAGE python" in log_msg + def test_table_sql(): with patch("databend_udf.udf.logger") as mock_logger: server = UDFServer("0.0.0.0:0") server.add_function(table_func) - + args, _ = mock_logger.info.call_args log_msg = args[0] assert "CREATE OR REPLACE FUNCTION table_func (x INT)" in log_msg assert "RETURNS TABLE (col0 INT) LANGUAGE python" in log_msg - diff --git a/python/tests/test_stage_location.py b/python/tests/test_stage_location.py index 4ccc988..04dcc7b 100644 --- a/python/tests/test_stage_location.py +++ b/python/tests/test_stage_location.py @@ -150,7 +150,9 @@ def test_multiple_stage_entries(): def test_missing_stage_mapping(): - with pytest.raises(ValueError, match=r"Missing stage mapping(.|\n)*CREATE FUNCTION"): + with pytest.raises( + ValueError, match=r"Missing stage mapping(.|\n)*CREATE FUNCTION" + ): _collect(describe_stage, _make_batch([1]), Headers()) From 4b1c8a2d1d279efc16fb46db12d1fc31c68205a4 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 19:10:28 +0800 Subject: [PATCH 6/9] fix: resolve module name collision in tests --- python/tests/test_sql_generation.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/tests/test_sql_generation.py b/python/tests/test_sql_generation.py index 02e3399..4636670 100644 --- a/python/tests/test_sql_generation.py +++ b/python/tests/test_sql_generation.py @@ -4,6 +4,10 @@ from databend_udf import udf, StageLocation, UDFServer +import sys +import databend_udf.udf # Ensure module is loaded +udf_module = sys.modules["databend_udf.udf"] + @udf(input_types=["INT"], result_type="INT") def scalar_func(x: int) -> int: return x @@ -25,7 +29,7 @@ def setup_function(): def test_scalar_sql(): - with patch("databend_udf.udf.logger") as mock_logger: + with patch.object(udf_module, "logger") as mock_logger: server = UDFServer("0.0.0.0:0") server.add_function(scalar_func) @@ -36,7 +40,7 @@ def test_scalar_sql(): def test_stage_sql(): - with patch("databend_udf.udf.logger") as mock_logger: + with patch.object(udf_module, "logger") as mock_logger: server = UDFServer("0.0.0.0:0") server.add_function(stage_func) @@ -50,7 +54,7 @@ def test_stage_sql(): def test_table_sql(): - with patch("databend_udf.udf.logger") as mock_logger: + with patch.object(udf_module, "logger") as mock_logger: server = UDFServer("0.0.0.0:0") server.add_function(table_func) From 1a625bdb8c5b0af5535699c212acaac7cf744211 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 19:12:36 +0800 Subject: [PATCH 7/9] test: use caplog fixture instead of mocking logger --- python/tests/test_sql_generation.py | 41 ++++++++++------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/python/tests/test_sql_generation.py b/python/tests/test_sql_generation.py index 4636670..109b013 100644 --- a/python/tests/test_sql_generation.py +++ b/python/tests/test_sql_generation.py @@ -1,13 +1,9 @@ -from unittest.mock import patch +import logging import pyarrow as pa from prometheus_client import REGISTRY from databend_udf import udf, StageLocation, UDFServer -import sys -import databend_udf.udf # Ensure module is loaded -udf_module = sys.modules["databend_udf.udf"] - @udf(input_types=["INT"], result_type="INT") def scalar_func(x: int) -> int: return x @@ -28,37 +24,28 @@ def setup_function(): REGISTRY.unregister(collector) -def test_scalar_sql(): - with patch.object(udf_module, "logger") as mock_logger: +def test_scalar_sql(caplog): + with caplog.at_level(logging.INFO): server = UDFServer("0.0.0.0:0") server.add_function(scalar_func) - - args, _ = mock_logger.info.call_args - log_msg = args[0] - assert "CREATE OR REPLACE FUNCTION scalar_func (x INT)" in log_msg - assert "RETURNS INT LANGUAGE python" in log_msg + + assert "CREATE OR REPLACE FUNCTION scalar_func (x INT)" in caplog.text + assert "RETURNS INT LANGUAGE python" in caplog.text -def test_stage_sql(): - with patch.object(udf_module, "logger") as mock_logger: +def test_stage_sql(caplog): + with caplog.at_level(logging.INFO): server = UDFServer("0.0.0.0:0") server.add_function(stage_func) - args, _ = mock_logger.info.call_args - log_msg = args[0] - assert ( - "CREATE OR REPLACE FUNCTION stage_func (stage_loc STAGE_LOCATION, x INT)" - in log_msg - ) - assert "RETURNS INT LANGUAGE python" in log_msg + assert "CREATE OR REPLACE FUNCTION stage_func (stage_loc STAGE_LOCATION, x INT)" in caplog.text + assert "RETURNS INT LANGUAGE python" in caplog.text -def test_table_sql(): - with patch.object(udf_module, "logger") as mock_logger: +def test_table_sql(caplog): + with caplog.at_level(logging.INFO): server = UDFServer("0.0.0.0:0") server.add_function(table_func) - args, _ = mock_logger.info.call_args - log_msg = args[0] - assert "CREATE OR REPLACE FUNCTION table_func (x INT)" in log_msg - assert "RETURNS TABLE (col0 INT) LANGUAGE python" in log_msg + assert "CREATE OR REPLACE FUNCTION table_func (x INT)" in caplog.text + assert "RETURNS TABLE (col0 INT) LANGUAGE python" in caplog.text From f1c3696a94ff5dd311b50b8944231b6f39d1adb7 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 19:14:48 +0800 Subject: [PATCH 8/9] style: format code with ruff --- python/tests/test_sql_generation.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/tests/test_sql_generation.py b/python/tests/test_sql_generation.py index 109b013..9634fbe 100644 --- a/python/tests/test_sql_generation.py +++ b/python/tests/test_sql_generation.py @@ -28,7 +28,7 @@ def test_scalar_sql(caplog): with caplog.at_level(logging.INFO): server = UDFServer("0.0.0.0:0") server.add_function(scalar_func) - + assert "CREATE OR REPLACE FUNCTION scalar_func (x INT)" in caplog.text assert "RETURNS INT LANGUAGE python" in caplog.text @@ -38,7 +38,10 @@ def test_stage_sql(caplog): server = UDFServer("0.0.0.0:0") server.add_function(stage_func) - assert "CREATE OR REPLACE FUNCTION stage_func (stage_loc STAGE_LOCATION, x INT)" in caplog.text + assert ( + "CREATE OR REPLACE FUNCTION stage_func (stage_loc STAGE_LOCATION, x INT)" + in caplog.text + ) assert "RETURNS INT LANGUAGE python" in caplog.text From cceacbe67b7f9fb0c5d0eda1133fbdd1e032d4e1 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Thu, 27 Nov 2025 19:24:21 +0800 Subject: [PATCH 9/9] chore: bump version to 0.2.9 --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index afe9cd7..8d6998c 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -7,7 +7,7 @@ classifiers = [ description = "Databend UDF Server" license = { text = "Apache-2.0" } name = "databend-udf" -version = "0.2.8" +version = "0.2.9" readme = "README.md" requires-python = ">=3.7" dependencies = [