Commit aad6c51

Yicong-Huang authored and zhengruifeng committed
[SPARK-54182][SQL][PYTHON] Optimize non-arrow conversion of df.toPandas
### What changes were proposed in this pull request?

Following up on #52680, this PR optimizes the non-Arrow path of `toPandas()` to eliminate intermediate DataFrame creation.

**Key optimizations:**

1. **Avoid the intermediate DataFrame copy**
   - `pd.DataFrame.from_records(rows)` → direct column extraction via `zip(*rows)`
   - 2 DataFrame creations → 1 DataFrame creation
2. **Optimize column-by-column conversion** (especially for wide tables)
   - Tuples → lists for faster Series construction
   - Implicit dtype inference → explicit `dtype=object`
   - `pd.concat(axis="columns")` + column rename → `pd.concat(axis=1, keys=columns)`
   - Result: **43-67% speedup for 50-100 columns**

A minimal pandas sketch of the old and new patterns follows this description.

### Why are the changes needed?

**Problem:** The current flow creates the DataFrame twice:

- `rows` → `pd.DataFrame.from_records()` → temporary DataFrame → `pd.concat()` → final DataFrame

The intermediate DataFrame is immediately discarded, wasting memory. This is especially inefficient for wide tables, where the column-by-column overhead is significant.

### Does this PR introduce _any_ user-facing change?

No. This is a pure performance optimization with no API or behavior changes.

### How was this patch tested?

- Existing unit tests.
- Benchmark

**Benchmark setup:**

- **Hardware**: Driver memory 4GB, executor memory 4GB
- **Configuration**: `spark.sql.execution.arrow.pyspark.enabled=false` (testing the non-Arrow path)
- **Iterations**: 10 iterations per test case for statistical reliability
- **Test cases**:
  - Simple (numeric columns)
  - Mixed (int, string, double, boolean)
  - Timestamp (date and timestamp types)
  - Nested (struct and array types)
  - Wide (5, 10, 50, 100 column counts)

### Performance Results

**General benchmark** (10 iterations):

| Test Case | Rows | OLD → NEW | Speedup |
|-----------|------|-----------|---------|
| simple | 1M | 1.376s → 1.383s | ≈ Tied |
| mixed | 1M | 2.396s → 2.553s | 6% slower |
| timestamp | 500K | 4.323s → 4.392s | ≈ Tied |
| nested | 100K | 0.558s → 0.580s | 4% slower |
| wide (50) | 100K | 1.458s → **1.141s** | **28% faster** 🚀 |

**Column width benchmark** (100K rows, 10 iterations):

| Columns | OLD → NEW | Speedup |
|---------|-----------|---------|
| 5 | 0.188s → 0.179s | 5% faster |
| 10 | 0.262s → 0.270s | ≈ Tied |
| 50 | 1.430s → **0.998s** | **43% faster** 🚀 |
| 100 | 3.320s → **1.988s** | **67% faster** 🚀 |

### Was this patch authored or co-authored using generative AI tooling?

Yes. Co-Generated-by Cursor

Closes #52897 from Yicong-Huang/SPARK-54182/refactor/avoid-intermedia-df-in-non-arrow-toPandas.

Authored-by: Yicong-Huang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
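The following is a minimal, standalone pandas sketch of the old and new conversion patterns described under "Key optimizations" above. The sample rows, column names, and the `identity_converter` placeholder are illustrative stand-ins for Spark's collected rows and the per-field converters built by `_create_converter_to_pandas`; they are not the actual PySpark internals.

```python
import pandas as pd

# Illustrative stand-ins: in PySpark these would be df.collect() rows and df.columns.
rows = [(1, "a", 1.5), (2, "b", 2.5), (3, "c", 3.5)]
columns = ["id", "name", "score"]


def identity_converter(pser: pd.Series) -> pd.Series:
    # Placeholder for the per-field converter built by _create_converter_to_pandas.
    return pser


# OLD pattern: materialize an intermediate DataFrame, then concat converted columns.
tmp = pd.DataFrame.from_records(rows, index=range(len(rows)), columns=columns)
old_pdf = pd.concat(
    [identity_converter(pser) for _, pser in tmp.items()],
    axis="columns",
)

# NEW pattern: pull columns straight out of the rows with zip(*rows), build
# object-dtype Series, and skip the intermediate DataFrame entirely;
# keys= assigns the column names without a separate rename step.
columns_data = iter(zip(*rows)) if rows else iter([] for _ in columns)
new_pdf = pd.concat(
    [identity_converter(pd.Series(col, dtype=object)) for col in columns_data],
    axis=1,
    keys=columns,
)

print(old_pdf)
print(new_pdf)
```

Both snippets produce the same column layout; the new pattern simply avoids building and discarding the temporary `tmp` DataFrame.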
1 parent 674ed48 commit aad6c51

File tree

1 file changed: +17 −12 lines

python/pyspark/sql/pandas/conversion.py: 17 additions & 12 deletions

@@ -18,6 +18,7 @@
 from typing import (
     Any,
     Callable,
+    Iterator,
     List,
     Optional,
     Union,
@@ -208,18 +209,20 @@ def toPandas(self) -> "PandasDataFrameLike":
 
         # Below is toPandas without Arrow optimization.
         rows = self.collect()
-        if len(rows) > 0:
-            pdf = pd.DataFrame.from_records(
-                rows, index=range(len(rows)), columns=self.columns  # type: ignore[arg-type]
-            )
-        else:
-            pdf = pd.DataFrame(columns=self.columns)
 
-        if len(pdf.columns) > 0:
+        if len(self.columns) > 0:
             timezone = sessionLocalTimeZone
             struct_in_pandas = pandasStructHandlingMode
 
-            return pd.concat(
+            # Extract columns from rows and apply converters
+            if len(rows) > 0:
+                # Use iterator to avoid materializing intermediate data structure
+                columns_data: Iterator[Any] = iter(zip(*rows))
+            else:
+                columns_data = iter([] for _ in self.schema.fields)
+
+            # Build DataFrame from columns
+            pdf = pd.concat(
                 [
                     _create_converter_to_pandas(
                         field.dataType,
@@ -230,13 +233,15 @@ def toPandas(self) -> "PandasDataFrameLike":
                         ),
                         error_on_duplicated_field_names=False,
                         timestamp_utc_localized=False,
-                    )(pser)
-                    for (_, pser), field in zip(pdf.items(), self.schema.fields)
+                    )(pd.Series(col_data, dtype=object))
+                    for col_data, field in zip(columns_data, self.schema.fields)
                 ],
-                axis="columns",
+                axis=1,
+                keys=self.columns,
             )
-        else:
             return pdf
+        else:
+            return pd.DataFrame(columns=[], index=range(len(rows)))
 
     def toArrow(self) -> "pa.Table":
         from pyspark.sql.dataframe import DataFrame
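For reference, the changed non-Arrow path can be exercised with the configuration listed in the benchmark setup above. The following is a minimal sketch, assuming a local SparkSession and a throwaway wide DataFrame; the app name, row count, and column count are illustrative and not the benchmark harness behind the numbers reported in the description.

```python
import time

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("toPandas-non-arrow").getOrCreate()

# Disable Arrow so DataFrame.toPandas() goes through the non-Arrow path changed above.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)

# A wide DataFrame: the reported speedup grows with the column count.
df = spark.range(100_000).selectExpr(*[f"id + {i} AS c{i}" for i in range(50)])

start = time.perf_counter()
pdf = df.toPandas()
print(f"toPandas() took {time.perf_counter() - start:.3f}s, shape={pdf.shape}")

spark.stop()
```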
