[WIP][SPARK-56661] Introducing logical and physical planning nodes for language-agnostic Spark UDFs #55768
Conversation
force-pushed from 5a35dee to df7bde7
* Creates a [[WorkerSession]] via [[SparkEnv#getExternalUDFDispatcher]]
* and registers cancellation on task failure. The provided function
* receives the session and must return the result iterator. Moreover,
* the function MUST close the session once all input data has been sent.
"all input data have been sent"
What does this mean? Are you trying to say that all UDF results have been consumed?
No, we should call close once all the input rows have been sent to the UDF. This is the signal that no more input is to be expected, and the UDF can finish processing after it has consumed all of this data. This is aligned with what we discussed offline earlier today.
I changed the comment slightly to make this point clearer. Could you have a look at this new comment?
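For illustration, a minimal sketch of how a caller could honor this contract. The helper name withUDFWorkerSession comes from this PR; the session surface used below (sendRow, results) is hypothetical and only serves to make the close() semantics explicit:

```scala
// Hedged sketch only: withUDFWorkerSession is from this PR, but the session
// API below (sendRow, results, close) is hypothetical.
trait HypotheticalSession {
  def sendRow(row: Array[Byte]): Unit
  def close(): Unit
  def results: Iterator[Array[Byte]]
}

def runThroughUDF(
    input: Iterator[Array[Byte]],
    withUDFWorkerSession: (HypotheticalSession => Iterator[Array[Byte]]) => Iterator[Array[Byte]])
  : Iterator[Array[Byte]] =
  withUDFWorkerSession { session =>
    input.foreach(session.sendRow) // stream every input row to the UDF worker
    session.close()                // signal that no more input will arrive
    session.results                // the UDF may still finish processing and emit output
  }
```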
val session = dispatcher.createSession(securityScope)

// Make sure to cancel the session, if the task fails
taskContext.addTaskFailureListener { (_, _) =>
We may need to add another completion listener as well to call session.close().
The reason is that Spark doesn't have to consume the whole result iterator, e.g., in the case of a 'limit'. So if we rely on the iterator's last element being consumed, we may miss the close.
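To make this concern concrete, here is a small, self-contained Scala illustration (not the PR's code) of how a downstream limit leaves an iterator partially consumed:

```scala
// A downstream 'limit' stops pulling from the iterator early, so cleanup
// tied to "last element consumed" never runs.
var closedOnLastElement = false
val rows = Iterator.tabulate(1000) { i =>
  if (i == 999) closedOnLastElement = true // pretend close() fires on the last row
  i
}
val limited = rows.take(5).toList // a 'limit 5' pulls only five elements
assert(limited == List(0, 1, 2, 3, 4))
assert(!closedOnLastElement) // the "close on last element" logic was never reached
```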
This seems unused. Either:
- do not introduce this class in this PR, or
- use it in MapPartitionsExternalUDFExec, but give it an f that throws an unimplemented error.
> We may need to add another completion listener as well to call session.close()
As discussed offline, this is actually not needed. In case of early termination of the task (e.g., through a limit), we cancel the execution instead. The close() call on the session should be done by the user of this function when all input has been sent to the UDF.
> This seems unused, either
Actually, ExternalUDFExec is used as the parent class of MapPartitionsExternalUDFExec. However, I agree with your point that we could make the future use much clearer by calling withUDFWorkerSession in doExecute of MapPartitionsExternalUDFExec. I changed the PR to do exactly this and then throw a NotImplementedError once the session has been received.
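For reference, a rough sketch of what that method body could look like. MapPartitionsExternalUDFExec, withUDFWorkerSession, and doExecute are names from this PR; the exact signatures and the mapPartitionsInternal wiring are assumptions:

```scala
// Rough sketch, not the PR's exact code: acquire a worker session per
// partition and fail fast until execution is actually implemented.
override protected def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitionsInternal { _ =>
    withUDFWorkerSession { _ =>
      // The helper has already registered cancellation on task failure;
      // actual execution through the session is left to a follow-up change.
      throw new NotImplementedError(
        "Language-agnostic UDF execution is not implemented yet")
    }
  }
}
```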
DirectUnixSocketWorkerDispatcher, DirectWorkerProcess,
DirectWorkerSession}

/**
Any chance we can reuse the testing dispatcher defined in https://github.com/apache/spark/blob/master/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala (it can be updated if necessary)? It is supposed to be agnostic to a worker spec.
That way we can reduce some duplication, and in case of API changes we only need to update one place.
Yes, good idea! I moved the TestDispatcher into a test-only shared file that can be reused here. Some parts of the implementation still remain in this suite, as this test relies on an actual socket connection, while the test in /udf/ only checks for file existence. It would be weird to move that logic into /udf/ as well, since it is not consumed in the /udf package.
…-agnostic MapPartition Spark UDFs
force-pushed from df7bde7 to 2605573
* Dispatcher factory to generate UDF worker dispatchers
* using the new UDF framework proposed in SPARK-55278
*/
private val udfDispatcherManager: UDFDispatcherManager =
Do we need to create this on the driver as well? In general, the pattern in SparkEnv is that we initialize variables.
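For context, a minimal self-contained sketch of the caching pattern such a manager could follow. The class name UDFDispatcherManager comes from this PR, but the API below is an assumption:

```scala
import scala.collection.concurrent.TrieMap

// Illustrative sketch only: keep one dispatcher per UDF worker specification,
// created lazily on first use, so driver and executors can share the lookup.
final class UDFDispatcherManager[Dispatcher] {
  private val dispatchers = TrieMap.empty[String, Dispatcher]

  def getOrCreate(workerSpecification: String)(create: => Dispatcher): Dispatcher =
    dispatchers.getOrElseUpdate(workerSpecification, create)
}
```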
*/
@Experimental
case class ExternalUserDefinedFunction(
    name: String,
You can merge this with the ExternalUDF source file
What changes were proposed in this pull request?
This PR introduces new logical and physical Catalyst nodes for language-agnostic User Defined Functions (UDFs), as part of SPIP SPARK-55278.
As a first step towards the goal of language-agnostic UDFs, we want to target mapPartition UDFs like pyspark.sql.DataFrame.mapInArrow or pyspark.RDD.mapPartitions. The overarching goal is to deprecate the current, language-specific Catalyst nodes (like mapInArrow). However, for now, the new nodes will exist in addition to the old ones until the new framework has reached maturity.

In summary, this PR introduces:
- ExternalUDFExpression, which captures language-agnostic UDF properties (payload, name, etc.)
- ExternalUDF, which serves as a base class for all language-agnostic UDF nodes
- MapPartitionExternalUDF, which is the new, language-agnostic map partition node
- WorkerDispatcherManager, a manager class which manages UDF dispatchers based on the target UDFWorkerSpecification

None of the changes introduced above are currently consumed in Spark.
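To make this summary concrete, here is a simplified, self-contained model of how these pieces could relate; all signatures are illustrative assumptions, not the PR's actual Catalyst definitions:

```scala
// Simplified model only, not the PR's Catalyst code.
final case class ExternalUDFExpression(
    name: String,
    payload: Array[Byte],          // the language-agnostic UDF payload
    workerSpecification: String)   // selects the target UDF worker

// Base for all language-agnostic UDF plan nodes.
sealed trait ExternalUDF {
  def udf: ExternalUDFExpression
}

// The new, language-agnostic map partition node over a child plan.
final case class MapPartitionExternalUDF[Plan](
    udf: ExternalUDFExpression,
    child: Plan) extends ExternalUDF
```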
Why are the changes needed?
This is the first step toward language-agnostic UDF execution for Spark. Existing physical and logical planning nodes need to be replaced eventually to achieve this goal as they make language-specific assumptions.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New unit tests were added.
Was this patch authored or co-authored using generative AI tooling?
Partially. However, the code was manually reviewed and adjusted.