diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index a4ccf4da6e8a..e90f1c305002 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -18,6 +18,7 @@
 from typing import (
     Any,
     Callable,
+    Iterator,
     List,
     Optional,
     Union,
@@ -208,18 +209,20 @@ def toPandas(self) -> "PandasDataFrameLike":
         # Below is toPandas without Arrow optimization.
         rows = self.collect()
-        if len(rows) > 0:
-            pdf = pd.DataFrame.from_records(
-                rows, index=range(len(rows)), columns=self.columns  # type: ignore[arg-type]
-            )
-        else:
-            pdf = pd.DataFrame(columns=self.columns)
-
-        if len(pdf.columns) > 0:
+        if len(self.columns) > 0:
             timezone = sessionLocalTimeZone
             struct_in_pandas = pandasStructHandlingMode
 
-            return pd.concat(
+            # Extract columns from rows and apply converters
+            if len(rows) > 0:
+                # Use iterator to avoid materializing intermediate data structure
+                columns_data: Iterator[Any] = iter(zip(*rows))
+            else:
+                columns_data = iter([] for _ in self.schema.fields)
+
+            # Build DataFrame from columns
+            pdf = pd.concat(
                 [
                     _create_converter_to_pandas(
                         field.dataType,
@@ -230,13 +233,15 @@ def toPandas(self) -> "PandasDataFrameLike":
                         ),
                         error_on_duplicated_field_names=False,
                         timestamp_utc_localized=False,
-                    )(pser)
-                    for (_, pser), field in zip(pdf.items(), self.schema.fields)
+                    )(pd.Series(col_data, dtype=object))
+                    for col_data, field in zip(columns_data, self.schema.fields)
                 ],
-                axis="columns",
+                axis=1,
+                keys=self.columns,
             )
-        else:
             return pdf
+        else:
+            return pd.DataFrame(columns=[], index=range(len(rows)))
 
     def toArrow(self) -> "pa.Table":
         from pyspark.sql.dataframe import DataFrame
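
Below is a minimal, self-contained sketch of the column-wise construction pattern this patch switches to, runnable outside Spark. The sample rows and columns values are made up for illustration, and the per-field _create_converter_to_pandas step is replaced by a bare pd.Series(..., dtype=object); this is just the shape of the idea, not PySpark's actual conversion code.

    import pandas as pd

    rows = [(1, "a"), (2, "b"), (3, "c")]  # stand-in for self.collect()
    columns = ["id", "letter"]             # stand-in for self.columns

    if len(columns) > 0:
        # zip(*rows) yields one tuple per column, so no row-oriented
        # intermediate DataFrame (the old from_records path) is built.
        if len(rows) > 0:
            columns_data = iter(zip(*rows))
        else:
            columns_data = iter([] for _ in columns)
        pdf = pd.concat(
            [pd.Series(col, dtype=object) for col in columns_data],
            axis=1,
            keys=columns,  # keys become the column labels
        )
    else:
        pdf = pd.DataFrame(columns=[], index=range(len(rows)))

    print(pdf)
    #   id letter
    # 0  1      a
    # 1  2      b
    # 2  3      c

The keys= argument is needed because the Series built from zip(*rows) are unnamed; in the old code the column labels came from the intermediate from_records DataFrame that this change eliminates.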