I see. You are storing the file name when reading a json, csv, and parquet
file.

Just out of curiosity, how would you use the file name in spark?
Are you using it for file statistics?

On Thu, Feb 25, 2021 at 9:36 AM Mike Seddon <seddo...@gmail.com> wrote:

> Hi Fernando,
>
> After Andrew's reply I have moved the filename metadata into the Schema and
> actually changed the ScalarFunctionImplementation signature to:  Arc<dyn
> Fn(&[ColumnarValue], SchemaRef) -> Result<ColumnarValue> + Send + Sync>;
>
> I have a functional (WIP) repo already:
> https://github.com/seddonm1/arrow/compare/master...seddonm1:input-file
>
> I need to add some more tests (mainly ensure multipart parquet works as
> expected) but I wanted to gather feedback on the proposal before cleaning
> up for PR.
>
> Mike
>
> On Thu, Feb 25, 2021 at 8:30 PM Fernando Herrera <
> fernando.j.herr...@gmail.com> wrote:
>
> > Hi Mike,
> >
> > I've been thinking how you are considering adding metadata to the
> > RecordBatch.
> >
> > The struct it is now defined as
> >
> > pub struct RecordBatch {
> > >     schema: SchemaRef,
> > >     columns: Vec<Arc<Array>>,
> > > }
> >
> >
> > Are you suggesting something like this?
> >
> > pub struct RecordBatch {
> > >     schema: SchemaRef,
> > >     columns: Vec<Arc<Array>>,
> >
> >     metadata: RecodBatchMetadata
> >
> > }
> >
> >
> > Because if that is the case, why not use the Schema metadata. The schema
> > metadata is a hashmap that can hold any information you require
> >
> > for example, here im using the metadata to create a schema and then use
> > that schema with IPC and the RecordBatch
> >
> > fn main() {
> > >     let mut schema_metadata: HashMap<String, String> = HashMap::new();
> >
> >   *  schema_metadata.insert("file_name".to_string(),
> > > "my_file.parquet".to_string());*
> > >     let schema = Schema::new_with_metadata(
> > >         vec![
> > >             Field::new("index", DataType::Int32, false),
> > >             Field::new("word", DataType::Utf8, false),
> > >         ],
> > >         schema_metadata,
> > >     );
> > >     let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
> > >     let b = StringArray::from(vec!["one", "two", "three", "four",
> > "five"]);
> > >     let batch =
> > >         RecordBatch::try_new(Arc::new(schema.clone()),
> vec![Arc::new(a),
> > > Arc::new(b)]).unwrap();
> > >
> >
> >
> >     let stream = TcpStream::connect("127.0.0.1:8000").unwrap();
> > >     let mut writer = StreamWriter::try_new(stream, &schema).unwrap();
> > >     writer.write(&batch).unwrap();
> > >     writer.finish().unwrap();
> > > }
> >
> >
> > I dont know how the Spark implementation works, but I think it would be
> > possible to emulate it using this metadata. Don't you think so?
> >
> > Fernando
> >
> > On Thu, Feb 25, 2021 at 4:36 AM Mike Seddon <seddo...@gmail.com> wrote:
> >
> > > Thanks Micah.
> > >
> > > It is actually Rust implementation that is the odd one out. Ideally
> > adding
> > > a metadata KeyValue to the RecordBatch plus your suggested 'reserved'
> key
> > > would be the best option.
> > >
> > > On Thu, Feb 25, 2021 at 3:26 PM Micah Kornfield <emkornfi...@gmail.com
> >
> > > wrote:
> > >
> > > > Thanks for looking into it. I would guess it is likely possible
> "hoist"
> > > > metadata from a record batch schema object to the Message but
> > understand
> > > if
> > > > it isn't something you want to pursue.
> > > >
> > > > On Wed, Feb 24, 2021 at 8:19 PM Mike Seddon <seddo...@gmail.com>
> > wrote:
> > > >
> > > >> Hi Micah,
> > > >> Thank you for providing this information. I have reviewed the
> > > >> documentation you provided and have a few conclusions:
> > > >>
> > > >> 1. RecordBatch does not have the capability to attach user defined
> > > >> metadata (KeyValue attributes):
> > > >> https://github.com/apache/arrow/blob/master/format/Message.fbs#L83
> > > >> 2. Schema does have this capability but it would not work to pass
> > > >> per-batch input files as the design indicates that the Schema object
> > > would
> > > >> be passed once and then a series of interleaved DictionaryBatch or
> > > >> MessageBatch messages must meet the Schema:
> > > >>
> > >
> >
> https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#ipc-streaming-format
> > > >>
> > > >> In the Rust implementation each RecordBatch embeds the Schema so
> that
> > > >> each schema can have different metadata (like filename in this
> case).
> > I
> > > >> think this will have to be implemented Rust as a memory-only
> attribute
> > > >> which does not get persisted unless more significant changes to the
> > > >> protocol are made.
> > > >>
> > > >> Thanks
> > > >> Mike
> > > >>
> > > >> On Thu, Feb 25, 2021 at 11:14 AM Micah Kornfield <
> > emkornfi...@gmail.com
> > > >
> > > >> wrote:
> > > >>
> > > >>> The process would be to create a PR proposal to update to the
> custom
> > > >>> metadata specification [1] to reserve a new word and describe its
> > use.
> > > >>> Then send a [DISCUSS] email on this list.  Once there is consensus
> we
> > > can
> > > >>> formally vote and merge the change.
> > > >>>
> > > >>> [1]
> > > >>>
> > > >>>
> > >
> >
> https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst
> > > >>>
> > > >>> On Wed, Feb 24, 2021 at 3:47 PM Mike Seddon <seddo...@gmail.com>
> > > wrote:
> > > >>>
> > > >>> > Thanks for both of your comments.
> > > >>> >
> > > >>> > @Andrew Schema.metadata does look like a logical place to house
> the
> > > >>> > information so that would solve part of the problem. Do you have
> > any
> > > >>> > thoughts on whether we change the function signature:
> > > >>> >
> > > >>> > From: <Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> +
> > Send +
> > > >>> > Sync>;
> > > >>> > To:   <Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) ->
> > > >>> > Result<ColumnarValue> + Send + Sync>;
> > > >>> >
> > > >>> > @Micah It would be nice to have a reserved metadata key so this
> > could
> > > >>> be
> > > >>> > shared but I am not sure of the admin process for the Arrow
> project
> > > to
> > > >>> > agree something like that. Is there a forum?
> > > >>> >
> > > >>> > On Thu, Feb 25, 2021 at 8:58 AM Micah Kornfield <
> > > emkornfi...@gmail.com
> > > >>> >
> > > >>> > wrote:
> > > >>> >
> > > >>> > > At least C++ (and the IPC format) a schema can be shared across
> > the
> > > >>> many
> > > >>> > > RecordBatch's which might have different sources.
> > > >>> > >
> > > >>> > >  It might be useful to define a reserved metadata key (similar
> to
> > > >>> > > extension types) so that the data can be interpreted
> > consistently.
> > > >>> > >
> > > >>> > > On Wed, Feb 24, 2021 at 11:29 AM Andrew Lamb <
> > al...@influxdata.com
> > > >
> > > >>> > wrote:
> > > >>> > >
> > > >>> > > > I wonder if you could add the file_name as metadata on the
> > > >>> `Schema` of
> > > >>> > > the
> > > >>> > > > RecordBatch rather than the RecordBatch itself? Since every
> > > >>> RecordBatch
> > > >>> > > has
> > > >>> > > > a schema, I don't fully understand the need to add something
> > > >>> additional
> > > >>> > > to
> > > >>> > > > the RecordBatch
> > > >>> > > >
> > > >>> > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > >
> >
> https://docs.rs/arrow/3.0.0/arrow/datatypes/struct.Schema.html#method.new_with_metadata
> > > >>> > > >
> > > >>> > > > On Wed, Feb 24, 2021 at 1:20 AM Mike Seddon <
> > seddo...@gmail.com>
> > > >>> > wrote:
> > > >>> > > >
> > > >>> > > > > Hi,
> > > >>> > > > >
> > > >>> > > > > One of Apache Spark's very useful SQL functions is the
> > > >>> > > 'input_file_name'
> > > >>> > > > > SQL function which provides a simple API for identifying
> the
> > > >>> source
> > > >>> > of
> > > >>> > > a
> > > >>> > > > > row of data when sourced from a file-based source like
> > Parquet
> > > or
> > > >>> > CSV.
> > > >>> > > > This
> > > >>> > > > > is particularly useful for identifying which
> chunk/partition
> > > of a
> > > >>> > > Parquet
> > > >>> > > > > the row came from and is used heavily by the DeltaLake
> format
> > > to
> > > >>> > > > determine
> > > >>> > > > > which files are impacted for MERGE operations.
> > > >>> > > > >
> > > >>> > > > > I have built a functional proof-of-concept for DataFusion
> but
> > > it
> > > >>> > > requires
> > > >>> > > > > modifying the RecordBatch struct to include a 'metadata'
> > struct
> > > >>> > > > > (RecordBatchMetadata) to carry the source file name
> attached
> > to
> > > >>> each
> > > >>> > > > batch.
> > > >>> > > > >
> > > >>> > > > > It also requires changing the ScalarFunctionImplementation
> > > >>> signature
> > > >>> > to
> > > >>> > > > > support exposing the metadata (and therefore all the
> > > functions).
> > > >>> > > > >
> > > >>> > > > > From: <Arc<dyn Fn(&[ColumnarValue]) ->
> Result<ColumnarValue>
> > +
> > > >>> Send +
> > > >>> > > > > Sync>;
> > > >>> > > > > To:   <Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) ->
> > > >>> > > > > Result<ColumnarValue> + Send + Sync>;
> > > >>> > > > >
> > > >>> > > > > These changes have been made in a personal feature branch
> and
> > > are
> > > >>> > > > available
> > > >>> > > > > for review (still needs cleaning) but conceptually does
> > anyone
> > > >>> have a
> > > >>> > > > > problem with this API change or have a better proposal?
> > > >>> > > > >
> > > >>> > > > > Thanks
> > > >>> > > > > Mike
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>
> > >
> >
>

Reply via email to