kylebarron commented on code in PR #825:
URL: https://github.com/apache/datafusion-python/pull/825#discussion_r1730041170


##########
src/context.rs:
##########
@@ -471,18 +474,31 @@ impl PySessionContext {
         name: Option<&str>,
         py: Python,
     ) -> PyResult<PyDataFrame> {
-        // Instantiate pyarrow Table object & convert to batches
-        let table = data.call_method0("to_batches")?;
+        let (schema, batches) =
+            if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+                // Works for any object that implements __arrow_c_stream__ in pycapsule.
+
+                let schema = stream_reader.schema().as_ref().to_owned();
+                let batches = stream_reader
+                    .collect::<std::result::Result<Vec<RecordBatch>, arrow::error::ArrowError>>()
+                    .map_err(DataFusionError::from)?;
+
+                (schema, batches)
+            } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
+                // While this says RecordBatch, it will work for any object that implements
+                // __arrow_c_array__ and returns a StructArray.
+
+                (array.schema().as_ref().to_owned(), vec![array])

Review Comment:
   Similarly, you can just use `.clone()` instead of copying from the ref
   ```suggestion
                   (array.schema().clone(), vec![array])
   ```
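
   For context, `array.schema()` returns a `SchemaRef`, i.e. an `Arc<Schema>`, so `.clone()` is just a reference-count bump while `.as_ref().to_owned()` deep-copies the whole schema. A minimal standalone sketch of the difference (the field definition here is made up purely for illustration):
   ```rust
   use std::sync::Arc;

   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

   fn main() {
       // Hypothetical single-field schema, purely for illustration.
       let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
           "a",
           DataType::Int64,
           false,
       )]));

       // Cloning the Arc is a cheap reference-count bump; both handles
       // point at the same allocation.
       let cheap: SchemaRef = schema.clone();
       assert!(Arc::ptr_eq(&schema, &cheap));

       // Going through the ref deep-copies the Schema instead.
       let owned: Schema = schema.as_ref().to_owned();
       assert_eq!(owned, *schema);
   }
   ```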



##########
src/context.rs:
##########
@@ -471,18 +474,31 @@ impl PySessionContext {
         name: Option<&str>,
         py: Python,
     ) -> PyResult<PyDataFrame> {
-        // Instantiate pyarrow Table object & convert to batches
-        let table = data.call_method0("to_batches")?;
+        let (schema, batches) =
+            if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+                // Works for any object that implements __arrow_c_stream__ in pycapsule.
+
+                let schema = stream_reader.schema().as_ref().to_owned();
+                let batches = stream_reader
+                    .collect::<std::result::Result<Vec<RecordBatch>, arrow::error::ArrowError>>()

Review Comment:
   Are you intentionally using the long form here? IMO it's easier to read as 
   ```suggestion
                    .collect::<Result<Vec<_>, ArrowError>>()
   ```



##########
src/dataframe.rs:
##########
@@ -451,6 +458,40 @@ impl PyDataFrame {
         Ok(table)
     }
 
+    fn __arrow_c_stream__<'py>(
+        &'py mut self,
+        py: Python<'py>,
+        requested_schema: Option<Bound<'py, PyCapsule>>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let mut batches = wait_for_future(py, self.df.as_ref().clone().collect())?;
+        let mut schema: Schema = self.df.schema().to_owned().into();
+
+        if let Some(schema_capsule) = requested_schema {
+            validate_pycapsule(&schema_capsule, "arrow_schema")?;
+
+            let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
+            let desired_schema = Schema::try_from(schema_ptr).map_err(DataFusionError::from)?;

Review Comment:
   Ah right, `Schema::from_pyarrow_bound` accepts an _object_ with 
`__arrow_c_schema__`, not the raw pointer itself. This is why in pyo3-arrow I 
expose helper functions to work with the raw capsules.
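
   For anyone following along, a minimal sketch of the two import paths (assuming the `arrow` crate's `pyarrow` feature and `pyo3`; the function names are hypothetical):
   ```rust
   use arrow::datatypes::Schema;
   use arrow::ffi::FFI_ArrowSchema;
   use arrow::pyarrow::FromPyArrow;
   use pyo3::exceptions::PyValueError;
   use pyo3::prelude::*;
   use pyo3::types::PyCapsule;

   /// Import from an *object* that implements `__arrow_c_schema__`
   /// (e.g. a `pyarrow.Schema`); `from_pyarrow_bound` invokes the
   /// dunder for us.
   fn schema_from_object(obj: &Bound<'_, PyAny>) -> PyResult<Schema> {
       Schema::from_pyarrow_bound(obj)
   }

   /// Import from a raw "arrow_schema" PyCapsule, where there is no
   /// wrapping object; we have to dereference the capsule ourselves.
   fn schema_from_capsule(capsule: &Bound<'_, PyCapsule>) -> PyResult<Schema> {
       let ffi_schema = unsafe { capsule.reference::<FFI_ArrowSchema>() };
       Schema::try_from(ffi_schema).map_err(|e| PyValueError::new_err(e.to_string()))
   }
   ```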



##########
src/context.rs:
##########
@@ -471,18 +474,31 @@ impl PySessionContext {
         name: Option<&str>,
         py: Python,
     ) -> PyResult<PyDataFrame> {
-        // Instantiate pyarrow Table object & convert to batches
-        let table = data.call_method0("to_batches")?;
+        let (schema, batches) =
+            if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+                // Works for any object that implements __arrow_c_stream__ in pycapsule.
+
+                let schema = stream_reader.schema().as_ref().to_owned();

Review Comment:
   It doesn't seem that you need an owned `Schema` rather than a `SchemaRef`; why not just use
   ```suggestion
                   let schema = stream_reader.schema().clone();
   ```



##########
src/dataframe.rs:
##########
@@ -451,6 +458,40 @@ impl PyDataFrame {
         Ok(table)
     }
 
+    fn __arrow_c_stream__<'py>(
+        &'py mut self,
+        py: Python<'py>,
+        requested_schema: Option<Bound<'py, PyCapsule>>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let mut batches = wait_for_future(py, self.df.as_ref().clone().collect())?;
+        let mut schema: Schema = self.df.schema().to_owned().into();
+
+        if let Some(schema_capsule) = requested_schema {
+            validate_pycapsule(&schema_capsule, "arrow_schema")?;
+
+            let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
+            let desired_schema = Schema::try_from(schema_ptr).map_err(DataFusionError::from)?;
+
+            schema = project_schema(schema, desired_schema)
+                .map_err(|e| DataFusionError::ArrowError(e))?;
+
+            batches = batches
+                .into_iter()
+                .map(|record_batch| record_batch_into_schema(record_batch, &schema))
+                .collect::<Result<Vec<RecordBatch>, ArrowError>>()
+                .map_err(|e| DataFusionError::ArrowError(e))?;

Review Comment:
   I suppose this is ok because you've already collected the full query into memory. In my export in arro3/pyo3-arrow I do the casting on the fly in the iterator, to avoid the memory overhead of casting and materializing the full export. See https://github.com/kylebarron/arro3/blob/a7f4a7e946390ed5a8ac480d039cdfa7dcc84fab/pyo3-arrow/src/ffi/to_python/utils.rs#L91-L100
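
   A rough sketch of that on-the-fly approach (assuming `arrow`'s `cast` kernel and a target schema with a matching column count; the adapter name is made up):
   ```rust
   use arrow::array::RecordBatch;
   use arrow::compute::cast;
   use arrow::datatypes::SchemaRef;
   use arrow::error::ArrowError;

   /// Cast each batch to `target` only when the consumer pulls it,
   /// instead of casting and materializing every batch up front.
   fn cast_on_the_fly<I>(
       batches: I,
       target: SchemaRef,
   ) -> impl Iterator<Item = Result<RecordBatch, ArrowError>>
   where
       I: Iterator<Item = Result<RecordBatch, ArrowError>>,
   {
       batches.map(move |batch| {
           let batch = batch?;
           // Cast column-by-column to the corresponding target field.
           let columns = batch
               .columns()
               .iter()
               .zip(target.fields().iter())
               .map(|(col, field)| cast(col, field.data_type()))
               .collect::<Result<Vec<_>, ArrowError>>()?;
           RecordBatch::try_new(target.clone(), columns)
       })
   }
   ```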



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

