[SPARK-56757][PYTHON] Refactor scalar iterator Pandas UDF worker path #55755

Open
201573 wants to merge 1 commit into apache:master from 201573:codex/56757-scalar-pandas-iter-refactor


201573 commented May 8, 2026

What changes were proposed in this pull request?

This PR refactors SQL_SCALAR_PANDAS_ITER_UDF so that the scalar iterator Pandas UDF logic is handled inside read_udfs().

The worker now uses ArrowStreamSerializer for the scalar iterator Pandas UDF I/O path, converts input Arrow batches to pandas arguments in read_udfs(), validates the iterator and element return types there, and converts the pandas outputs back to Arrow record batches.

This keeps the existing row-count checks and return validation while making the serializer a pure Arrow I/O layer for this eval type.
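A rough sketch of what such checks can look like follows. It is not the validation code from this PR: `checked_results` is a hypothetical name, the error messages are invented, and the element type is a parameter so the sketch runs without pandas (in the worker the expected element type would be a pandas Series or DataFrame).

```python
from collections.abc import Iterator
from typing import Any, List


def checked_results(
    results: Any,
    expected_type: type,
    input_lengths: List[int],
) -> Iterator[Any]:
    """Hypothetical wrapper that validates a scalar iterator UDF's output lazily."""
    # Check 1: the UDF itself must return an iterator, not a list or a scalar.
    if not isinstance(results, Iterator):
        raise TypeError(f"UDF must return an iterator, got {type(results).__name__}")

    produced = 0
    for element in results:
        # Check 2: every yielded element must have the expected type.
        if not isinstance(element, expected_type):
            raise TypeError(
                f"expected elements of type {expected_type.__name__}, "
                f"got {type(element).__name__}"
            )
        produced += len(element)
        yield element

    # Check 3: total output rows must equal total input rows. The sum is taken
    # only after the UDF is exhausted, so a caller may append to input_lengths
    # while the input side is being consumed.
    consumed = sum(input_lengths)
    if produced != consumed:
        raise ValueError(f"output has {produced} rows but input had {consumed}")


valid = list(checked_results(iter([[1, 2], [3]]), list, [2, 1]))
```

Because the wrapper is a generator, all three checks fire only as the output is consumed, which matches the streaming nature of the iterator UDF.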

Why are the changes needed?

This is part of SPARK-55388. The scalar iterator Pandas UDF path should be self-contained in read_udfs() instead of relying on ArrowStreamPandasUDFSerializer to perform UDF-specific logic.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a test for scalar iterator Pandas UDFs with struct input and struct output.
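For readers unfamiliar with this shape of UDF, the sketch below shows what the function body looks like when both the input and the output are structs. It is not the test added in this PR: the function name and the field names (`id`, `name`, `id_plus_one`, `name_upper`) are made up for illustration. In PySpark a struct column reaches a pandas UDF as a `pandas.DataFrame`, and a struct return type is produced by yielding a `pandas.DataFrame`; the function is exercised here with plain pandas, without a Spark session.

```python
from typing import Iterator

import pandas as pd


def transform_struct(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Hypothetical scalar iterator UDF body: struct in, struct out."""
    for pdf in batches:
        # Each input batch is a DataFrame whose columns are the struct fields.
        yield pd.DataFrame(
            {
                "id_plus_one": pdf["id"] + 1,
                "name_upper": pdf["name"].str.upper(),
            }
        )


# Exercise the function directly on two pandas batches.
sample_batches = [
    pd.DataFrame({"id": [1, 2], "name": ["a", "b"]}),
    pd.DataFrame({"id": [3], "name": ["c"]}),
]
struct_results = list(transform_struct(iter(sample_batches)))
```

To run it under Spark, the function would be registered through `pyspark.sql.functions.pandas_udf` with a struct return type that matches the yielded columns.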

Verified locally:

  • SPARK_HOME=/tmp/spark-56757-wt PYTHON_EXECUTABLE=/tmp/spark-venv-56757/bin/python ./python/run-tests --python-executables /tmp/spark-venv-56757/bin/python --testnames 'pyspark.sql.tests.pandas.test_pandas_udf_scalar ScalarPandasUDFTests.test_scalar_iter_udf_struct_input_and_output' -p 1
  • SPARK_HOME=/tmp/spark-56757-wt PYTHON_EXECUTABLE=/tmp/spark-venv-56757/bin/python ./python/run-tests --python-executables /tmp/spark-venv-56757/bin/python --testnames 'pyspark.sql.tests.pandas.test_pandas_udf_scalar ScalarPandasUDFTests.test_vectorized_udf_invalid_length,pyspark.sql.tests.pandas.test_pandas_udf_scalar ScalarPandasUDFTests.test_vectorized_udf_chained_struct_type,pyspark.sql.tests.pandas.test_pandas_udf_scalar ScalarPandasUDFTests.test_scalar_iter_pandas_udf_with_single_output_batch' -p 1
  • PYTHON_EXECUTABLE=/tmp/spark-venv-56757/bin/python ./dev/lint-python --compile
  • /tmp/spark-venv-56757/bin/ruff format --check python/pyspark/worker.py python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
  • /tmp/spark-venv-56757/bin/ruff check python/pyspark/worker.py python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
  • git diff --check

Was this patch authored or co-authored using generative AI tooling?

Generated-by: OpenAI Codex (GPT-5)
