kylebarron commented on code in PR #825:
URL: https://github.com/apache/datafusion-python/pull/825#discussion_r1722533590


##########
python/datafusion/dataframe.py:
##########
@@ -524,3 +524,19 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
         """
         columns = [c for c in columns]
         return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
+
+    def __arrow_c_stream__(self, requested_schema: pa.Schema) -> Any:
+        """Export an Arrow PyCapsule Stream.
+
+        This will execute and collect the DataFrame. We will attempt to respect the
+        requested schema, but only trivial transformations will be applied such as only
+        returning the fields listed in the requested schema if their data types match
+        those in the DataFrame.

Review Comment:
   It doesn't look like any transformations are currently applied?
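   If honoring `requested_schema` is deferred, it may be simpler to say so in the docstring. For reference, the field-subsetting it describes would amount to something like the following on the Rust side (a rough sketch; the helper name and error mapping are invented, not part of this PR):
   ```rust
   use arrow::datatypes::Schema;
   use arrow::error::ArrowError;
   use arrow::record_batch::RecordBatch;

   // Hypothetical helper: keep only the requested fields, and only when their
   // data types match the batch's schema, as the docstring promises.
   fn project_to_requested(
       batch: &RecordBatch,
       requested: &Schema,
   ) -> Result<RecordBatch, ArrowError> {
       let indices = requested
           .fields()
           .iter()
           .map(|field| {
               let idx = batch.schema().index_of(field.name())?;
               if batch.schema().field(idx).data_type() != field.data_type() {
                   return Err(ArrowError::SchemaError(format!(
                       "type mismatch for field {}",
                       field.name()
                   )));
               }
               Ok(idx)
           })
           .collect::<Result<Vec<_>, ArrowError>>()?;
       batch.project(&indices)
   }
   ```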



##########
src/context.rs:
##########
@@ -471,18 +474,35 @@ impl PySessionContext {
         name: Option<&str>,
         py: Python,
     ) -> PyResult<PyDataFrame> {
-        // Instantiate pyarrow Table object & convert to batches
-        let table = data.call_method0("to_batches")?;
+        let mut batches = None;
+        let mut schema = None;
 
-        let schema = data.getattr("schema")?;
-        let schema = schema.extract::<PyArrowType<Schema>>()?;
+        if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+            // Works for any object that implements __arrow_c_stream__ in pycapsule.
+
+            schema = Some(stream_reader.schema().as_ref().to_owned());
+            batches = Some(stream_reader.filter_map(|v| v.ok()).collect());
+        } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
+            // While this says RecordBatch, it will work for any object that implements
+            // __arrow_c_array__ in pycapsule.

Review Comment:
   ```suggestion
            // While this says RecordBatch, it will work for any object that implements
            // __arrow_c_array__ and returns a StructArray.
   ```
   
   as a technical matter, other types of arrays also implement `__arrow_c_array__`, but importing them to `RecordBatch` will fail on non-struct types.
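   (As I understand it, this is because in arrow-rs a `RecordBatch` is interchangeable with a `StructArray`, so the C-array import path only has a meaning for struct-typed data. A standalone illustration, not part of this PR:)
   ```rust
   use std::sync::Arc;

   use arrow::array::{ArrayRef, Int64Array, StructArray};
   use arrow::record_batch::RecordBatch;

   fn main() {
       // A RecordBatch and a StructArray carry the same data, so arrow-rs can
       // convert freely between them; non-struct arrays have no such mapping.
       let batch = RecordBatch::try_from_iter([(
           "a",
           Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
       )])
       .unwrap();

       let as_struct = StructArray::from(batch.clone()); // RecordBatch -> StructArray
       let round_trip = RecordBatch::from(as_struct);    // StructArray -> RecordBatch
       assert_eq!(batch, round_trip);
   }
   ```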



##########
src/context.rs:
##########
@@ -471,18 +474,35 @@ impl PySessionContext {
         name: Option<&str>,
         py: Python,
     ) -> PyResult<PyDataFrame> {
-        // Instantiate pyarrow Table object & convert to batches
-        let table = data.call_method0("to_batches")?;
+        let mut batches = None;
+        let mut schema = None;
 
-        let schema = data.getattr("schema")?;
-        let schema = schema.extract::<PyArrowType<Schema>>()?;
+        if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+            // Works for any object that implements __arrow_c_stream__ in pycapsule.
+
+            schema = Some(stream_reader.schema().as_ref().to_owned());
+            batches = Some(stream_reader.filter_map(|v| v.ok()).collect());
+        } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
+            // While this says RecordBatch, it will work for any object that implements
+            // __arrow_c_array__ in pycapsule.
+
+            schema = Some(array.schema().as_ref().to_owned());
+            batches = Some(vec![array]);
+        }
+
+        if batches.is_none() || schema.is_none() {
+            return Err(PyTypeError::new_err(
+                "Expected either a Arrow Array or Arrow Stream in 
from_arrow_table().",
+            ));
+        }
+
+        let batches = batches.unwrap();
+        let schema = schema.unwrap();

Review Comment:
   I'd personally opt for a different control flow that I find slightly clearer, but just a nit:
   ```suggestion
        let (schema, batches) = if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
            // Works for any object that implements __arrow_c_stream__ in pycapsule.

            let schema = stream_reader.schema().as_ref().to_owned();
            let batches = stream_reader.filter_map(|v| v.ok()).collect();
            (schema, batches)
        } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
            // While this says RecordBatch, it will work for any object that implements
            // __arrow_c_array__ in pycapsule.

            let schema = array.schema().as_ref().to_owned();
            let batches = vec![array];
            (schema, batches)
        } else {
            return Err(PyTypeError::new_err(
                "Expected either an Arrow Array or Arrow Stream in from_arrow_table().",
            ));
        };
   ```



##########
src/context.rs:
##########
@@ -471,18 +474,35 @@ impl PySessionContext {
         name: Option<&str>,
         py: Python,
     ) -> PyResult<PyDataFrame> {
-        // Instantiate pyarrow Table object & convert to batches
-        let table = data.call_method0("to_batches")?;
+        let mut batches = None;
+        let mut schema = None;
 
-        let schema = data.getattr("schema")?;
-        let schema = schema.extract::<PyArrowType<Schema>>()?;
+        if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+            // Works for any object that implements __arrow_c_stream__ in pycapsule.
+
+            schema = Some(stream_reader.schema().as_ref().to_owned());
+            batches = Some(stream_reader.filter_map(|v| v.ok()).collect());

Review Comment:
   I don't think we should ignore errors here. I'd suggest returning an error if one of these batches errored.
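   Something like this, in place of the `filter_map` (the `PyValueError` mapping is just illustrative; assumes `use pyo3::exceptions::PyValueError;` and `use arrow::error::ArrowError;` are in scope):
   ```rust
   // Propagate reader errors instead of silently dropping failed batches.
   let schema = stream_reader.schema().as_ref().to_owned();
   let batches: Vec<RecordBatch> = stream_reader
       .collect::<Result<Vec<_>, ArrowError>>()
       .map_err(|err| PyValueError::new_err(err.to_string()))?;
   ```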



##########
src/dataframe.rs:
##########
@@ -451,6 +454,26 @@ impl PyDataFrame {
         Ok(table)
     }
 
+    #[allow(unused_variables)]
+    fn __arrow_c_stream__<'py>(
+        &'py mut self,
+        py: Python<'py>,
+        requested_schema: Option<Bound<'py, PyCapsule>>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
+            .into_iter()
+            .map(|r| Ok(r));

Review Comment:
   It would be cool if we didn't have to collect all the batches ahead of time, but could synchronously collect one batch at a time within the iterator. I have no idea if datafusion would support that though.
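   For the record, if `DataFrame::execute_stream` is usable here, one way this could look is an adapter that blocks on one batch at a time. A rough, untested sketch (the struct and field names are invented; getting a usable Tokio handle in pyo3 context without deadlocking is the hard part):
   ```rust
   use arrow::datatypes::SchemaRef;
   use arrow::error::ArrowError;
   use arrow::record_batch::{RecordBatch, RecordBatchReader};
   use datafusion::physical_plan::SendableRecordBatchStream;
   use futures::StreamExt;

   // Hypothetical adapter: pull batches from DataFusion's async stream one at
   // a time, blocking on a Tokio handle for each `next()` instead of
   // collecting everything up front.
   struct BlockingStreamReader {
       stream: SendableRecordBatchStream,
       runtime: tokio::runtime::Handle,
   }

   impl Iterator for BlockingStreamReader {
       type Item = Result<RecordBatch, ArrowError>;

       fn next(&mut self) -> Option<Self::Item> {
           self.runtime
               .block_on(self.stream.next())
               .map(|batch| batch.map_err(|e| ArrowError::ExternalError(Box::new(e))))
       }
   }

   impl RecordBatchReader for BlockingStreamReader {
       fn schema(&self) -> SchemaRef {
           self.stream.schema()
       }
   }
   ```
   The reader could then presumably be exported through the same FFI stream machinery the collected batches go through today.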



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

