Hi Mike,

I've been thinking about how you are considering adding metadata to the RecordBatch.
The struct is currently defined as:

    pub struct RecordBatch {
        schema: SchemaRef,
        columns: Vec<Arc<dyn Array>>,
    }

Are you suggesting something like this?

    pub struct RecordBatch {
        schema: SchemaRef,
        columns: Vec<Arc<dyn Array>>,
        metadata: RecordBatchMetadata,
    }

If that is the case, why not use the Schema metadata? The schema metadata
is a HashMap that can hold any information you require. For example, here
I'm using the metadata to create a schema, and then using that schema with
IPC and the RecordBatch:

    use std::collections::HashMap;
    use std::net::TcpStream;
    use std::sync::Arc;

    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::ipc::writer::StreamWriter;
    use arrow::record_batch::RecordBatch;

    fn main() {
        let mut schema_metadata: HashMap<String, String> = HashMap::new();
        schema_metadata.insert("file_name".to_string(), "my_file.parquet".to_string());

        let schema = Schema::new_with_metadata(
            vec![
                Field::new("index", DataType::Int32, false),
                Field::new("word", DataType::Utf8, false),
            ],
            schema_metadata,
        );

        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
        let b = StringArray::from(vec!["one", "two", "three", "four", "five"]);

        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(a), Arc::new(b)],
        )
        .unwrap();

        let stream = TcpStream::connect("127.0.0.1:8000").unwrap();
        let mut writer = StreamWriter::try_new(stream, &schema).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }
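On the receiving side, the metadata comes back attached to the schema. A
rough sketch of the reader end of that socket (I haven't run this exact
code; the address and the "file_name" key just mirror the example above):

    use std::net::TcpListener;

    use arrow::ipc::reader::StreamReader;

    fn main() {
        // Accept the connection from the writer above.
        let listener = TcpListener::bind("127.0.0.1:8000").unwrap();
        let (stream, _) = listener.accept().unwrap();

        let reader = StreamReader::try_new(stream).unwrap();
        // The schema, including its metadata map, arrives once ahead of
        // the record batches.
        let file_name = reader.schema().metadata().get("file_name").cloned();

        for batch in reader {
            let batch = batch.unwrap();
            // Every batch read from this stream shares that schema metadata.
            println!("{} rows from {:?}", batch.num_rows(), file_name);
        }
    }

So the consumer recovers the file name without any change to the
RecordBatch struct itself.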
I don't know how the Spark implementation works, but I think it would be
possible to emulate it using this metadata. Don't you think so?
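For instance, a helper along these lines could materialize a per-row
input_file_name column from the batch's schema metadata (hypothetical and
untested; input_file_name and the "file_name" key are just my conventions
from the example above):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, StringArray};
    use arrow::record_batch::RecordBatch;

    // Emulates Spark's input_file_name(): look up the "file_name" key in
    // the batch's schema metadata and repeat it once per row.
    fn input_file_name(batch: &RecordBatch) -> ArrayRef {
        let name = batch
            .schema()
            .metadata()
            .get("file_name")
            .cloned()
            .unwrap_or_default();
        Arc::new(StringArray::from(vec![name.as_str(); batch.num_rows()]))
    }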
Fernando

On Thu, Feb 25, 2021 at 4:36 AM Mike Seddon <seddo...@gmail.com> wrote:

> Thanks Micah.
>
> It is actually the Rust implementation that is the odd one out. Ideally,
> adding a metadata KeyValue to the RecordBatch plus your suggested
> 'reserved' key would be the best option.
>
> On Thu, Feb 25, 2021 at 3:26 PM Micah Kornfield <emkornfi...@gmail.com>
> wrote:
>
> > Thanks for looking into it. I would guess it is likely possible to
> > "hoist" metadata from a record batch schema object to the Message, but
> > I understand if it isn't something you want to pursue.
> >
> > On Wed, Feb 24, 2021 at 8:19 PM Mike Seddon <seddo...@gmail.com> wrote:
> >
> > > Hi Micah,
> > > Thank you for providing this information. I have reviewed the
> > > documentation you provided and have a few conclusions:
> > >
> > > 1. RecordBatch does not have the capability to attach user-defined
> > > metadata (KeyValue attributes):
> > > https://github.com/apache/arrow/blob/master/format/Message.fbs#L83
> > > 2. Schema does have this capability, but it would not work for passing
> > > per-batch input files, as the design indicates that the Schema object
> > > is passed once and then a series of interleaved DictionaryBatch or
> > > MessageBatch messages must match the Schema:
> > > https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#ipc-streaming-format
> > >
> > > In the Rust implementation each RecordBatch embeds the Schema, so each
> > > schema can have different metadata (like the filename in this case). I
> > > think this will have to be implemented in Rust as a memory-only
> > > attribute which does not get persisted unless more significant changes
> > > to the protocol are made.
> > >
> > > Thanks
> > > Mike
> > >
> > > On Thu, Feb 25, 2021 at 11:14 AM Micah Kornfield
> > > <emkornfi...@gmail.com> wrote:
> > >
> > > > The process would be to create a PR proposal to update the custom
> > > > metadata specification [1] to reserve a new word and describe its
> > > > use. Then send a [DISCUSS] email on this list. Once there is
> > > > consensus we can formally vote and merge the change.
> > > >
> > > > [1]
> > > > https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst
> > > >
> > > > On Wed, Feb 24, 2021 at 3:47 PM Mike Seddon <seddo...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks for both of your comments.
> > > > >
> > > > > @Andrew Schema.metadata does look like a logical place to house
> > > > > the information, so that would solve part of the problem. Do you
> > > > > have any thoughts on whether we change the function signature:
> > > > >
> > > > > From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> +
> > > > > Send + Sync>;
> > > > > To: Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) ->
> > > > > Result<ColumnarValue> + Send + Sync>;
> > > > >
> > > > > @Micah It would be nice to have a reserved metadata key so this
> > > > > could be shared, but I am not sure of the admin process for the
> > > > > Arrow project to agree on something like that. Is there a forum?
> > > > >
> > > > > On Thu, Feb 25, 2021 at 8:58 AM Micah Kornfield
> > > > > <emkornfi...@gmail.com> wrote:
> > > > >
> > > > > > At least in C++ (and the IPC format) a schema can be shared
> > > > > > across many RecordBatches, which might have different sources.
> > > > > >
> > > > > > It might be useful to define a reserved metadata key (similar
> > > > > > to extension types) so that the data can be interpreted
> > > > > > consistently.
> > > > > >
> > > > > > On Wed, Feb 24, 2021 at 11:29 AM Andrew Lamb
> > > > > > <al...@influxdata.com> wrote:
> > > > > >
> > > > > > > I wonder if you could add the file_name as metadata on the
> > > > > > > `Schema` of the RecordBatch rather than on the RecordBatch
> > > > > > > itself? Since every RecordBatch has a schema, I don't fully
> > > > > > > understand the need to add something additional to the
> > > > > > > RecordBatch.
> > > > > > >
> > > > > > > https://docs.rs/arrow/3.0.0/arrow/datatypes/struct.Schema.html#method.new_with_metadata
> > > > > > >
> > > > > > > On Wed, Feb 24, 2021 at 1:20 AM Mike Seddon
> > > > > > > <seddo...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > One of Apache Spark's very useful SQL functions is
> > > > > > > > 'input_file_name', which provides a simple API for
> > > > > > > > identifying the source of a row of data when it comes from
> > > > > > > > a file-based source like Parquet or CSV. This is
> > > > > > > > particularly useful for identifying which chunk/partition
> > > > > > > > of a Parquet file the row came from, and is used heavily by
> > > > > > > > the DeltaLake format to determine which files are impacted
> > > > > > > > by MERGE operations.
> > > > > > > >
> > > > > > > > I have built a functional proof-of-concept for DataFusion,
> > > > > > > > but it requires modifying the RecordBatch struct to include
> > > > > > > > a 'metadata' struct (RecordBatchMetadata) to carry the
> > > > > > > > source file name attached to each batch.
> > > > > > > >
> > > > > > > > It also requires changing the ScalarFunctionImplementation
> > > > > > > > signature to support exposing the metadata (and therefore
> > > > > > > > all the functions):
> > > > > > > >
> > > > > > > > From: Arc<dyn Fn(&[ColumnarValue]) ->
> > > > > > > > Result<ColumnarValue> + Send + Sync>;
> > > > > > > > To: Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) ->
> > > > > > > > Result<ColumnarValue> + Send + Sync>;
> > > > > > > >
> > > > > > > > These changes have been made in a personal feature branch
> > > > > > > > and are available for review (still needs cleaning), but
> > > > > > > > conceptually does anyone have a problem with this API
> > > > > > > > change, or have a better proposal?
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Mike
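P.S. Re-reading the signature change quoted above, my understanding of the
proposed shape is roughly the following (RecordBatchMetadata here is just
my guess at a minimal definition, and I'm assuming the current DataFusion
crate paths; Mike's feature branch may well differ):

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::ColumnarValue;

    // A guess at a minimal metadata carrier for the proposal.
    pub struct RecordBatchMetadata {
        pub values: HashMap<String, String>,
    }

    // Proposed signature; today's variant takes only &[ColumnarValue].
    pub type ScalarFunctionImplementation = Arc<
        dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue>
            + Send
            + Sync,
    >;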