
Commit f43174d

Authored by: Jay Chia (jaychia)

[FEAT] Enable feature-flagged native downloader in daft.read_parquet (#1190)

Enables changes to `daft.read_parquet` such that when `use_native_downloader=True` is specified, Daft uses our new Rust-based native Parquet downloading and parsing for:

1. Schema inference
2. Retrieving per-file metadata (currently only the number of rows)
3. Reading the actual `Table`

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>

1 parent: bacd70e
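
As a quick illustration of the new flag, a minimal sketch (the data.parquet path is hypothetical; the flag defaults to False, which keeps the existing PyArrow path):

    import daft

    # Opt in to the experimental Rust-based native Parquet reader.
    df = daft.read_parquet("data.parquet", use_native_downloader=True)
    df.show()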

File tree

15 files changed: +115 −18 lines

Cargo.lock

Lines changed: 1 addition & 0 deletions
(Generated file; diff not rendered.)

daft/datasources.py

Lines changed: 8 additions & 0 deletions
@@ -3,6 +3,10 @@
 import sys
 from dataclasses import dataclass
 from enum import Enum
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from daft.io import IOConfig
 
 if sys.version_info < (3, 8):
     from typing_extensions import Protocol
@@ -41,5 +45,9 @@ def scan_type(self):
 
 @dataclass(frozen=True)
 class ParquetSourceInfo(SourceInfo):
+
+    use_native_downloader: bool
+    io_config: IOConfig | None
+
     def scan_type(self):
         return StorageType.PARQUET
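
For illustration, a sketch of constructing the updated dataclass directly (internal API shown only to make the new fields concrete; both must now be supplied):

    from daft.datasources import ParquetSourceInfo, StorageType

    info = ParquetSourceInfo(use_native_downloader=True, io_config=None)
    assert info.scan_type() == StorageType.PARQUET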

daft/execution/execution_step.py

Lines changed: 2 additions & 0 deletions
@@ -400,6 +400,8 @@ def _handle_tabular_files_scan(
                 schema=schema,
                 fs=fs,
                 read_options=read_options,
+                io_config=scan._source_info.io_config,
+                use_native_downloader=scan._source_info.use_native_downloader,
             )
             for fp in filepaths
         ]

daft/filesystem.py

Lines changed: 11 additions & 3 deletions
@@ -27,6 +27,7 @@
 )
 
 from daft.datasources import ParquetSourceInfo, SourceInfo
+from daft.table import Table
 
 _CACHED_FSES: dict[str, FileSystem] = {}
 
@@ -328,9 +329,16 @@ def glob_path_with_stats(
 
     # Set number of rows if available.
     if isinstance(source_info, ParquetSourceInfo):
-        parquet_metadatas = ThreadPoolExecutor().map(_get_parquet_metadata_single, filepaths_to_infos.keys())
-        for path, parquet_metadata in zip(filepaths_to_infos.keys(), parquet_metadatas):
-            filepaths_to_infos[path]["rows"] = parquet_metadata.num_rows
+        if source_info.use_native_downloader:
+            parquet_statistics = Table.read_parquet_statistics(
+                list(filepaths_to_infos.keys()), source_info.io_config
+            ).to_pydict()
+            for path, num_rows in zip(parquet_statistics["uris"], parquet_statistics["row_count"]):
+                filepaths_to_infos[path]["rows"] = num_rows
+        else:
+            parquet_metadatas = ThreadPoolExecutor().map(_get_parquet_metadata_single, filepaths_to_infos.keys())
+            for path, parquet_metadata in zip(filepaths_to_infos.keys(), parquet_metadatas):
+                filepaths_to_infos[path]["rows"] = parquet_metadata.num_rows
 
     return [
         ListingInfo(path=_ensure_path_protocol(protocol, path), **infos) for path, infos in filepaths_to_infos.items()
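
A sketch of the native statistics call used in the branch above (bucket and file names are hypothetical; per this diff the returned Table carries "uris" and "row_count" columns):

    from daft.table import Table

    paths = ["s3://bucket/a.parquet", "s3://bucket/b.parquet"]
    stats = Table.read_parquet_statistics(paths, None).to_pydict()
    for uri, num_rows in zip(stats["uris"], stats["row_count"]):
        print(uri, num_rows)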

daft/io/__init__.py

Lines changed: 1 addition & 2 deletions
@@ -1,9 +1,8 @@
 from __future__ import annotations
 
-from daft.daft import PyIOConfig as IOConfig
-from daft.daft import PyS3Config as S3Config
 from daft.io._csv import read_csv
 from daft.io._json import read_json
+from daft.io.config import IOConfig, S3Config
 from daft.io.file_path import from_glob_path
 from daft.io.parquet import read_parquet
 

daft/io/config.py

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+from __future__ import annotations
+
+import json
+
+from daft.daft import PyIOConfig as IOConfig
+from daft.daft import PyS3Config as S3Config
+
+
+def _io_config_from_json(io_config_json: str) -> IOConfig:
+    """Used when deserializing a serialized IOConfig object"""
+    data = json.loads(io_config_json)
+    s3_config = S3Config(**data["s3"]) if "s3" in data else None
+    return IOConfig(s3=s3_config)
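
A sketch of the helper in use (the JSON payload shape and the region_name key are assumptions; S3Config's keyword names must match whatever keys the serializer emits):

    from daft.io.config import _io_config_from_json

    # Hypothetical serialized payload.
    config = _io_config_from_json('{"s3": {"region_name": "us-west-2"}}')

    # With no "s3" key, the helper returns an IOConfig with s3=None.
    empty = _io_config_from_json('{}')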

daft/io/parquet.py

Lines changed: 13 additions & 2 deletions
@@ -1,6 +1,6 @@
 # isort: dont-add-import: from __future__ import annotations
 
-from typing import Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
 
 import fsspec
 
@@ -10,12 +10,17 @@
 from daft.datatype import DataType
 from daft.io.common import _get_tabular_files_scan
 
+if TYPE_CHECKING:
+    from daft.io import IOConfig
+
 
 @PublicAPI
 def read_parquet(
     path: Union[str, List[str]],
     schema_hints: Optional[Dict[str, DataType]] = None,
     fs: Optional[fsspec.AbstractFileSystem] = None,
+    io_config: Optional["IOConfig"] = None,
+    use_native_downloader: bool = False,
 ) -> DataFrame:
     """Creates a DataFrame from Parquet file(s)
 
@@ -31,6 +36,9 @@ def read_parquet(
             disable all schema inference on data being read, and throw an error if data being read is incompatible.
         fs (fsspec.AbstractFileSystem): fsspec FileSystem to use for reading data.
             By default, Daft will automatically construct a FileSystem instance internally.
+        io_config (IOConfig): Config to be used with the native downloader
+        use_native_downloader: Whether to use the native downloader instead of PyArrow for reading Parquet. This
+            is currently experimental.
 
     returns:
         DataFrame: parsed DataFrame
@@ -41,7 +49,10 @@ def read_parquet(
     plan = _get_tabular_files_scan(
         path,
         schema_hints,
-        ParquetSourceInfo(),
+        ParquetSourceInfo(
+            io_config=io_config,
+            use_native_downloader=use_native_downloader,
+        ),
         fs,
     )
     return DataFrame(plan)
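
Putting the two new parameters together, a hedged sketch of reading from S3 (the bucket path and the S3Config field names key_id/access_key are assumptions, not confirmed by this diff):

    import daft
    from daft.io import IOConfig, S3Config

    # Hypothetical credentials; field names must match S3Config's constructor.
    io_config = IOConfig(
        s3=S3Config(region_name="us-west-2", key_id="...", access_key="..."),
    )
    df = daft.read_parquet(
        "s3://my-bucket/data.parquet",
        io_config=io_config,
        use_native_downloader=True,
    )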

daft/runners/runner_io.py

Lines changed: 2 additions & 0 deletions
@@ -100,6 +100,8 @@ def sample_schema(
             return schema_inference.from_parquet(
                 file=filepath,
                 fs=fs,
+                io_config=source_info.io_config,
+                use_native_downloader=source_info.use_native_downloader,
             )
         else:
             raise NotImplementedError(f"Schema inference for {source_info} not implemented")

daft/table/schema_inference.py

Lines changed: 8 additions & 0 deletions
@@ -17,6 +17,8 @@
 if TYPE_CHECKING:
     import fsspec
 
+    from daft.io import IOConfig
+
 
 def from_csv(
     file: FileInput,
@@ -73,8 +75,14 @@ def from_json(
 def from_parquet(
     file: FileInput,
     fs: fsspec.AbstractFileSystem | None = None,
+    io_config: IOConfig | None = None,
+    use_native_downloader: bool = False,
 ) -> Schema:
     """Infers a Schema from a Parquet file"""
+    if use_native_downloader:
+        assert isinstance(file, (str, pathlib.Path))
+        return Schema.from_parquet(str(file), io_config=io_config)
+
     if not isinstance(file, (str, pathlib.Path)):
         # BytesIO path.
         f = file
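
A sketch of the native schema-inference path in isolation (the file path is hypothetical; with use_native_downloader=True the input must be a str or pathlib.Path, per the assert above):

    from daft.table import schema_inference

    schema = schema_inference.from_parquet("data.parquet", use_native_downloader=True)
    print(schema)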

daft/table/table.py

Lines changed: 2 additions & 2 deletions
@@ -364,6 +364,6 @@ def read_parquet_statistics(
         io_config: IOConfig | None = None,
     ) -> Table:
         if not isinstance(paths, Series):
-            paths = Series.from_pylist(paths)
-
+            paths = Series.from_pylist(paths, name="uris")
+        assert paths.name() == "uris", f"Expected input series to have name 'uris', but found: {paths.name()}"
         return Table._from_pytable(_read_parquet_statistics(uris=paths._series, io_config=io_config))
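
Per the new assertion, a caller passing a prebuilt Series must name it "uris", while plain Python lists are wrapped and named automatically. A sketch (paths are hypothetical; the Series import location is an assumption):

    from daft.series import Series
    from daft.table import Table

    # A list is wrapped into a Series named "uris" automatically.
    stats = Table.read_parquet_statistics(["s3://bucket/a.parquet"], None)

    # A prebuilt Series must already carry the name "uris", or the assert fires.
    uris = Series.from_pylist(["s3://bucket/a.parquet"], name="uris")
    stats = Table.read_parquet_statistics(uris, None)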
