alamb commented on code in PR #60:
URL: https://github.com/apache/datafusion-ray/pull/60#discussion_r1955311978


##########
src/util.rs:
##########
@@ -0,0 +1,457 @@
+use std::collections::HashMap;
+use std::fmt::Display;
+use std::future::Future;
+use std::io::Cursor;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
+use arrow::ipc::convert::fb_to_schema;
+use arrow::ipc::reader::StreamReader;
+use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
+use arrow::ipc::{root_as_message, MetadataVersion};
+use arrow::pyarrow::*;
+use arrow::util::pretty;
+use arrow_flight::{FlightClient, FlightData, Ticket};
+use async_stream::stream;
+use datafusion::common::internal_datafusion_err;
+use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::datasource::physical_plan::ParquetExec;
+use datafusion::error::DataFusionError;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, 
SessionStateBuilder};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{displayable, ExecutionPlan, 
ExecutionPlanProperties};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_proto::physical_plan::AsExecutionPlan;
+use futures::{Stream, StreamExt};
+use parking_lot::Mutex;
+use pyo3::prelude::*;
+use pyo3::types::{PyBytes, PyList};
+use tonic::transport::Channel;
+
+use crate::codec::RayCodec;
+use crate::protobuf::FlightTicketData;
+use crate::ray_stage_reader::RayStageReaderExec;
+use crate::stage_service::ServiceClients;
+use prost::Message;
+use tokio::macros::support::thread_rng_n;
+
+pub(crate) trait ResultExt<T> {
+    fn to_py_err(self) -> PyResult<T>;
+}
+
+impl<T, E> ResultExt<T> for Result<T, E>
+where
+    E: std::fmt::Debug,
+{
+    fn to_py_err(self) -> PyResult<T> {
+        match self {
+            Ok(x) => Ok(x),
+            Err(e) => Err(PyErr::new::<pyo3::exceptions::PyException, 
_>(format!(
+                "{:?}",
+                e
+            ))),
+        }
+    }
+}
+
+/// we need these two functions to go back and forth between IPC 
representations
+/// from rust to rust to avoid using the C++ implementation from pyarrow as it
+/// will generate unaligned data causing us errors
+///
+/// not used in current arrow flight implementation, but leaving these here
+#[pyfunction]
+pub fn batch_to_ipc(py: Python, batch: PyArrowType<RecordBatch>) -> 
PyResult<Py<PyBytes>> {
+    let batch = batch.0;
+
+    let bytes = batch_to_ipc_helper(&batch).to_py_err()?;
+
+    //TODO:  unsure about this next line.  Compiler is happy but is this 
correct?
+    Ok(PyBytes::new_bound(py, &bytes).unbind())
+}
+
+#[pyfunction]
+pub fn ipc_to_batch(bytes: &[u8], py: Python) -> PyResult<PyObject> {
+    let batch = ipc_to_batch_helper(bytes).to_py_err()?;
+    batch.to_pyarrow(py)
+}
+
+fn batch_to_ipc_helper(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
+    let schema = batch.schema();
+    let buffer: Vec<u8> = Vec::new();
+    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)
+        .map_err(|e| internal_datafusion_err!("Cannot create ipcwriteoptions 
{e}"))?;
+
+    let mut stream_writer = StreamWriter::try_new_with_options(buffer, 
&schema, options)?;
+    stream_writer.write(batch)?;
+    stream_writer.into_inner()
+}
+
+fn ipc_to_batch_helper(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
+    let mut stream_reader = StreamReader::try_new_buffered(Cursor::new(bytes), 
None)?;

Review Comment:
   Also, once this is available:
    - https://github.com/apache/arrow-rs/pull/7120
   you can probably save quite a bit of time by not revalidating known-good inputs.



##########
src/util.rs:
##########
@@ -0,0 +1,457 @@
+use std::collections::HashMap;
+use std::fmt::Display;
+use std::future::Future;
+use std::io::Cursor;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
+use arrow::error::ArrowError;
+use arrow::ipc::convert::fb_to_schema;
+use arrow::ipc::reader::StreamReader;
+use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
+use arrow::ipc::{root_as_message, MetadataVersion};
+use arrow::pyarrow::*;
+use arrow::util::pretty;
+use arrow_flight::{FlightClient, FlightData, Ticket};
+use async_stream::stream;
+use datafusion::common::internal_datafusion_err;
+use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::datasource::physical_plan::ParquetExec;
+use datafusion::error::DataFusionError;
+use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, 
SessionStateBuilder};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{displayable, ExecutionPlan, 
ExecutionPlanProperties};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_proto::physical_plan::AsExecutionPlan;
+use futures::{Stream, StreamExt};
+use parking_lot::Mutex;
+use pyo3::prelude::*;
+use pyo3::types::{PyBytes, PyList};
+use tonic::transport::Channel;
+
+use crate::codec::RayCodec;
+use crate::protobuf::FlightTicketData;
+use crate::ray_stage_reader::RayStageReaderExec;
+use crate::stage_service::ServiceClients;
+use prost::Message;
+use tokio::macros::support::thread_rng_n;
+
+pub(crate) trait ResultExt<T> {
+    fn to_py_err(self) -> PyResult<T>;
+}
+
+impl<T, E> ResultExt<T> for Result<T, E>
+where
+    E: std::fmt::Debug,
+{
+    fn to_py_err(self) -> PyResult<T> {
+        match self {
+            Ok(x) => Ok(x),
+            Err(e) => Err(PyErr::new::<pyo3::exceptions::PyException, 
_>(format!(
+                "{:?}",
+                e
+            ))),
+        }
+    }
+}
+
+/// we need these two functions to go back and forth between IPC 
representations
+/// from rust to rust to avoid using the C++ implementation from pyarrow as it
+/// will generate unaligned data causing us errors
+///
+/// not used in current arrow flight implementation, but leaving these here
+#[pyfunction]
+pub fn batch_to_ipc(py: Python, batch: PyArrowType<RecordBatch>) -> 
PyResult<Py<PyBytes>> {
+    let batch = batch.0;
+
+    let bytes = batch_to_ipc_helper(&batch).to_py_err()?;
+
+    //TODO:  unsure about this next line.  Compiler is happy but is this 
correct?
+    Ok(PyBytes::new_bound(py, &bytes).unbind())
+}
+
+#[pyfunction]
+pub fn ipc_to_batch(bytes: &[u8], py: Python) -> PyResult<PyObject> {
+    let batch = ipc_to_batch_helper(bytes).to_py_err()?;
+    batch.to_pyarrow(py)
+}
+
+fn batch_to_ipc_helper(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
+    let schema = batch.schema();
+    let buffer: Vec<u8> = Vec::new();
+    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)
+        .map_err(|e| internal_datafusion_err!("Cannot create ipcwriteoptions 
{e}"))?;
+
+    let mut stream_writer = StreamWriter::try_new_with_options(buffer, 
&schema, options)?;
+    stream_writer.write(batch)?;
+    stream_writer.into_inner()
+}
+
+fn ipc_to_batch_helper(bytes: &[u8]) -> Result<RecordBatch, ArrowError> {
+    let mut stream_reader = StreamReader::try_new_buffered(Cursor::new(bytes), 
None)?;

Review Comment:
   FYI, since you are reading from memory here anyway (a `Cursor` over a byte
   slice), I don't think buffering adds much extra value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to