Skip to content

Commit e80a738

Browse files
committed
chore: move udf_query into its own module
2 parents c94e670 + 81be755 commit e80a738

File tree

39 files changed

+959
-274
lines changed

39 files changed

+959
-274
lines changed

Cargo.lock

Lines changed: 1 addition & 19 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ datafusion-sql = { version = "49.0.1", default-features = false }
2525
datafusion-udf-wasm-arrow2bytes = { path = "arrow2bytes", version = "0.1.0" }
2626
datafusion-udf-wasm-bundle = { path = "guests/bundle", version = "0.1.0" }
2727
datafusion-udf-wasm-guest = { path = "guests/rust", version = "0.1.0" }
28+
datafusion-udf-wasm-host = { path = "host", version = "0.1.0" }
2829
datafusion-udf-wasm-python = { path = "guests/python", version = "0.1.0" }
2930
datafusion-udf-wasm-query = { path = "query", version = "0.1.0" }
3031
http = { version = "1.3.1", default-features = false }
3132
hyper = { version = "1.7", default-features = false }
33+
insta = { version = "1.43.2", "default-features" = false }
3234
pyo3 = { version = "0.27.1", default-features = false, features = ["macros"] }
3335
sqlparser = { version = "0.55.0", default-features = false, features = [
3436
"std",

guests/python/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,14 @@ Types are mapped to/from [Apache Arrow] as follows:
5858
| Python | Arrow |
5959
| ------------ | ----------- |
6060
| [`bool`] | [`Boolean`] |
61+
| [`bytes`] | [`Binary`] |
62+
| [`date`] | [`Date32`] |
6163
| [`datetime`] | [`Timestamp`] w/ [`Microsecond`] and NO timezone |
6264
| [`float`] | [`Float64`] |
6365
| [`int`] | [`Int64`] |
6466
| [`str`] | [`Utf8`] |
67+
| [`time`] | [`Time64`] w/ [`Microsecond`] and NO timezone |
68+
| [`timedelta`]| [`Duration`] |
6569

6670
Additional types may be supported in the future.
6771

@@ -192,12 +196,20 @@ There is NO I/O available that escapes the sandbox. The [Python Standard Library
192196
[Apache DataFusion]: https://datafusion.apache.org/
193197
[`bool`]: https://docs.python.org/3/library/stdtypes.html#boolean-type-bool
194198
[`Boolean`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Boolean
199+
[`bytes`]: https://docs.python.org/3/library/stdtypes.html#bytes
200+
[`Binary`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Binary
201+
[`date`]: https://docs.python.org/3/library/datetime.html#datetime.date
202+
[`Date32`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Date32
195203
[`datetime`]: https://docs.python.org/3/library/datetime.html#datetime.datetime
196204
[`float`]: https://docs.python.org/3/library/stdtypes.html#numeric-types-int-float-complex
197205
[`Float64`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Float64
198206
[`functools.cache`]: https://docs.python.org/3/library/functools.html#functools.cache
199207
[`int`]: https://docs.python.org/3/library/stdtypes.html#numeric-types-int-float-complex
200208
[`Int64`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Int64
209+
[`time`]: https://docs.python.org/3/library/datetime.html#datetime.time
210+
[`Time64`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Time64
211+
[`timedelta`]: https://docs.python.org/3/library/datetime.html#datetime.timedelta
212+
[`Duration`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Duration
201213
[`Microsecond`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.TimeUnit.html#variant.Microsecond
202214
[Python 3.14.0]: https://www.python.org/downloads/release/python-3140
203215
[Python Standard Library]: https://docs.python.org/3/library/index.html

guests/python/src/conversion.rs

Lines changed: 250 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@ use std::{ops::ControlFlow, sync::Arc};
33

44
use arrow::{
55
array::{
6-
Array, ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder,
6+
Array, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
7+
Float64Builder, Int64Builder, StringBuilder, Time64MicrosecondBuilder,
78
TimestampMicrosecondBuilder,
89
},
910
datatypes::{DataType, TimeUnit},
1011
};
1112
use chrono::{DateTime, Datelike, NaiveDate, TimeZone, Timelike, Utc};
1213
use datafusion_common::{
1314
cast::{
14-
as_boolean_array, as_float64_array, as_int64_array, as_string_array,
15+
as_binary_array, as_boolean_array, as_date32_array, as_duration_microsecond_array,
16+
as_float64_array, as_int64_array, as_string_array, as_time64_microsecond_array,
1517
as_timestamp_microsecond_array,
1618
},
1719
error::Result as DataFusionResult,
@@ -20,8 +22,8 @@ use datafusion_common::{
2022
use pyo3::{
2123
Bound, BoundObject, IntoPyObjectExt, PyAny, Python,
2224
types::{
23-
PyAnyMethods, PyDateAccess, PyDateTime, PyInt, PyNone, PyStringMethods, PyTimeAccess,
24-
PyTzInfoAccess,
25+
PyAnyMethods, PyBytes, PyDate, PyDateAccess, PyDateTime, PyDelta, PyInt, PyNone,
26+
PyStringMethods, PyTime, PyTimeAccess, PyTzInfoAccess,
2527
},
2628
};
2729

@@ -52,6 +54,10 @@ impl PythonType {
5254
Self::Float => DataType::Float64,
5355
Self::Int => DataType::Int64,
5456
Self::Str => DataType::Utf8,
57+
Self::Bytes => DataType::Binary,
58+
Self::Date => DataType::Date32,
59+
Self::Time => DataType::Time64(TimeUnit::Microsecond),
60+
Self::Timedelta => DataType::Duration(TimeUnit::Microsecond),
5561
}
5662
}
5763

@@ -175,6 +181,105 @@ impl PythonType {
175181
.transpose()
176182
});
177183

184+
Ok(Box::new(it))
185+
}
186+
Self::Bytes => {
187+
let array = as_binary_array(array)?;
188+
189+
let it = array.into_iter().map(move |maybe_val| {
190+
maybe_val
191+
.map(|val| {
192+
PyBytes::new(py, val).into_bound_py_any(py).map_err(|e| {
193+
exec_datafusion_err!(
194+
"cannot convert Rust `&[u8]` value to Python bytes: {e}"
195+
)
196+
})
197+
})
198+
.transpose()
199+
});
200+
201+
Ok(Box::new(it))
202+
}
203+
Self::Date => {
204+
let array = as_date32_array(array)?;
205+
206+
let it = array.into_iter().map(move |maybe_val| {
207+
maybe_val
208+
.map(|val| {
209+
let days_since_epoch = val;
210+
let epoch = NaiveDate::from_epoch_days(0)
211+
.ok_or_else(|| exec_datafusion_err!("cannot create epoch date"))?;
212+
let date = epoch + chrono::Duration::days(days_since_epoch as i64);
213+
214+
PyDate::new(
215+
py,
216+
date.year(),
217+
date.month()
218+
.try_into()
219+
.map_err(|e| exec_datafusion_err!("month out of range: {e}"))?,
220+
date.day()
221+
.try_into()
222+
.map_err(|e| exec_datafusion_err!("day out of range: {e}"))?,
223+
)
224+
.map_err(|e| exec_datafusion_err!("cannot create PyDate: {e}"))?
225+
.into_bound_py_any(py)
226+
.map_err(|e| exec_datafusion_err!("cannot convert PyDate to any: {e}"))
227+
})
228+
.transpose()
229+
});
230+
231+
Ok(Box::new(it))
232+
}
233+
Self::Time => {
234+
let array = as_time64_microsecond_array(array)?;
235+
236+
let it = array.into_iter().map(move |maybe_val| {
237+
maybe_val
238+
.map(|val| {
239+
let microseconds = val;
240+
let total_seconds = microseconds / 1_000_000;
241+
let remaining_microseconds = (microseconds % 1_000_000) as u32;
242+
243+
let hours = (total_seconds / 3600) as u8;
244+
let minutes = ((total_seconds % 3600) / 60) as u8;
245+
let seconds = (total_seconds % 60) as u8;
246+
247+
PyTime::new(py, hours, minutes, seconds, remaining_microseconds, None)
248+
.map_err(|e| exec_datafusion_err!("cannot create PyTime: {e}"))?
249+
.into_bound_py_any(py)
250+
.map_err(|e| {
251+
exec_datafusion_err!("cannot convert PyTime to any: {e}")
252+
})
253+
})
254+
.transpose()
255+
});
256+
257+
Ok(Box::new(it))
258+
}
259+
Self::Timedelta => {
260+
let array = as_duration_microsecond_array(array)?;
261+
262+
let it = array.into_iter().map(move |maybe_val| {
263+
maybe_val
264+
.map(|val| {
265+
let microseconds = val;
266+
let total_seconds = microseconds / 1_000_000;
267+
let remaining_microseconds = microseconds % 1_000_000;
268+
269+
PyDelta::new(
270+
py,
271+
(total_seconds / 86400) as i32, // days
272+
(total_seconds % 86400) as i32, // seconds
273+
remaining_microseconds as i32, // microseconds
274+
false,
275+
)
276+
.map_err(|e| exec_datafusion_err!("cannot create PyDelta: {e}"))?
277+
.into_bound_py_any(py)
278+
.map_err(|e| exec_datafusion_err!("cannot convert PyDelta to any: {e}"))
279+
})
280+
.transpose()
281+
});
282+
178283
Ok(Box::new(it))
179284
}
180285
}
@@ -190,6 +295,10 @@ impl PythonType {
190295
Self::Float => Box::new(Float64Builder::with_capacity(num_rows)),
191296
Self::Int => Box::new(Int64Builder::with_capacity(num_rows)),
192297
Self::Str => Box::new(StringBuilder::with_capacity(num_rows, 1024)),
298+
Self::Bytes => Box::new(BinaryBuilder::with_capacity(num_rows, 1024)),
299+
Self::Date => Box::new(Date32Builder::with_capacity(num_rows)),
300+
Self::Time => Box::new(Time64MicrosecondBuilder::with_capacity(num_rows)),
301+
Self::Timedelta => Box::new(DurationMicrosecondBuilder::with_capacity(num_rows)),
193302
}
194303
}
195304
}
@@ -422,3 +531,140 @@ impl<'py> ArrayBuilder<'py> for TimestampMicrosecondBuilder {
422531
Arc::new(self.finish())
423532
}
424533
}
534+
535+
impl<'py> ArrayBuilder<'py> for BinaryBuilder {
536+
fn push(&mut self, val: Bound<'py, PyAny>) -> DataFusionResult<()> {
537+
let val = val.cast_exact::<PyBytes>().map_err(|_| {
538+
exec_datafusion_err!("expected `bytes` but got {}", py_representation(&val))
539+
})?;
540+
let val: &[u8] = val.extract().map_err(|_| {
541+
exec_datafusion_err!("cannot extract bytes from {}", py_representation(val))
542+
})?;
543+
self.append_value(val);
544+
Ok(())
545+
}
546+
547+
fn skip(&mut self) {
548+
self.append_null();
549+
}
550+
551+
fn finish(&mut self) -> ArrayRef {
552+
Arc::new(self.finish())
553+
}
554+
}
555+
556+
impl<'py> ArrayBuilder<'py> for Date32Builder {
557+
fn push(&mut self, val: Bound<'py, PyAny>) -> DataFusionResult<()> {
558+
let val = val.cast_exact::<PyDate>().map_err(|_| {
559+
exec_datafusion_err!("expected `date` but got {}", py_representation(&val))
560+
})?;
561+
562+
let date =
563+
NaiveDate::from_ymd_opt(val.get_year(), val.get_month().into(), val.get_day().into())
564+
.ok_or_else(|| {
565+
exec_datafusion_err!(
566+
"cannot create NaiveDate based on year-month-day of {}",
567+
py_representation(val)
568+
)
569+
})?;
570+
571+
let epoch = NaiveDate::from_epoch_days(0)
572+
.ok_or_else(|| exec_datafusion_err!("cannot create epoch date"))?;
573+
574+
let days_since_epoch = date.signed_duration_since(epoch).num_days();
575+
let days_since_epoch_i32: i32 = days_since_epoch.try_into().map_err(|_| {
576+
exec_datafusion_err!(
577+
"date is out of range for Date32: {} days since epoch",
578+
days_since_epoch
579+
)
580+
})?;
581+
582+
self.append_value(days_since_epoch_i32);
583+
Ok(())
584+
}
585+
586+
fn skip(&mut self) {
587+
self.append_null();
588+
}
589+
590+
fn finish(&mut self) -> ArrayRef {
591+
Arc::new(self.finish())
592+
}
593+
}
594+
595+
impl<'py> ArrayBuilder<'py> for Time64MicrosecondBuilder {
596+
fn push(&mut self, val: Bound<'py, PyAny>) -> DataFusionResult<()> {
597+
let val = val.cast_exact::<PyTime>().map_err(|_| {
598+
exec_datafusion_err!("expected `time` but got {}", py_representation(&val))
599+
})?;
600+
601+
if let Some(tzinfo) = val.get_tzinfo() {
602+
let s = tzinfo
603+
.str()
604+
.and_then(|name| name.to_str().map(|s| s.to_owned()))
605+
.unwrap_or_else(|_| "<unknown>".to_owned());
606+
return exec_err!("expected no tzinfo, got {s}");
607+
}
608+
609+
let hours = val.get_hour() as i64;
610+
let minutes = val.get_minute() as i64;
611+
let seconds = val.get_second() as i64;
612+
let microseconds = val.get_microsecond() as i64;
613+
614+
let total_microseconds = hours * 3600 * 1_000_000
615+
+ minutes * 60 * 1_000_000
616+
+ seconds * 1_000_000
617+
+ microseconds;
618+
619+
self.append_value(total_microseconds);
620+
Ok(())
621+
}
622+
623+
fn skip(&mut self) {
624+
self.append_null();
625+
}
626+
627+
fn finish(&mut self) -> ArrayRef {
628+
Arc::new(self.finish())
629+
}
630+
}
631+
632+
impl<'py> ArrayBuilder<'py> for DurationMicrosecondBuilder {
633+
fn push(&mut self, val: Bound<'py, PyAny>) -> DataFusionResult<()> {
634+
let val = val.cast_exact::<PyDelta>().map_err(|_| {
635+
exec_datafusion_err!("expected `timedelta` but got {}", py_representation(&val))
636+
})?;
637+
638+
// Extract the timedelta components using the standard methods
639+
let days: i64 = val
640+
.getattr("days")
641+
.map_err(|_| exec_datafusion_err!("cannot get days from timedelta"))?
642+
.extract()
643+
.map_err(|_| exec_datafusion_err!("cannot extract days as i64"))?;
644+
645+
let seconds: i64 = val
646+
.getattr("seconds")
647+
.map_err(|_| exec_datafusion_err!("cannot get seconds from timedelta"))?
648+
.extract()
649+
.map_err(|_| exec_datafusion_err!("cannot extract seconds as i64"))?;
650+
651+
let microseconds: i64 = val
652+
.getattr("microseconds")
653+
.map_err(|_| exec_datafusion_err!("cannot get microseconds from timedelta"))?
654+
.extract()
655+
.map_err(|_| exec_datafusion_err!("cannot extract microseconds as i64"))?;
656+
657+
let total_microseconds = days * 86400 * 1_000_000 + seconds * 1_000_000 + microseconds;
658+
659+
self.append_value(total_microseconds);
660+
Ok(())
661+
}
662+
663+
fn skip(&mut self) {
664+
self.append_null();
665+
}
666+
667+
fn finish(&mut self) -> ArrayRef {
668+
Arc::new(self.finish())
669+
}
670+
}

0 commit comments

Comments
 (0)