Skip to content

Commit 81be755

Browse files
authored
Merge pull request #136 from influxdata/tm/more-scalar-types
feat: add support for Python `Bytes`, `Date`, `Time` & `TimeDelta` scalar types.
2 parents 9e540ae + 1c05f6d commit 81be755

File tree

9 files changed

+862
-4
lines changed

9 files changed

+862
-4
lines changed

guests/python/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,14 @@ Types are mapped to/from [Apache Arrow] as follows:
5858
| Python | Arrow |
5959
| ------------ | ----------- |
6060
| [`bool`] | [`Boolean`] |
61+
| [`bytes`] | [`Binary`] |
62+
| [`date`] | [`Date32`] |
6163
| [`datetime`] | [`Timestamp`] w/ [`Microsecond`] and NO timezone |
6264
| [`float`] | [`Float64`] |
6365
| [`int`] | [`Int64`] |
6466
| [`str`] | [`Utf8`] |
67+
| [`time`] | [`Time64`] w/ [`Microsecond`] and NO timezone |
68+
| [`timedelta`]| [`Duration`] |
6569

6670
Additional types may be supported in the future.
6771

@@ -192,12 +196,20 @@ There is NO I/O available that escapes the sandbox. The [Python Standard Library
192196
[Apache DataFusion]: https://datafusion.apache.org/
193197
[`bool`]: https://docs.python.org/3/library/stdtypes.html#boolean-type-bool
194198
[`Boolean`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Boolean
199+
[`bytes`]: https://docs.python.org/3/library/stdtypes.html#bytes
200+
[`Binary`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Binary
201+
[`date`]: https://docs.python.org/3/library/datetime.html#datetime.date
202+
[`Date32`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Date32
195203
[`datetime`]: https://docs.python.org/3/library/datetime.html#datetime.datetime
196204
[`float`]: https://docs.python.org/3/library/stdtypes.html#numeric-types-int-float-complex
197205
[`Float64`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Float64
198206
[`functools.cache`]: https://docs.python.org/3/library/functools.html#functools.cache
199207
[`int`]: https://docs.python.org/3/library/stdtypes.html#numeric-types-int-float-complex
200208
[`Int64`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Int64
209+
[`time`]: https://docs.python.org/3/library/datetime.html#datetime.time
210+
[`Time64`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Time64
211+
[`timedelta`]: https://docs.python.org/3/library/datetime.html#datetime.timedelta
212+
[`Duration`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.DataType.html#variant.Duration
201213
[`Microsecond`]: https://docs.rs/arrow/latest/arrow/datatypes/enum.TimeUnit.html#variant.Microsecond
202214
[Python 3.14.0]: https://www.python.org/downloads/release/python-3140
203215
[Python Standard Library]: https://docs.python.org/3/library/index.html

guests/python/src/conversion.rs

Lines changed: 250 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@ use std::{ops::ControlFlow, sync::Arc};
33

44
use arrow::{
55
array::{
6-
Array, ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, StringBuilder,
6+
Array, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, DurationMicrosecondBuilder,
7+
Float64Builder, Int64Builder, StringBuilder, Time64MicrosecondBuilder,
78
TimestampMicrosecondBuilder,
89
},
910
datatypes::{DataType, TimeUnit},
1011
};
1112
use chrono::{DateTime, Datelike, NaiveDate, TimeZone, Timelike, Utc};
1213
use datafusion_common::{
1314
cast::{
14-
as_boolean_array, as_float64_array, as_int64_array, as_string_array,
15+
as_binary_array, as_boolean_array, as_date32_array, as_duration_microsecond_array,
16+
as_float64_array, as_int64_array, as_string_array, as_time64_microsecond_array,
1517
as_timestamp_microsecond_array,
1618
},
1719
error::Result as DataFusionResult,
@@ -20,8 +22,8 @@ use datafusion_common::{
2022
use pyo3::{
2123
Bound, BoundObject, IntoPyObjectExt, PyAny, Python,
2224
types::{
23-
PyAnyMethods, PyDateAccess, PyDateTime, PyInt, PyNone, PyStringMethods, PyTimeAccess,
24-
PyTzInfoAccess,
25+
PyAnyMethods, PyBytes, PyDate, PyDateAccess, PyDateTime, PyDelta, PyInt, PyNone,
26+
PyStringMethods, PyTime, PyTimeAccess, PyTzInfoAccess,
2527
},
2628
};
2729

@@ -52,6 +54,10 @@ impl PythonType {
5254
Self::Float => DataType::Float64,
5355
Self::Int => DataType::Int64,
5456
Self::Str => DataType::Utf8,
57+
Self::Bytes => DataType::Binary,
58+
Self::Date => DataType::Date32,
59+
Self::Time => DataType::Time64(TimeUnit::Microsecond),
60+
Self::Timedelta => DataType::Duration(TimeUnit::Microsecond),
5561
}
5662
}
5763

@@ -175,6 +181,105 @@ impl PythonType {
175181
.transpose()
176182
});
177183

184+
Ok(Box::new(it))
185+
}
186+
Self::Bytes => {
187+
let array = as_binary_array(array)?;
188+
189+
let it = array.into_iter().map(move |maybe_val| {
190+
maybe_val
191+
.map(|val| {
192+
PyBytes::new(py, val).into_bound_py_any(py).map_err(|e| {
193+
exec_datafusion_err!(
194+
"cannot convert Rust `&[u8]` value to Python bytes: {e}"
195+
)
196+
})
197+
})
198+
.transpose()
199+
});
200+
201+
Ok(Box::new(it))
202+
}
203+
Self::Date => {
204+
let array = as_date32_array(array)?;
205+
206+
let it = array.into_iter().map(move |maybe_val| {
207+
maybe_val
208+
.map(|val| {
209+
let days_since_epoch = val;
210+
let epoch = NaiveDate::from_epoch_days(0)
211+
.ok_or_else(|| exec_datafusion_err!("cannot create epoch date"))?;
212+
let date = epoch + chrono::Duration::days(days_since_epoch as i64);
213+
214+
PyDate::new(
215+
py,
216+
date.year(),
217+
date.month()
218+
.try_into()
219+
.map_err(|e| exec_datafusion_err!("month out of range: {e}"))?,
220+
date.day()
221+
.try_into()
222+
.map_err(|e| exec_datafusion_err!("day out of range: {e}"))?,
223+
)
224+
.map_err(|e| exec_datafusion_err!("cannot create PyDate: {e}"))?
225+
.into_bound_py_any(py)
226+
.map_err(|e| exec_datafusion_err!("cannot convert PyDate to any: {e}"))
227+
})
228+
.transpose()
229+
});
230+
231+
Ok(Box::new(it))
232+
}
233+
Self::Time => {
234+
let array = as_time64_microsecond_array(array)?;
235+
236+
let it = array.into_iter().map(move |maybe_val| {
237+
maybe_val
238+
.map(|val| {
239+
let microseconds = val;
240+
let total_seconds = microseconds / 1_000_000;
241+
let remaining_microseconds = (microseconds % 1_000_000) as u32;
242+
243+
let hours = (total_seconds / 3600) as u8;
244+
let minutes = ((total_seconds % 3600) / 60) as u8;
245+
let seconds = (total_seconds % 60) as u8;
246+
247+
PyTime::new(py, hours, minutes, seconds, remaining_microseconds, None)
248+
.map_err(|e| exec_datafusion_err!("cannot create PyTime: {e}"))?
249+
.into_bound_py_any(py)
250+
.map_err(|e| {
251+
exec_datafusion_err!("cannot convert PyTime to any: {e}")
252+
})
253+
})
254+
.transpose()
255+
});
256+
257+
Ok(Box::new(it))
258+
}
259+
Self::Timedelta => {
260+
let array = as_duration_microsecond_array(array)?;
261+
262+
let it = array.into_iter().map(move |maybe_val| {
263+
maybe_val
264+
.map(|val| {
265+
let microseconds = val;
266+
let total_seconds = microseconds / 1_000_000;
267+
let remaining_microseconds = microseconds % 1_000_000;
268+
269+
PyDelta::new(
270+
py,
271+
(total_seconds / 86400) as i32, // days
272+
(total_seconds % 86400) as i32, // seconds
273+
remaining_microseconds as i32, // microseconds
274+
false,
275+
)
276+
.map_err(|e| exec_datafusion_err!("cannot create PyDelta: {e}"))?
277+
.into_bound_py_any(py)
278+
.map_err(|e| exec_datafusion_err!("cannot convert PyDelta to any: {e}"))
279+
})
280+
.transpose()
281+
});
282+
178283
Ok(Box::new(it))
179284
}
180285
}
@@ -190,6 +295,10 @@ impl PythonType {
190295
Self::Float => Box::new(Float64Builder::with_capacity(num_rows)),
191296
Self::Int => Box::new(Int64Builder::with_capacity(num_rows)),
192297
Self::Str => Box::new(StringBuilder::with_capacity(num_rows, 1024)),
298+
Self::Bytes => Box::new(BinaryBuilder::with_capacity(num_rows, 1024)),
299+
Self::Date => Box::new(Date32Builder::with_capacity(num_rows)),
300+
Self::Time => Box::new(Time64MicrosecondBuilder::with_capacity(num_rows)),
301+
Self::Timedelta => Box::new(DurationMicrosecondBuilder::with_capacity(num_rows)),
193302
}
194303
}
195304
}
@@ -422,3 +531,140 @@ impl<'py> ArrayBuilder<'py> for TimestampMicrosecondBuilder {
422531
Arc::new(self.finish())
423532
}
424533
}
534+
535+
impl<'py> ArrayBuilder<'py> for BinaryBuilder {
536+
fn push(&mut self, val: Bound<'py, PyAny>) -> DataFusionResult<()> {
537+
let val = val.cast_exact::<PyBytes>().map_err(|_| {
538+
exec_datafusion_err!("expected `bytes` but got {}", py_representation(&val))
539+
})?;
540+
let val: &[u8] = val.extract().map_err(|_| {
541+
exec_datafusion_err!("cannot extract bytes from {}", py_representation(val))
542+
})?;
543+
self.append_value(val);
544+
Ok(())
545+
}
546+
547+
fn skip(&mut self) {
548+
self.append_null();
549+
}
550+
551+
fn finish(&mut self) -> ArrayRef {
552+
Arc::new(self.finish())
553+
}
554+
}
555+
556+
impl<'py> ArrayBuilder<'py> for Date32Builder {
557+
fn push(&mut self, val: Bound<'py, PyAny>) -> DataFusionResult<()> {
558+
let val = val.cast_exact::<PyDate>().map_err(|_| {
559+
exec_datafusion_err!("expected `date` but got {}", py_representation(&val))
560+
})?;
561+
562+
let date =
563+
NaiveDate::from_ymd_opt(val.get_year(), val.get_month().into(), val.get_day().into())
564+
.ok_or_else(|| {
565+
exec_datafusion_err!(
566+
"cannot create NaiveDate based on year-month-day of {}",
567+
py_representation(val)
568+
)
569+
})?;
570+
571+
let epoch = NaiveDate::from_epoch_days(0)
572+
.ok_or_else(|| exec_datafusion_err!("cannot create epoch date"))?;
573+
574+
let days_since_epoch = date.signed_duration_since(epoch).num_days();
575+
let days_since_epoch_i32: i32 = days_since_epoch.try_into().map_err(|_| {
576+
exec_datafusion_err!(
577+
"date is out of range for Date32: {} days since epoch",
578+
days_since_epoch
579+
)
580+
})?;
581+
582+
self.append_value(days_since_epoch_i32);
583+
Ok(())
584+
}
585+
586+
fn skip(&mut self) {
587+
self.append_null();
588+
}
589+
590+
fn finish(&mut self) -> ArrayRef {
591+
Arc::new(self.finish())
592+
}
593+
}
594+
595+
impl<'py> ArrayBuilder<'py> for Time64MicrosecondBuilder {
596+
fn push(&mut self, val: Bound<'py, PyAny>) -> DataFusionResult<()> {
597+
let val = val.cast_exact::<PyTime>().map_err(|_| {
598+
exec_datafusion_err!("expected `time` but got {}", py_representation(&val))
599+
})?;
600+
601+
if let Some(tzinfo) = val.get_tzinfo() {
602+
let s = tzinfo
603+
.str()
604+
.and_then(|name| name.to_str().map(|s| s.to_owned()))
605+
.unwrap_or_else(|_| "<unknown>".to_owned());
606+
return exec_err!("expected no tzinfo, got {s}");
607+
}
608+
609+
let hours = val.get_hour() as i64;
610+
let minutes = val.get_minute() as i64;
611+
let seconds = val.get_second() as i64;
612+
let microseconds = val.get_microsecond() as i64;
613+
614+
let total_microseconds = hours * 3600 * 1_000_000
615+
+ minutes * 60 * 1_000_000
616+
+ seconds * 1_000_000
617+
+ microseconds;
618+
619+
self.append_value(total_microseconds);
620+
Ok(())
621+
}
622+
623+
fn skip(&mut self) {
624+
self.append_null();
625+
}
626+
627+
fn finish(&mut self) -> ArrayRef {
628+
Arc::new(self.finish())
629+
}
630+
}
631+
632+
impl<'py> ArrayBuilder<'py> for DurationMicrosecondBuilder {
633+
fn push(&mut self, val: Bound<'py, PyAny>) -> DataFusionResult<()> {
634+
let val = val.cast_exact::<PyDelta>().map_err(|_| {
635+
exec_datafusion_err!("expected `timedelta` but got {}", py_representation(&val))
636+
})?;
637+
638+
// Extract the timedelta components using the standard methods
639+
let days: i64 = val
640+
.getattr("days")
641+
.map_err(|_| exec_datafusion_err!("cannot get days from timedelta"))?
642+
.extract()
643+
.map_err(|_| exec_datafusion_err!("cannot extract days as i64"))?;
644+
645+
let seconds: i64 = val
646+
.getattr("seconds")
647+
.map_err(|_| exec_datafusion_err!("cannot get seconds from timedelta"))?
648+
.extract()
649+
.map_err(|_| exec_datafusion_err!("cannot extract seconds as i64"))?;
650+
651+
let microseconds: i64 = val
652+
.getattr("microseconds")
653+
.map_err(|_| exec_datafusion_err!("cannot get microseconds from timedelta"))?
654+
.extract()
655+
.map_err(|_| exec_datafusion_err!("cannot extract microseconds as i64"))?;
656+
657+
let total_microseconds = days * 86400 * 1_000_000 + seconds * 1_000_000 + microseconds;
658+
659+
self.append_value(total_microseconds);
660+
Ok(())
661+
}
662+
663+
fn skip(&mut self) {
664+
self.append_null();
665+
}
666+
667+
fn finish(&mut self) -> ArrayRef {
668+
Arc::new(self.finish())
669+
}
670+
}

guests/python/src/inspect.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,24 @@ impl<'a, 'py> FromPyObject<'a, 'py> for PythonType {
2323
// https://docs.python.org/3/library/builtins.html
2424
let mod_builtins = py.import(intern!(py, "builtins"))?;
2525
let type_bool = mod_builtins.getattr(intern!(py, "bool"))?;
26+
let type_bytes = mod_builtins.getattr(intern!(py, "bytes"))?;
2627
let type_float = mod_builtins.getattr(intern!(py, "float"))?;
2728
let type_int = mod_builtins.getattr(intern!(py, "int"))?;
2829
let type_str = mod_builtins.getattr(intern!(py, "str"))?;
2930

3031
// https://docs.python.org/3/library/datetime.html
3132
let mod_datetime = py.import(intern!(py, "datetime"))?;
33+
let type_date = mod_datetime.getattr(intern!(py, "date"))?;
3234
let type_dateime = mod_datetime.getattr(intern!(py, "datetime"))?;
35+
let type_time = mod_datetime.getattr(intern!(py, "time"))?;
36+
let type_timedelta = mod_datetime.getattr(intern!(py, "timedelta"))?;
3337

3438
if ob.is(type_bool) {
3539
Ok(Self::Bool)
40+
} else if ob.is(type_bytes) {
41+
Ok(Self::Bytes)
42+
} else if ob.is(type_date) {
43+
Ok(Self::Date)
3644
} else if ob.is(type_dateime) {
3745
Ok(Self::DateTime)
3846
} else if ob.is(type_float) {
@@ -41,6 +49,10 @@ impl<'a, 'py> FromPyObject<'a, 'py> for PythonType {
4149
Ok(Self::Int)
4250
} else if ob.is(type_str) {
4351
Ok(Self::Str)
52+
} else if ob.is(type_time) {
53+
Ok(Self::Time)
54+
} else if ob.is(type_timedelta) {
55+
Ok(Self::Timedelta)
4456
} else {
4557
Err(PyErr::new::<PyTypeError, _>(format!(
4658
"unknown annotation type: {}",

0 commit comments

Comments
 (0)