timsaucer commented on code in PR #1544:
URL: 
https://github.com/apache/datafusion-python/pull/1544#discussion_r3260578573


##########
crates/core/src/codec.rs:
##########
@@ -284,3 +365,282 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
         self.inner.try_decode_udwf(name, buf)
     }
 }
+
+// =============================================================================
+// Shared Python scalar UDF encode / decode helpers
+//
+// Both `PythonLogicalCodec` and `PythonPhysicalCodec` consult these on
+// every `try_encode_udf` / `try_decode_udf` call. Same wire format on
+// both layers — a Python `ScalarUDF` referenced inside a `LogicalPlan`
+// or an `ExecutionPlan` round-trips identically.
+// =============================================================================
+
+/// Encode a Python scalar UDF inline if `node` is one. Returns
+/// `Ok(true)` when the payload (`DFPYUDF` family prefix, version byte,
+/// cloudpickled tuple) was written and the caller should skip its
+/// inner codec. Returns `Ok(false)` for any non-Python UDF, signalling
+/// the caller to delegate to its `inner`.
+pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<bool> {
+    let Some(py_udf) = node
+        .inner()
+        .as_any()
+        .downcast_ref::<PythonFunctionScalarUDF>()
+    else {
+        return Ok(false);
+    };
+
+    Python::attach(|py| -> Result<bool> {
+        let bytes = encode_python_scalar_udf(py, py_udf)
+            .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
+        write_wire_header(buf, PY_SCALAR_UDF_FAMILY);
+        buf.extend_from_slice(&bytes);
+        Ok(true)
+    })
+}
+
+/// Decode an inline Python scalar UDF payload. Returns `Ok(None)`
+/// when `buf` does not carry the `DFPYUDF` family prefix, signalling
+/// the caller to delegate to its `inner` codec (and eventually the
+/// `FunctionRegistry`).
+pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> {
+    let Some(payload) = strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")? else {
+        return Ok(None);
+    };
+
+    Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> {
+        let udf = decode_python_scalar_udf(py, payload)
+            .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
+        Ok(Some(Arc::new(ScalarUDF::new_from_impl(udf))))
+    })
+}
+
+/// Build the cloudpickle payload for a `PythonFunctionScalarUDF`.

Review Comment:
   Good points! I updated the docstring and also added version checking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to