Commit c5868b1

[I/O] Change back to random access read for Parquet. (#953)
This PR reverts to random-access reads for Parquet, since streaming reads introduced performance regressions, and automatically choosing a good buffer size may be difficult to do at the Daft layer.

1 parent 942ad48 · commit c5868b1

File tree

1 file changed: +18 −22 lines

daft/table/table_io.py

Lines changed: 18 additions & 22 deletions
@@ -26,11 +26,6 @@
 FileInput = Union[pathlib.Path, str, IO[bytes]]
 
 
-# The number of rows to read per batch. This is sized to generate 10MiB batches
-# for rows about 1KiB in size.
-_PARQUET_FRAGMENT_BATCH_SIZE = 100000
-
-
 @contextlib.contextmanager
 def _open_stream(
     file: FileInput,
@@ -99,28 +94,29 @@ def read_parquet(
     paths, fs = _resolve_paths_and_filesystem(file, fs)
     assert len(paths) == 1
     path = paths[0]
-    fragment = pads.ParquetFileFormat().make_fragment(path, filesystem=fs)
-    schema = fragment.metadata.schema.to_arrow_schema()
+    f = fs.open_input_file(path)
+    pqf = papq.ParquetFile(f)
     # If no rows required, we manually construct an empty table with the right schema
     if read_options.num_rows == 0:
-        table = pa.Table.from_arrays([pa.array([], type=field.type) for field in schema], schema=schema)
+        arrow_schema = pqf.metadata.schema.to_arrow_schema()
+        table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema)
     elif read_options.num_rows is not None:
-        # Read the file by row group.
-        frags = fragment.split_by_row_group()
-        tables = []
-        rows_read = 0
-        for frag in frags:
-            for batch in frag.to_batches(columns=read_options.column_names, batch_size=_PARQUET_FRAGMENT_BATCH_SIZE):
-                tables.append(pa.Table.from_batches([batch], schema=schema))
-                rows_read += len(batch)
-                if rows_read >= read_options.num_rows:
-                    break
-            if rows_read >= read_options.num_rows:
+        # Only read the required row groups.
+        rows_needed = read_options.num_rows
+        for i in range(pqf.metadata.num_row_groups):
+            row_group_meta = pqf.metadata.row_group(i)
+            rows_needed -= row_group_meta.num_rows
+            if rows_needed <= 0:
                 break
-        table = pa.concat_tables(tables)
-        table = table.slice(length=read_options.num_rows)
+        table = pqf.read_row_groups(list(range(i + 1)), columns=read_options.column_names)
+        if rows_needed < 0:
+            # Need to truncate the table to the row limit.
+            table = table.slice(length=read_options.num_rows)
     else:
-        table = fragment.to_table(columns=read_options.column_names)
+        table = papq.read_table(
+            f,
+            columns=read_options.column_names,
+        )
 
     return Table.from_arrow(table)
 
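For context, here is a minimal, self-contained sketch of the random-access pattern this commit switches to: open a seekable handle through the filesystem API, walk the footer's row-group metadata until a row limit is covered, then issue a single read for just those row groups. The file name, data, and row limit below are illustrative stand-ins, not part of the commit; in Daft the real entry point is read_parquet in daft/table/table_io.py, with the limit coming from read_options.num_rows.

import pyarrow as pa
import pyarrow.parquet as papq
from pyarrow import fs as pafs

# Create a small demo file so the sketch runs standalone: 10,000 rows
# split into 1,000-row row groups (path and sizes are made up).
papq.write_table(pa.table({"x": list(range(10_000))}), "example.parquet", row_group_size=1_000)

# Open a seekable, random-access handle via the filesystem API, as the
# committed code does with fs.open_input_file(path).
fs = pafs.LocalFileSystem()
f = fs.open_input_file("example.parquet")
pqf = papq.ParquetFile(f)

num_rows = 2_500  # stand-in for read_options.num_rows

# Walk row-group metadata (footer-only, no data pages touched) until the
# requested number of rows is covered.
rows_needed = num_rows
for i in range(pqf.metadata.num_row_groups):
    rows_needed -= pqf.metadata.row_group(i).num_rows
    if rows_needed <= 0:
        break

# Read only the covering row groups, then trim any overshoot from the
# last group. Assumes the file has at least one row group, as the
# committed code does.
table = pqf.read_row_groups(list(range(i + 1)))
if rows_needed < 0:
    table = table.slice(length=num_rows)

assert table.num_rows == num_rows

Because row-group selection consults only the footer metadata, a seekable handle lets the reader skip every row group past the limit instead of streaming the whole file through a buffer.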
