Hi Fernando,

After Andrew's reply I have moved the filename metadata into the Schema and
changed the ScalarFunctionImplementation signature to:

Arc<dyn Fn(&[ColumnarValue], SchemaRef) -> Result<ColumnarValue> + Send + Sync>;
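To make the idea concrete, here is a minimal sketch of what an input_file_name()
UDF could look like under that signature. The metadata key name, module paths and
error handling are assumptions for illustration, not necessarily what the branch
does:

    use std::sync::Arc;

    use arrow::datatypes::SchemaRef;
    use datafusion::error::Result;
    use datafusion::physical_plan::ColumnarValue;
    use datafusion::scalar::ScalarValue;

    // Hypothetical metadata key; a reserved key name has not been agreed yet.
    const INPUT_FILE_NAME_KEY: &str = "input_file_name";

    // The scalar function receives the batch's schema and reads the file name
    // from its metadata, returning NULL if the source did not populate it.
    fn input_file_name(_args: &[ColumnarValue], schema: SchemaRef) -> Result<ColumnarValue> {
        let file_name = schema.metadata().get(INPUT_FILE_NAME_KEY).cloned();
        Ok(ColumnarValue::Scalar(ScalarValue::Utf8(file_name)))
    }

    // Boxed up in the proposed ScalarFunctionImplementation shape.
    fn make_input_file_name_udf(
    ) -> Arc<dyn Fn(&[ColumnarValue], SchemaRef) -> Result<ColumnarValue> + Send + Sync> {
        Arc::new(input_file_name)
    }

Returning a ColumnarValue::Scalar lets the engine expand the value to the batch's
row count in the usual way.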
I have a functional (WIP) repo already:
https://github.com/seddonm1/arrow/compare/master...seddonm1:input-file

I need to add some more tests (mainly to ensure multi-part Parquet works as
expected), but I wanted to gather feedback on the proposal before cleaning it
up for a PR.

Mike

On Thu, Feb 25, 2021 at 8:30 PM Fernando Herrera <fernando.j.herr...@gmail.com> wrote:

> Hi Mike,
>
> I've been thinking about how you are considering adding metadata to the
> RecordBatch.
>
> The struct is currently defined as
>
> pub struct RecordBatch {
>     schema: SchemaRef,
>     columns: Vec<Arc<Array>>,
> }
>
> Are you suggesting something like this?
>
> pub struct RecordBatch {
>     schema: SchemaRef,
>     columns: Vec<Arc<Array>>,
>     metadata: RecordBatchMetadata,
> }
>
> Because if that is the case, why not use the Schema metadata? The schema
> metadata is a hashmap that can hold any information you require.
>
> For example, here I'm using the metadata to create a schema and then use
> that schema with IPC and the RecordBatch:
>
> fn main() {
>     let mut schema_metadata: HashMap<String, String> = HashMap::new();
>     schema_metadata.insert("file_name".to_string(), "my_file.parquet".to_string());
>
>     let schema = Schema::new_with_metadata(
>         vec![
>             Field::new("index", DataType::Int32, false),
>             Field::new("word", DataType::Utf8, false),
>         ],
>         schema_metadata,
>     );
>
>     let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
>     let b = StringArray::from(vec!["one", "two", "three", "four", "five"]);
>
>     let batch = RecordBatch::try_new(
>         Arc::new(schema.clone()),
>         vec![Arc::new(a), Arc::new(b)],
>     ).unwrap();
>
>     let stream = TcpStream::connect("127.0.0.1:8000").unwrap();
>     let mut writer = StreamWriter::try_new(stream, &schema).unwrap();
>     writer.write(&batch).unwrap();
>     writer.finish().unwrap();
> }
>
> I don't know how the Spark implementation works, but I think it would be
> possible to emulate it using this metadata. Don't you think so?
>
> Fernando
>
> On Thu, Feb 25, 2021 at 4:36 AM Mike Seddon <seddo...@gmail.com> wrote:
>
> > Thanks Micah.
> >
> > It is actually the Rust implementation that is the odd one out. Ideally
> > adding a metadata KeyValue to the RecordBatch plus your suggested
> > 'reserved' key would be the best option.
> >
> > On Thu, Feb 25, 2021 at 3:26 PM Micah Kornfield <emkornfi...@gmail.com> wrote:
> >
> > > Thanks for looking into it. I would guess it is likely possible to
> > > "hoist" metadata from a record batch schema object to the Message, but
> > > I understand if it isn't something you want to pursue.
> > >
> > > On Wed, Feb 24, 2021 at 8:19 PM Mike Seddon <seddo...@gmail.com> wrote:
> > >
> > > > Hi Micah,
> > > > Thank you for providing this information. I have reviewed the
> > > > documentation you provided and have a few conclusions:
> > > >
> > > > 1. RecordBatch does not have the capability to attach user-defined
> > > >    metadata (KeyValue attributes):
> > > >    https://github.com/apache/arrow/blob/master/format/Message.fbs#L83
> > > > 2. Schema does have this capability, but it would not work for passing
> > > >    per-batch input files because the design indicates that the Schema
> > > >    object is passed once and then a series of interleaved
> > > >    DictionaryBatch or RecordBatch messages must conform to that Schema:
> > > >    https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#ipc-streaming-format
> > > >
> > > > In the Rust implementation each RecordBatch embeds the Schema, so each
> > > > batch's schema can have different metadata (like the filename in this
> > > > case). I think this will have to be implemented in Rust as a
> > > > memory-only attribute which does not get persisted unless more
> > > > significant changes to the protocol are made.
> > > >
> > > > Thanks
> > > > Mike
> > > >
> > > > On Thu, Feb 25, 2021 at 11:14 AM Micah Kornfield <emkornfi...@gmail.com> wrote:
> > > >
> > > > > The process would be to create a PR proposal to update the custom
> > > > > metadata specification [1] to reserve a new word and describe its
> > > > > use. Then send a [DISCUSS] email on this list. Once there is
> > > > > consensus we can formally vote and merge the change.
> > > > >
> > > > > [1] https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst
> > > > >
> > > > > On Wed, Feb 24, 2021 at 3:47 PM Mike Seddon <seddo...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for both of your comments.
> > > > > >
> > > > > > @Andrew Schema.metadata does look like a logical place to house
> > > > > > the information, so that would solve part of the problem. Do you
> > > > > > have any thoughts on whether we change the function signature:
> > > > > >
> > > > > > From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
> > > > > > To: Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;
> > > > > >
> > > > > > @Micah It would be nice to have a reserved metadata key so this
> > > > > > could be shared, but I am not sure of the admin process for the
> > > > > > Arrow project to agree on something like that. Is there a forum?
> > > > > >
> > > > > > On Thu, Feb 25, 2021 at 8:58 AM Micah Kornfield <emkornfi...@gmail.com> wrote:
> > > > > >
> > > > > > > In at least C++ (and the IPC format) a schema can be shared
> > > > > > > across many RecordBatches, which might have different sources.
> > > > > > >
> > > > > > > It might be useful to define a reserved metadata key (similar
> > > > > > > to extension types) so that the data can be interpreted
> > > > > > > consistently.
> > > > > > >
> > > > > > > On Wed, Feb 24, 2021 at 11:29 AM Andrew Lamb <al...@influxdata.com> wrote:
> > > > > > >
> > > > > > > > I wonder if you could add the file_name as metadata on the
> > > > > > > > `Schema` of the RecordBatch rather than the RecordBatch
> > > > > > > > itself? Since every RecordBatch has a schema, I don't fully
> > > > > > > > understand the need to add something additional to the
> > > > > > > > RecordBatch.
> > > > > > > >
> > > > > > > > https://docs.rs/arrow/3.0.0/arrow/datatypes/struct.Schema.html#method.new_with_metadata
> > > > > > > >
> > > > > > > > On Wed, Feb 24, 2021 at 1:20 AM Mike Seddon <seddo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > One of Apache Spark's very useful SQL functions is
> > > > > > > > > 'input_file_name', which provides a simple API for
> > > > > > > > > identifying the source of a row of data when it comes from
> > > > > > > > > a file-based source like Parquet or CSV. This is
> > > > > > > > > particularly useful for identifying which chunk/partition
> > > > > > > > > of a Parquet file a row came from, and it is used heavily
> > > > > > > > > by the DeltaLake format to determine which files are
> > > > > > > > > impacted by MERGE operations.
> > > > > > > > >
> > > > > > > > > I have built a functional proof-of-concept for DataFusion,
> > > > > > > > > but it requires modifying the RecordBatch struct to include
> > > > > > > > > a 'metadata' struct (RecordBatchMetadata) to carry the
> > > > > > > > > source file name attached to each batch.
> > > > > > > > >
> > > > > > > > > It also requires changing the ScalarFunctionImplementation
> > > > > > > > > signature (and therefore all the functions) to expose the
> > > > > > > > > metadata:
> > > > > > > > >
> > > > > > > > > From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
> > > > > > > > > To: Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;
> > > > > > > > >
> > > > > > > > > These changes have been made in a personal feature branch
> > > > > > > > > and are available for review (still needs cleaning), but
> > > > > > > > > conceptually does anyone have a problem with this API
> > > > > > > > > change, or have a better proposal?
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > > Mike
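
For reference, the original proposal quoted above can be restated as a minimal
sketch. The RecordBatchMetadata field and the alias names used here are
illustrative assumptions only; the feature branch may differ:

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::ColumnarValue;

    // Hypothetical per-batch metadata carried alongside each RecordBatch.
    #[derive(Debug, Clone, Default)]
    pub struct RecordBatchMetadata {
        // Source file the batch was read from (e.g. a Parquet part file), if any.
        pub input_file_name: Option<String>,
    }

    // Current signature, as quoted in the proposal:
    pub type ScalarFunctionImplementation =
        Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;

    // Proposed replacement, threading the metadata through every scalar
    // function (given a separate alias name here only so both forms can
    // coexist in one sketch):
    pub type ScalarFunctionImplementationWithMetadata =
        Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;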