I see. You are storing the file name when reading JSON, CSV, and Parquet files.
Just out of curiosity, how would you use the file name in Spark? Are you using it for file statistics?

On Thu, Feb 25, 2021 at 9:36 AM Mike Seddon <seddo...@gmail.com> wrote:

Hi Fernando,

After Andrew's reply I have moved the filename metadata into the Schema and actually changed the ScalarFunctionImplementation signature to:

Arc<dyn Fn(&[ColumnarValue], SchemaRef) -> Result<ColumnarValue> + Send + Sync>;

I have a functional (WIP) repo already:
https://github.com/seddonm1/arrow/compare/master...seddonm1:input-file

I need to add some more tests (mainly to ensure multipart Parquet works as expected) but I wanted to gather feedback on the proposal before cleaning up for a PR.

Mike

On Thu, Feb 25, 2021 at 8:30 PM Fernando Herrera <fernando.j.herr...@gmail.com> wrote:

Hi Mike,

I've been thinking about how you are considering adding metadata to the RecordBatch.

The struct is currently defined as:

pub struct RecordBatch {
    schema: SchemaRef,
    columns: Vec<Arc<dyn Array>>,
}

Are you suggesting something like this?

pub struct RecordBatch {
    schema: SchemaRef,
    columns: Vec<Arc<dyn Array>>,
    metadata: RecordBatchMetadata,
}

Because if that is the case, why not use the Schema metadata? The schema metadata is a HashMap that can hold any information you require.

For example, here I am using the metadata to create a schema and then using that schema with IPC and the RecordBatch:

use std::collections::HashMap;
use std::net::TcpStream;
use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() {
    let mut schema_metadata: HashMap<String, String> = HashMap::new();
    schema_metadata.insert("file_name".to_string(), "my_file.parquet".to_string());
    let schema = Schema::new_with_metadata(
        vec![
            Field::new("index", DataType::Int32, false),
            Field::new("word", DataType::Utf8, false),
        ],
        schema_metadata,
    );
    let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let b = StringArray::from(vec!["one", "two", "three", "four", "five"]);
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a), Arc::new(b)]).unwrap();

    let stream = TcpStream::connect("127.0.0.1:8000").unwrap();
    let mut writer = StreamWriter::try_new(stream, &schema).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
}

I don't know how the Spark implementation works, but I think it would be possible to emulate it using this metadata. Don't you think so?

Fernando
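To make the proposed signature concrete, here is a rough sketch (not taken from Mike's branch) of what an input_file_name-style function body could look like if the filename is carried in the schema metadata under the "file_name" key from the example above. The function name, the key, and the exact module paths are illustrative assumptions only:

use arrow::datatypes::SchemaRef;
use datafusion::error::Result;
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;

// Sketch of a function matching the proposed
// Fn(&[ColumnarValue], SchemaRef) -> Result<ColumnarValue> shape.
// It ignores its column arguments and returns the batch-level
// "file_name" metadata entry (empty string if the key is absent),
// broadcast as a scalar over the whole batch.
fn input_file_name(_args: &[ColumnarValue], schema: SchemaRef) -> Result<ColumnarValue> {
    let name = schema
        .metadata()
        .get("file_name")
        .cloned()
        .unwrap_or_default();
    Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(name))))
}

A file-based scan would then be responsible for populating that metadata key on each batch's schema before the function is evaluated.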
On Thu, Feb 25, 2021 at 4:36 AM Mike Seddon <seddo...@gmail.com> wrote:

Thanks Micah.

It is actually the Rust implementation that is the odd one out. Ideally, adding a metadata KeyValue to the RecordBatch plus your suggested 'reserved' key would be the best option.

On Thu, Feb 25, 2021 at 3:26 PM Micah Kornfield <emkornfi...@gmail.com> wrote:

Thanks for looking into it. I would guess it is likely possible to "hoist" metadata from a record batch schema object to the Message, but I understand if it isn't something you want to pursue.

On Wed, Feb 24, 2021 at 8:19 PM Mike Seddon <seddo...@gmail.com> wrote:

Hi Micah,

Thank you for providing this information. I have reviewed the documentation you provided and have a few conclusions:

1. RecordBatch does not have the capability to attach user-defined metadata (KeyValue attributes):
https://github.com/apache/arrow/blob/master/format/Message.fbs#L83

2. Schema does have this capability, but it would not work for passing per-batch input files, as the design indicates that the Schema object is passed once and then a series of interleaved DictionaryBatch or RecordBatch messages must match that Schema:
https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#ipc-streaming-format

In the Rust implementation each RecordBatch embeds the Schema, so each batch can have different metadata (like the filename in this case). I think this will have to be implemented in Rust as a memory-only attribute which does not get persisted unless more significant changes are made to the protocol.

Thanks
Mike

On Thu, Feb 25, 2021 at 11:14 AM Micah Kornfield <emkornfi...@gmail.com> wrote:

The process would be to create a PR proposing an update to the custom metadata specification [1] to reserve a new word and describe its use. Then send a [DISCUSS] email on this list. Once there is consensus we can formally vote and merge the change.

[1] https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst

On Wed, Feb 24, 2021 at 3:47 PM Mike Seddon <seddo...@gmail.com> wrote:

Thanks for both of your comments.

@Andrew Schema.metadata does look like a logical place to house the information, so that would solve part of the problem. Do you have any thoughts on whether we change the function signature:

From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
To: Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;

@Micah It would be nice to have a reserved metadata key so this could be shared, but I am not sure of the admin process for the Arrow project to agree on something like that. Is there a forum?

On Thu, Feb 25, 2021 at 8:58 AM Micah Kornfield <emkornfi...@gmail.com> wrote:

In at least C++ (and the IPC format) a schema can be shared across many RecordBatches, which might have different sources.

It might be useful to define a reserved metadata key (similar to extension types) so that the data can be interpreted consistently.
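For reference, the two signature shapes under discussion can be written out as Rust type aliases. The RecordBatchMetadata struct below is only a guess at what the proof-of-concept carries, and the second alias name is invented purely to keep the sketch self-contained; the exact module paths are assumptions:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ColumnarValue;

// Hypothetical container for per-batch metadata; the real branch may
// carry more (or different) fields.
#[derive(Debug, Clone, Default)]
pub struct RecordBatchMetadata {
    pub input_file_name: Option<String>,
}

// Current shape: the function only sees the column values.
pub type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;

// Proposed shape: the function additionally receives the batch metadata.
pub type ScalarFunctionImplementationWithMetadata =
    Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;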
On Wed, Feb 24, 2021 at 11:29 AM Andrew Lamb <al...@influxdata.com> wrote:

I wonder if you could add the file_name as metadata on the `Schema` of the RecordBatch rather than on the RecordBatch itself? Since every RecordBatch has a schema, I don't fully understand the need to add something additional to the RecordBatch.

https://docs.rs/arrow/3.0.0/arrow/datatypes/struct.Schema.html#method.new_with_metadata

On Wed, Feb 24, 2021 at 1:20 AM Mike Seddon <seddo...@gmail.com> wrote:

Hi,

One of Apache Spark's very useful SQL functions is 'input_file_name', which provides a simple API for identifying the source of a row of data when it comes from a file-based source like Parquet or CSV. This is particularly useful for identifying which chunk/partition of a Parquet file a row came from, and it is used heavily by the DeltaLake format to determine which files are impacted by MERGE operations.

I have built a functional proof-of-concept for DataFusion, but it requires modifying the RecordBatch struct to include a 'metadata' struct (RecordBatchMetadata) to carry the source file name attached to each batch.

It also requires changing the ScalarFunctionImplementation signature to support exposing the metadata (and therefore changes all the functions).

From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
To: Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;

These changes have been made in a personal feature branch and are available for review (still needs cleaning), but conceptually does anyone have a problem with this API change, or does anyone have a better proposal?

Thanks
Mike
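As a rough sketch of Andrew's suggestion applied on the read side, a file-based source could clone its schema with a per-file "file_name" metadata entry so that every batch it emits carries its origin. The helper name and key are illustrative and assume the arrow 3.0 API linked above:

use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

// Rebuild a batch against a schema that carries the originating file name
// in its metadata, leaving fields and columns untouched.
fn tag_with_file_name(schema: &Schema, file_name: &str, batch: &RecordBatch) -> Result<RecordBatch> {
    let mut metadata: HashMap<String, String> = schema.metadata().clone();
    metadata.insert("file_name".to_string(), file_name.to_string());
    let tagged: SchemaRef = Arc::new(Schema::new_with_metadata(
        schema.fields().clone(),
        metadata,
    ));
    RecordBatch::try_new(tagged, batch.columns().to_vec())
}

An input_file_name-style function like the one sketched earlier could then read that key back out of each batch's schema; the open question in the thread is whether this should instead live in a dedicated RecordBatchMetadata struct and, for cross-implementation use, behind a reserved metadata key.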