Hi Fernando,

Following Andrew's reply, I have moved the filename metadata into the Schema
and changed the ScalarFunctionImplementation signature to:

Arc<dyn Fn(&[ColumnarValue], SchemaRef) -> Result<ColumnarValue> + Send + Sync>;
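
To illustrate how a function might read the file name through this signature,
here is a minimal sketch. It uses simplified stand-in types rather than the
real arrow/DataFusion ones, and the "file_name" key plus the helper names
(input_file_name, schema_for) are assumptions for illustration only, not
necessarily what the branch does:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-ins for the arrow/DataFusion types (assumption: only the
// parts needed to show the idea are modelled here).
#[derive(Debug, Default)]
struct Schema {
    metadata: HashMap<String, String>,
}
type SchemaRef = Arc<Schema>;

#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Scalar(Option<String>),
}

type Result<T> = std::result::Result<T, String>;

// The proposed signature: the batch's schema is passed alongside the arguments.
type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue], SchemaRef) -> Result<ColumnarValue> + Send + Sync>;

// Hypothetical metadata key; no reserved key has been agreed on.
const FILE_NAME_KEY: &str = "file_name";

// Returns a function that ignores its arguments and looks up the file name
// in the schema metadata, yielding a null scalar when the key is absent.
fn input_file_name() -> ScalarFunctionImplementation {
    Arc::new(
        |_args: &[ColumnarValue], schema: SchemaRef| -> Result<ColumnarValue> {
            Ok(ColumnarValue::Scalar(
                schema.metadata.get(FILE_NAME_KEY).cloned(),
            ))
        },
    )
}

// Builds a schema whose metadata records the (hypothetical) source file.
fn schema_for(file: &str) -> SchemaRef {
    let mut metadata = HashMap::new();
    metadata.insert(FILE_NAME_KEY.to_string(), file.to_string());
    Arc::new(Schema { metadata })
}

fn main() {
    let f = input_file_name();
    // Each batch carries its own schema, so batches read from different
    // files can report different names.
    for file in ["part-0.parquet", "part-1.parquet"] {
        let value = f(&[], schema_for(file)).unwrap();
        println!("{:?}", value);
    }
}
```

Because the schema is passed per call, batches from different parts of a
multi-part Parquet source can each resolve to their own file name.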

I have a functional (WIP) repo already:
https://github.com/seddonm1/arrow/compare/master...seddonm1:input-file

I need to add some more tests (mainly to ensure that multi-part Parquet works
as expected), but I wanted to gather feedback on the proposal before cleaning
up for a PR.

Mike

On Thu, Feb 25, 2021 at 8:30 PM Fernando Herrera <
fernando.j.herr...@gmail.com> wrote:

> Hi Mike,
>
> I've been thinking how you are considering adding metadata to the
> RecordBatch.
>
> The struct is currently defined as:
>
>     pub struct RecordBatch {
>         schema: SchemaRef,
>         columns: Vec<Arc<Array>>,
>     }
>
> Are you suggesting something like this?
>
>     pub struct RecordBatch {
>         schema: SchemaRef,
>         columns: Vec<Arc<Array>>,
>         metadata: RecordBatchMetadata,
>     }
>
>
> If that is the case, why not use the Schema metadata? The schema
> metadata is a HashMap that can hold any information you require.
>
> For example, here I'm using the metadata to create a schema and then using
> that schema with IPC and the RecordBatch:
>
>     use std::collections::HashMap;
>     use std::net::TcpStream;
>     use std::sync::Arc;
>
>     use arrow::array::{Int32Array, StringArray};
>     use arrow::datatypes::{DataType, Field, Schema};
>     use arrow::ipc::writer::StreamWriter;
>     use arrow::record_batch::RecordBatch;
>
>     fn main() {
>         // Carry the source file name in the schema-level metadata.
>         let mut schema_metadata: HashMap<String, String> = HashMap::new();
>         schema_metadata.insert("file_name".to_string(), "my_file.parquet".to_string());
>         let schema = Schema::new_with_metadata(
>             vec![
>                 Field::new("index", DataType::Int32, false),
>                 Field::new("word", DataType::Utf8, false),
>             ],
>             schema_metadata,
>         );
>         let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
>         let b = StringArray::from(vec!["one", "two", "three", "four", "five"]);
>         let batch =
>             RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a), Arc::new(b)])
>                 .unwrap();
>
>         // Write the batch over an IPC stream using the same schema.
>         let stream = TcpStream::connect("127.0.0.1:8000").unwrap();
>         let mut writer = StreamWriter::try_new(stream, &schema).unwrap();
>         writer.write(&batch).unwrap();
>         writer.finish().unwrap();
>     }
>
>
> I don't know how the Spark implementation works, but I think it would be
> possible to emulate it using this metadata. Don't you think so?
>
> Fernando
>
> On Thu, Feb 25, 2021 at 4:36 AM Mike Seddon <seddo...@gmail.com> wrote:
>
> > Thanks Micah.
> >
> > It is actually the Rust implementation that is the odd one out. Ideally,
> > adding a metadata KeyValue to the RecordBatch plus your suggested
> > 'reserved' key would be the best option.
> >
> > On Thu, Feb 25, 2021 at 3:26 PM Micah Kornfield <emkornfi...@gmail.com>
> > wrote:
> >
> > > Thanks for looking into it. I would guess it is likely possible to "hoist"
> > > metadata from a record batch schema object to the Message, but I
> > > understand if it isn't something you want to pursue.
> > >
> > > On Wed, Feb 24, 2021 at 8:19 PM Mike Seddon <seddo...@gmail.com>
> wrote:
> > >
> > >> Hi Micah,
> > >> Thank you for providing this information. I have reviewed the
> > >> documentation you provided and have a few conclusions:
> > >>
> > >> 1. RecordBatch does not have the capability to attach user-defined
> > >> metadata (KeyValue attributes):
> > >> https://github.com/apache/arrow/blob/master/format/Message.fbs#L83
> > >> 2. Schema does have this capability, but it would not work for passing
> > >> per-batch input files: the design indicates that the Schema object is
> > >> sent once, followed by a series of interleaved DictionaryBatch and
> > >> RecordBatch messages that must conform to that Schema:
> > >> https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#ipc-streaming-format
> > >>
> > >> In the Rust implementation each RecordBatch embeds the Schema, so each
> > >> batch's schema can carry different metadata (like the filename in this
> > >> case). I think this will have to be implemented in Rust as a memory-only
> > >> attribute that does not get persisted unless more significant changes
> > >> to the protocol are made.
> > >>
> > >> Thanks
> > >> Mike
> > >>
> > >> On Thu, Feb 25, 2021 at 11:14 AM Micah Kornfield <
> emkornfi...@gmail.com
> > >
> > >> wrote:
> > >>
> > >>> The process would be to create a PR proposing an update to the custom
> > >>> metadata specification [1] to reserve a new word and describe its use.
> > >>> Then send a [DISCUSS] email to this list. Once there is consensus, we
> > >>> can formally vote and merge the change.
> > >>>
> > >>> [1] https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst
> > >>>
> > >>> On Wed, Feb 24, 2021 at 3:47 PM Mike Seddon <seddo...@gmail.com>
> > wrote:
> > >>>
> > >>> > Thanks for both of your comments.
> > >>> >
> > >>> > @Andrew Schema.metadata does look like a logical place to house the
> > >>> > information, so that would solve part of the problem. Do you have any
> > >>> > thoughts on whether we should change the function signature?
> > >>> >
> > >>> > From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
> > >>> > To:   Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;
> > >>> >
> > >>> > @Micah It would be nice to have a reserved metadata key so this could
> > >>> > be shared, but I am not sure of the admin process for the Arrow
> > >>> > project to agree on something like that. Is there a forum?
> > >>> >
> > >>> > On Thu, Feb 25, 2021 at 8:58 AM Micah Kornfield <
> > emkornfi...@gmail.com
> > >>> >
> > >>> > wrote:
> > >>> >
> > >>> > > At least in C++ (and the IPC format), a schema can be shared across
> > >>> > > many RecordBatches, which might have different sources.
> > >>> > >
> > >>> > > It might be useful to define a reserved metadata key (similar to
> > >>> > > extension types) so that the data can be interpreted consistently.
> > >>> > >
> > >>> > > On Wed, Feb 24, 2021 at 11:29 AM Andrew Lamb <
> al...@influxdata.com
> > >
> > >>> > wrote:
> > >>> > >
> > >>> > > > I wonder if you could add the file_name as metadata on the `Schema`
> > >>> > > > of the RecordBatch rather than on the RecordBatch itself? Since
> > >>> > > > every RecordBatch has a schema, I don't fully understand the need
> > >>> > > > to add something additional to the RecordBatch.
> > >>> > > >
> > >>> > > > https://docs.rs/arrow/3.0.0/arrow/datatypes/struct.Schema.html#method.new_with_metadata
> > >>> > > >
> > >>> > > > On Wed, Feb 24, 2021 at 1:20 AM Mike Seddon <
> seddo...@gmail.com>
> > >>> > wrote:
> > >>> > > >
> > >>> > > > > Hi,
> > >>> > > > >
> > >>> > > > > One of Apache Spark's very useful SQL functions is 'input_file_name',
> > >>> > > > > which provides a simple API for identifying the source of a row of
> > >>> > > > > data when it is read from a file-based source like Parquet or CSV.
> > >>> > > > > This is particularly useful for identifying which chunk/partition
> > >>> > > > > of a Parquet dataset the row came from and is used heavily by the
> > >>> > > > > DeltaLake format to determine which files are impacted for MERGE
> > >>> > > > > operations.
> > >>> > > > >
> > >>> > > > > I have built a functional proof-of-concept for DataFusion, but it
> > >>> > > > > requires modifying the RecordBatch struct to include a 'metadata'
> > >>> > > > > struct (RecordBatchMetadata) to carry the source file name attached
> > >>> > > > > to each batch.
> > >>> > > > >
> > >>> > > > > It also requires changing the ScalarFunctionImplementation
> > >>> > > > > signature (and therefore all the functions) to support exposing
> > >>> > > > > the metadata.
> > >>> > > > >
> > >>> > > > > From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
> > >>> > > > > To:   Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;
> > >>> > > > >
> > >>> > > > > These changes have been made in a personal feature branch and are
> > >>> > > > > available for review (it still needs cleaning), but conceptually
> > >>> > > > > does anyone have a problem with this API change or have a better
> > >>> > > > > proposal?
> > >>> > > > >
> > >>> > > > > Thanks
> > >>> > > > > Mike
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> >
>
