alamb commented on code in PR #60: URL: https://github.com/apache/datafusion-ray/pull/60#discussion_r1955311978
########## src/util.rs: ########## @@ -0,0 +1,457 @@ +use std::collections::HashMap; +use std::fmt::Display; +use std::future::Future; +use std::io::Cursor; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; +use arrow::ipc::convert::fb_to_schema; +use arrow::ipc::reader::StreamReader; +use arrow::ipc::writer::{IpcWriteOptions, StreamWriter}; +use arrow::ipc::{root_as_message, MetadataVersion}; +use arrow::pyarrow::*; +use arrow::util::pretty; +use arrow_flight::{FlightClient, FlightData, Ticket}; +use async_stream::stream; +use datafusion::common::internal_datafusion_err; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::datasource::physical_plan::ParquetExec; +use datafusion::error::DataFusionError; +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, SessionStateBuilder}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_proto::physical_plan::AsExecutionPlan; +use futures::{Stream, StreamExt}; +use parking_lot::Mutex; +use pyo3::prelude::*; +use pyo3::types::{PyBytes, PyList}; +use tonic::transport::Channel; + +use crate::codec::RayCodec; +use crate::protobuf::FlightTicketData; +use crate::ray_stage_reader::RayStageReaderExec; +use crate::stage_service::ServiceClients; +use prost::Message; +use tokio::macros::support::thread_rng_n; + +pub(crate) trait ResultExt<T> { + fn to_py_err(self) -> PyResult<T>; +} + +impl<T, E> ResultExt<T> for Result<T, E> +where + E: std::fmt::Debug, +{ + fn to_py_err(self) -> PyResult<T> { + match self { + Ok(x) => Ok(x), + Err(e) => Err(PyErr::new::<pyo3::exceptions::PyException, _>(format!( + "{:?}", + e + ))), + } + } +} + +/// we need these two functions to go 
back and forth between IPC representations +/// from rust to rust to avoid using the C++ implementation from pyarrow as it +/// will generate unaligned data causing us errors +/// +/// not used in current arrow flight implementation, but leaving these here +#[pyfunction] +pub fn batch_to_ipc(py: Python, batch: PyArrowType<RecordBatch>) -> PyResult<Py<PyBytes>> { + let batch = batch.0; + + let bytes = batch_to_ipc_helper(&batch).to_py_err()?; + + //TODO: unsure about this next line. Compiler is happy but is this correct? + Ok(PyBytes::new_bound(py, &bytes).unbind()) +} + +#[pyfunction] +pub fn ipc_to_batch(bytes: &[u8], py: Python) -> PyResult<PyObject> { + let batch = ipc_to_batch_helper(bytes).to_py_err()?; + batch.to_pyarrow(py) +} + +fn batch_to_ipc_helper(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> { + let schema = batch.schema(); + let buffer: Vec<u8> = Vec::new(); + let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5) + .map_err(|e| internal_datafusion_err!("Cannot create ipcwriteoptions {e}"))?; + + let mut stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?; + stream_writer.write(batch)?; + stream_writer.into_inner() +} + +fn ipc_to_batch_helper(bytes: &[u8]) -> Result<RecordBatch, ArrowError> { + let mut stream_reader = StreamReader::try_new_buffered(Cursor::new(bytes), None)?; Review Comment: Also, once https://github.com/apache/arrow-rs/pull/7120 is available, you can probably save quite a bit of time by skipping revalidation of known-good inputs. ########## src/util.rs: ########## @@ -0,0 +1,457 @@ +use std::collections::HashMap; +use std::fmt::Display; +use std::future::Future; +use std::io::Cursor; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use arrow::error::ArrowError; +use arrow::ipc::convert::fb_to_schema; +use arrow::ipc::reader::StreamReader; +use 
arrow::ipc::writer::{IpcWriteOptions, StreamWriter}; +use arrow::ipc::{root_as_message, MetadataVersion}; +use arrow::pyarrow::*; +use arrow::util::pretty; +use arrow_flight::{FlightClient, FlightData, Ticket}; +use async_stream::stream; +use datafusion::common::internal_datafusion_err; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::datasource::physical_plan::ParquetExec; +use datafusion::error::DataFusionError; +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, SessionStateBuilder}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_proto::physical_plan::AsExecutionPlan; +use futures::{Stream, StreamExt}; +use parking_lot::Mutex; +use pyo3::prelude::*; +use pyo3::types::{PyBytes, PyList}; +use tonic::transport::Channel; + +use crate::codec::RayCodec; +use crate::protobuf::FlightTicketData; +use crate::ray_stage_reader::RayStageReaderExec; +use crate::stage_service::ServiceClients; +use prost::Message; +use tokio::macros::support::thread_rng_n; + +pub(crate) trait ResultExt<T> { + fn to_py_err(self) -> PyResult<T>; +} + +impl<T, E> ResultExt<T> for Result<T, E> +where + E: std::fmt::Debug, +{ + fn to_py_err(self) -> PyResult<T> { + match self { + Ok(x) => Ok(x), + Err(e) => Err(PyErr::new::<pyo3::exceptions::PyException, _>(format!( + "{:?}", + e + ))), + } + } +} + +/// we need these two functions to go back and forth between IPC representations +/// from rust to rust to avoid using the C++ implementation from pyarrow as it +/// will generate unaligned data causing us errors +/// +/// not used in current arrow flight implementation, but leaving these here +#[pyfunction] +pub fn batch_to_ipc(py: Python, batch: PyArrowType<RecordBatch>) -> PyResult<Py<PyBytes>> { + let batch = batch.0; + + let bytes = 
batch_to_ipc_helper(&batch).to_py_err()?; + + //TODO: unsure about this next line. Compiler is happy but is this correct? + Ok(PyBytes::new_bound(py, &bytes).unbind()) +} + +#[pyfunction] +pub fn ipc_to_batch(bytes: &[u8], py: Python) -> PyResult<PyObject> { + let batch = ipc_to_batch_helper(bytes).to_py_err()?; + batch.to_pyarrow(py) +} + +fn batch_to_ipc_helper(batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> { + let schema = batch.schema(); + let buffer: Vec<u8> = Vec::new(); + let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5) + .map_err(|e| internal_datafusion_err!("Cannot create ipcwriteoptions {e}"))?; + + let mut stream_writer = StreamWriter::try_new_with_options(buffer, &schema, options)?; + stream_writer.write(batch)?; + stream_writer.into_inner() +} + +fn ipc_to_batch_helper(bytes: &[u8]) -> Result<RecordBatch, ArrowError> { + let mut stream_reader = StreamReader::try_new_buffered(Cursor::new(bytes), None)?; Review Comment: FYI: since you are reading from memory here anyway, I don't think buffering adds much extra value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org