ntjohnson1 commented on code in PR #1546:
URL: https://github.com/apache/datafusion-python/pull/1546#discussion_r3293043594


##########
crates/core/src/codec.rs:
##########
@@ -232,16 +232,39 @@ fn strip_wire_header<'a>(
 #[derive(Debug)]
 pub struct PythonLogicalCodec {
     inner: Arc<dyn LogicalExtensionCodec>,
+    python_udf_inlining: bool,

Review Comment:
   NIT: Throughout this PR, "UDF" seems to be used in two senses: as shorthand
for scalar UDF, and as shorthand for the broader set of all flavors of
user-defined functions (UDF, UDAF, UDWF). Every time I review these I forget
that and have to revisit things to make sure I'm doing the right mapping in my
head. Maybe that's expected and standard. No change requested, more of an FYI
in case there are thoughts on how to resolve this overloading and make it
easier to onboard future contributors.



##########
python/datafusion/ipc.py:
##########
@@ -53,6 +53,36 @@ def init_worker():
    payloads are not portable across Python minor versions. See
    :meth:`datafusion.Expr.to_bytes` for examples of what travels by
    value vs. by reference.
+
+On the driver side, call :func:`set_sender_ctx` to control how
+:func:`pickle.dumps` encodes expressions — for example, to apply
+:meth:`SessionContext.with_python_udf_inlining` to every pickled
+expression on this thread:
+
+.. code-block:: python

Review Comment:
   NIT: I don't think doctest picks up `code-block` directives; I think it needs
the `>>>` prefix. Since this looks standalone-executable, it might be nice to
write it in that format.



##########
crates/core/src/codec.rs:
##########
@@ -301,48 +324,104 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
     }
 
     fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_scalar_udf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udf(node, buf)
     }
 
     fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
-        if let Some(udf) = try_decode_python_scalar_udf(buf)? {
-            return Ok(udf);
+        if self.python_udf_inlining {
+            if let Some(udf) = try_decode_python_scalar_udf(buf)? {
+                return Ok(udf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", name)?;
         }
         self.inner.try_decode_udf(name, buf)
     }
 
     fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udaf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udaf(node, buf)
     }
 
     fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
-        if let Some(udaf) = try_decode_python_udaf(buf)? {
-            return Ok(udaf);
+        if self.python_udf_inlining {
+            if let Some(udaf) = try_decode_python_udaf(buf)? {
+                return Ok(udaf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_AGG_UDF_FAMILY, "aggregate UDF", name)?;
         }
         self.inner.try_decode_udaf(name, buf)
     }
 
     fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udwf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udwf(node, buf)
     }
 
     fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
-        if let Some(udwf) = try_decode_python_udwf(buf)? {
-            return Ok(udwf);
+        if self.python_udf_inlining {
+            if let Some(udwf) = try_decode_python_udwf(buf)? {
+                return Ok(udwf);
+            }
+        } else {
+            refuse_if_inline(buf, PY_WINDOW_UDF_FAMILY, "window UDF", name)?;
         }
         self.inner.try_decode_udwf(name, buf)
     }
 }
 
+/// Strict-mode gate: if `buf` is a well-framed inline payload for
+/// `family`, return the strict-refusal error; otherwise return
+/// `Ok(())` so the caller can delegate to its `inner` codec.
+///
+/// Routing through [`read_framed_payload`] (rather than a bare
+/// `starts_with` probe) means malformed inline bytes — wrong
+/// wire-format version, mismatched Python version, truncated header —
+/// surface *their* diagnostic instead of the strict-mode message.
+/// The strict message implies sender intent ("inlining is disabled"),
+/// so it should fire only when the bytes really would have decoded.
+///
+/// Fast path: short-circuit on the family-magic prefix before
+/// acquiring the GIL. Plans with many non-Python UDFs would otherwise
+/// pay a GIL acquisition per decode call just to confirm "not a
+/// Python UDF". `read_framed_payload` itself rejects buffers that
+/// don't start with `family`, so this is purely an optimization.
+fn refuse_if_inline(buf: &[u8], family: &[u8], kind: &str, name: &str) -> Result<()> {
+    if !buf.starts_with(family) {
+        return Ok(());
+    }
+    Python::attach(|py| match read_framed_payload(py, buf, family, kind)? {
+        Some(_) => Err(refuse_inline_payload(kind, name)),
+        None => Ok(()),
+    })
+}
+
+/// Build the error returned by a strict codec when it receives an
+/// inline Python-UDF payload it has been told not to deserialize.
+fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError {
+    // `Execution`, not `Plan`: this is a wire-format decode refusal at
+    // codec time, not a planner-stage failure. Downstream error
+    // classification keys off the variant — surfacing this as a planner
+    // error would mis-route it into "fix your SQL" buckets.
+    datafusion::error::DataFusionError::Execution(format!(

Review Comment:
   It would be nice if there were a page on this so we could include a URL with
even more context. I think your descriptions in the previous two PRs are good,
but I suspect someone will stumble upon this error as a general user with very
little context, and I'm thinking about how to mitigate that. This can be
wrapped into the next PR or even a follow-on for further nits/clarification.
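The ordering described in the `refuse_if_inline` doc comment (framing diagnostics first, the strict-mode refusal only for well-framed bytes) is the kind of behaviour such a page could illustrate. Below is a minimal Python model of that decision logic. It assumes a simplified header made of a family magic followed by a single version byte, and it skips the Python-version check. The constant values, exception classes, `classify` helper, and error text are all hypothetical stand-ins, not the crate's.

```python
from typing import Optional

# Hypothetical constants; the real magic and header layout live in codec.rs.
PY_SCALAR_UDF_FAMILY = b"DFPYUDF"
WIRE_VERSION = 1


class MalformedPayload(ValueError):
    """Stand-in for a framing diagnostic (bad version, truncated header)."""


class InlineRefused(RuntimeError):
    """Stand-in for the strict-mode DataFusionError::Execution."""


def read_framed_payload(buf: bytes, family: bytes) -> Optional[bytes]:
    """Return the payload if buf is framed for family, None if not in family.

    Raises MalformedPayload when the prefix matches but the header is bad.
    """
    if not buf.startswith(family):
        return None
    header_end = len(family) + 1
    if len(buf) < header_end:
        raise MalformedPayload("truncated header")
    version = buf[len(family)]
    if version != WIRE_VERSION:
        raise MalformedPayload(f"unsupported wire-format version {version}")
    return buf[header_end:]


def refuse_if_inline(buf: bytes, family: bytes, kind: str, name: str) -> None:
    # Fast path: in the Rust code this check avoids acquiring the GIL for
    # buffers that cannot be Python UDF payloads.
    if not buf.startswith(family):
        return None
    # Framing errors propagate first, so the strict message only fires for
    # bytes that would actually have decoded.
    if read_framed_payload(buf, family) is not None:
        raise InlineRefused(
            f"refusing inline Python {kind} payload for {name!r}: "
            "Python UDF inlining is disabled"
        )
    return None


def classify(buf: bytes) -> str:
    """Report which branch a strict decoder would take for buf."""
    try:
        refuse_if_inline(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF", "my_udf")
    except MalformedPayload:
        return "framing error"
    except InlineRefused:
        return "strict refusal"
    return "delegate to inner codec"


print(classify(b"\x0aprotobuf-bytes"))
print(classify(PY_SCALAR_UDF_FAMILY + bytes([WIRE_VERSION]) + b"payload"))
print(classify(PY_SCALAR_UDF_FAMILY + bytes([99]) + b"payload"))
```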



##########
python/datafusion/context.py:
##########
@@ -1769,3 +1769,48 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext:
         new = SessionContext.__new__(SessionContext)
         new.ctx = new_internal
         return new
+
+    def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
+        """Control whether Python UDFs are embedded in serialized expressions.
+
+        When ``enabled=True`` (the default), serialized expressions carry
+        the Python code for any scalar, aggregate, or window UDFs they
+        reference. The receiver rebuilds the UDFs from those bytes and
+        does not need to register them first.
+
+        When ``enabled=False``, serialized expressions store only the
+        UDF names. This has two uses:
+
+        * **Cross-language portability.** The bytes can be decoded by a
+          non-Python receiver, which must already have UDFs registered
+          under matching names.
+        * **Safer deserialization.** :meth:`Expr.from_bytes` will refuse
+          to rebuild Python UDFs rather than call ``cloudpickle.loads``
+          on untrusted input.
+
+        The setting affects :meth:`Expr.to_bytes` and
+        :meth:`Expr.from_bytes` whenever this session is passed as the
+        ``ctx`` argument. :func:`pickle.dumps` and :func:`pickle.loads`
+        do not pass a context, so to apply the setting through pickle,
+        register this session with
+        :func:`datafusion.ipc.set_sender_ctx` on the sender and
+        :func:`datafusion.ipc.set_worker_ctx` on the receiver.
+
+        .. warning:: Security
+            This setting narrows only :meth:`Expr.from_bytes`. Calling
+            :func:`pickle.loads` on untrusted bytes remains unsafe
+            regardless of the toggle.
+
+        Returns a new :class:`SessionContext` with the toggle applied;
+        the original session is unchanged.
+
+        Examples:
+            >>> from datafusion import SessionContext
+            >>> strict = SessionContext().with_python_udf_inlining(enabled=False)
+            >>> isinstance(strict, SessionContext)
+            True

Review Comment:
   This doesn't really demonstrate the functionality; it only checks that the
return value is a `SessionContext`.
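An example that exercises the toggle would need to show the two observable differences the docstring describes: what gets serialized, and what a strict receiver refuses. The real calls (`Expr.to_bytes` / `Expr.from_bytes` on a UDF-bearing expression) are not reproduced here because their full signatures are not shown in this hunk. As a library-independent sketch, the toy class below (hypothetical `ToySession`, `encode_udf`, `decode_udf`, and wire prefixes) models that behaviour. It uses stdlib `pickle` in place of `cloudpickle`.

```python
import pickle
from dataclasses import dataclass, replace
from typing import Callable, Dict

# Hypothetical wire prefixes; the real codec uses its own framed header.
INLINE_PREFIX = b"INLINE:"
NAME_PREFIX = b"NAME:"


@dataclass(frozen=True)
class ToySession:
    """Hypothetical stand-in for SessionContext, not the datafusion class."""

    python_udf_inlining: bool = True  # a fresh session inlines by default

    def with_python_udf_inlining(self, *, enabled: bool) -> "ToySession":
        # Return a modified copy; the original session keeps its setting.
        return replace(self, python_udf_inlining=enabled)

    def encode_udf(self, name: str, func: Callable) -> bytes:
        if self.python_udf_inlining:
            # Embed a pickled payload. stdlib pickle stores builtins by
            # reference; the real feature uses cloudpickle, which can ship
            # function code by value.
            return INLINE_PREFIX + pickle.dumps(func)
        # Name only: the receiver must already have the UDF registered.
        return NAME_PREFIX + name.encode()

    def decode_udf(self, buf: bytes, registry: Dict[str, Callable]) -> Callable:
        if buf.startswith(INLINE_PREFIX):
            if not self.python_udf_inlining:
                # Strict receiver: refuse rather than unpickle.
                raise PermissionError("inline Python UDF payload refused")
            return pickle.loads(buf[len(INLINE_PREFIX):])
        return registry[buf[len(NAME_PREFIX):].decode()]


default = ToySession()
strict = default.with_python_udf_inlining(enabled=False)

inline_bytes = default.encode_udf("my_abs", abs)
name_bytes = strict.encode_udf("my_abs", abs)

rebuilt = default.decode_udf(inline_bytes, registry={})
by_name = strict.decode_udf(name_bytes, registry={"my_abs": abs})

try:
    strict.decode_udf(inline_bytes, registry={})
    strict_refused = False
except PermissionError:
    strict_refused = True
```

A doctest against the real API would assert the same three things: the inlining session round-trips without registration, the strict session emits a name-only payload that needs a registered UDF, and the strict session refuses inline bytes.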



##########
python/datafusion/ipc.py:
##########
@@ -53,6 +53,36 @@ def init_worker():
    payloads are not portable across Python minor versions. See
    :meth:`datafusion.Expr.to_bytes` for examples of what travels by
    value vs. by reference.
+
+On the driver side, call :func:`set_sender_ctx` to control how
+:func:`pickle.dumps` encodes expressions — for example, to apply
+:meth:`SessionContext.with_python_udf_inlining` to every pickled
+expression on this thread:
+
+.. code-block:: python
+
+    from datafusion import SessionContext
+    from datafusion.ipc import clear_sender_ctx, set_sender_ctx
+
+    driver_ctx = SessionContext().with_python_udf_inlining(enabled=False)
+    set_sender_ctx(driver_ctx)
+    try:
+        pickle.dumps(expr)  # encoded with inlining disabled
+    finally:
+        clear_sender_ctx()
+
+Without a sender context the default codec is used (Python UDF
+inlining on). The sender context only affects pickle / ``to_bytes``
+encoding; explicit ``expr.to_bytes(ctx)`` calls still use the supplied
+``ctx``.
+
+The thread-local holds a strong reference to the installed

Review Comment:
   I typically think of "thread-local" as an adjective, so I think you mean the
`thread-local sender context`, which also helps differentiate it from `_local.ctx`.
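To make the distinction concrete, here is a minimal sketch of per-thread storage with `threading.local`. It assumes the sender context lives in its own attribute alongside the worker's `ctx` slot. The `sender_ctx` attribute name and these function bodies are assumptions, not the PR's code. The sketch shows that a thread's install neither leaks to other threads nor survives `clear_sender_ctx()`.

```python
import threading
from typing import Any, Dict, Optional

# One threading.local instance; each thread sees its own attributes.
# "ctx" would be the worker slot read by get_worker_ctx; "sender_ctx" is
# an assumed, separate slot for the thread-local sender context.
_local = threading.local()


def set_sender_ctx(ctx: Any) -> None:
    _local.sender_ctx = ctx  # overwrites any previous value on this thread


def get_sender_ctx() -> Optional[Any]:
    return getattr(_local, "sender_ctx", None)


def clear_sender_ctx() -> None:
    # Drop the strong reference held by this thread's slot, if any.
    try:
        del _local.sender_ctx
    except AttributeError:
        pass


def demo() -> Dict[str, Any]:
    seen: Dict[str, Any] = {}
    set_sender_ctx("driver-main")

    def worker() -> None:
        # A fresh thread starts with an empty slot.
        seen["worker_before"] = get_sender_ctx()
        set_sender_ctx("driver-worker")
        seen["worker_after"] = get_sender_ctx()

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    # The worker's install did not touch the main thread's slot.
    seen["main"] = get_sender_ctx()
    clear_sender_ctx()
    seen["main_cleared"] = get_sender_ctx()
    return seen


results = demo()
```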



##########
python/datafusion/ipc.py:
##########
@@ -125,6 +158,67 @@ def get_worker_ctx() -> SessionContext | None:
     return getattr(_local, "ctx", None)
 
 
+def set_sender_ctx(ctx: SessionContext) -> None:
+    """Install this driver's :class:`SessionContext` for outbound pickles.
+
+    Controls how :func:`pickle.dumps` encodes :class:`Expr` instances on
+    this thread. The most useful application is propagating a session
+    configured with
+    :meth:`SessionContext.with_python_udf_inlining` so the toggle takes
+    effect through pickle (which otherwise calls
+    :meth:`Expr.to_bytes` with no context and uses the default codec).
+
+    Idempotent: overwrites any previous value. Stored in a thread-local
+    slot, so worker threads on the driver may install their own contexts.
+    Does not affect :meth:`Expr.to_bytes` calls that pass an explicit
+    ``ctx`` — those continue to use the supplied context.
+
+    Examples:
+        >>> from datafusion import SessionContext
+        >>> from datafusion.ipc import (
+        ...     set_sender_ctx, get_sender_ctx, clear_sender_ctx,
+        ... )
+        >>> driver = SessionContext().with_python_udf_inlining(enabled=False)
+        >>> set_sender_ctx(driver)
+        >>> get_sender_ctx() is driver
+        True
+        >>> clear_sender_ctx()

Review Comment:
   Does skipping this step cause other doctests to fail, i.e. is this basically
cleanup? If not, it looks unneeded for this docstring.
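Whether the `clear_sender_ctx()` line matters depends on whether thread-local state outlives a single docstring. `doctest` gives each docstring its own shallow copy of the module globals. A `threading.local` slot, though, is state on a shared object, not a global binding, so it persists on the thread that runs every test. Below is a minimal sketch with hypothetical stand-ins (`set_ctx`, `get_ctx`, `clear_ctx`). It shows a later docstring failing when an earlier one skips the cleanup.

```python
import doctest
import threading

_local = threading.local()


def set_ctx(ctx):
    _local.ctx = ctx


def get_ctx():
    return getattr(_local, "ctx", None)


def clear_ctx():
    try:
        del _local.ctx
    except AttributeError:
        pass


# First docstring installs a context and does NOT clear it.
FIRST = """
>>> set_ctx("driver")
>>> get_ctx()
'driver'
"""

# Second docstring assumes a clean slate.
SECOND = """
>>> get_ctx() is None
True
"""


def run(source: str, name: str) -> int:
    """Run one docstring on this thread and return its failure count."""
    # A fresh globals dict per docstring, as doctest does for each test.
    globs = {"set_ctx": set_ctx, "get_ctx": get_ctx}
    test = doctest.DocTestParser().get_doctest(source, globs, name, None, 0)
    runner = doctest.DocTestRunner(verbose=False)
    return runner.run(test, out=lambda _: None).failed


first_failures = run(FIRST, "first")
leaked_failures = run(SECOND, "second_after_leak")  # still sees "driver"
clear_ctx()
clean_failures = run(SECOND, "second_after_clear")
```

So the clear is load-bearing if any docstring collected later on the same thread expects `get_sender_ctx()` to be `None`; otherwise it is hygiene.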



##########
python/datafusion/context.py:
##########
@@ -1769,3 +1769,48 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext:
         new = SessionContext.__new__(SessionContext)
         new.ctx = new_internal
         return new
+
+    def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
+        """Control whether Python UDFs are embedded in serialized expressions.
+
+        When ``enabled=True`` (the default), serialized expressions carry

Review Comment:
   `enabled` isn't an optional argument defaulting to `True` here. So either this
is misaligned, or it's saying that the class has inlining enabled by default,
which isn't clear from the docstring.
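The reviewer's reading can be confirmed mechanically. With the signature as written, `enabled` is a required keyword-only parameter, so "(the default)" can only describe a freshly constructed session, not the argument. The sketch below runs `inspect` against a hypothetical stub that copies only the signature from the diff.

```python
import inspect


class SessionStub:
    """Hypothetical stand-in mirroring only the signature from the diff."""

    def with_python_udf_inlining(self, *, enabled: bool) -> "SessionStub":
        return self


sig = inspect.signature(SessionStub.with_python_udf_inlining)
param = sig.parameters["enabled"]

# Keyword-only, and no default value is recorded for it.
is_keyword_only = param.kind is inspect.Parameter.KEYWORD_ONLY
has_no_default = param.default is inspect.Parameter.empty

# Omitting the argument is therefore a TypeError, not a fallback to True.
try:
    SessionStub().with_python_udf_inlining()
    missing_arg_error = ""
except TypeError as exc:
    missing_arg_error = str(exc)
```

Either the docstring could say that a new session inlines Python UDFs by default (consistent with the `ipc.py` note that the default codec has inlining on), or the signature could take `enabled: bool = True`; which one is intended is the author's call.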


