Hi Mike,

I've been thinking about your proposal to add metadata to the
RecordBatch.

The struct is currently defined as:

pub struct RecordBatch {
    schema: SchemaRef,
    columns: Vec<Arc<Array>>,
}


Are you suggesting something like this?

pub struct RecordBatch {
    schema: SchemaRef,
    columns: Vec<Arc<Array>>,
    metadata: RecordBatchMetadata,
}


Because if that is the case, why not use the Schema metadata? The schema
metadata is a hashmap that can hold any information you require.

For example, here I'm using the metadata to create a schema and then using
that schema with IPC and the RecordBatch:

use std::collections::HashMap;
use std::net::TcpStream;
use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() {
    // Attach the source file name to the schema as key/value metadata.
    let mut schema_metadata: HashMap<String, String> = HashMap::new();
    schema_metadata.insert("file_name".to_string(), "my_file.parquet".to_string());
    let schema = Schema::new_with_metadata(
        vec![
            Field::new("index", DataType::Int32, false),
            Field::new("word", DataType::Utf8, false),
        ],
        schema_metadata,
    );

    let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let b = StringArray::from(vec!["one", "two", "three", "four", "five"]);
    let batch =
        RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a), Arc::new(b)]).unwrap();

    // Send the schema (with its metadata) and the batch over the IPC stream.
    let stream = TcpStream::connect("127.0.0.1:8000").unwrap();
    let mut writer = StreamWriter::try_new(stream, &schema).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
}


I don't know how the Spark implementation works, but I think it would be
possible to emulate it using this metadata. Don't you think so?
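
To make the idea concrete, here is a minimal sketch of how an
input_file_name-style function could read the value back. The Schema and
RecordBatch types below are simplified, hypothetical stand-ins (not the real
arrow structs), and the input_file_name helper and the "file_name" key are
only assumptions for illustration:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the arrow types, reduced to what the sketch needs.
struct Schema {
    metadata: HashMap<String, String>,
}

struct RecordBatch {
    schema: Arc<Schema>,
    num_rows: usize,
}

// Emulates an `input_file_name`-style function: returns one value per row,
// taken from the "file_name" key of the schema metadata (None if absent).
fn input_file_name(batch: &RecordBatch) -> Vec<Option<String>> {
    let name = batch.schema.metadata.get("file_name").cloned();
    vec![name; batch.num_rows]
}

fn main() {
    let mut metadata = HashMap::new();
    metadata.insert("file_name".to_string(), "my_file.parquet".to_string());
    let batch = RecordBatch {
        schema: Arc::new(Schema { metadata }),
        num_rows: 5,
    };
    // Every row of the batch reports the same source file.
    let names = input_file_name(&batch);
    println!("{:?}", names);
}
```

With the real crate the lookup would presumably go through the batch's schema
in the same way, but I have not tried that against DataFusion.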

Fernando

On Thu, Feb 25, 2021 at 4:36 AM Mike Seddon <seddo...@gmail.com> wrote:

> Thanks Micah.
>
> It is actually the Rust implementation that is the odd one out. Ideally, adding
> a metadata KeyValue to the RecordBatch plus your suggested 'reserved' key
> would be the best option.
>
> On Thu, Feb 25, 2021 at 3:26 PM Micah Kornfield <emkornfi...@gmail.com>
> wrote:
>
> > Thanks for looking into it. I would guess it is likely possible to "hoist"
> > metadata from a record batch schema object to the Message, but understand
> > if it isn't something you want to pursue.
> >
> > On Wed, Feb 24, 2021 at 8:19 PM Mike Seddon <seddo...@gmail.com> wrote:
> >
> >> Hi Micah,
> >> Thank you for providing this information. I have reviewed the
> >> documentation you provided and have a few conclusions:
> >>
> >> 1. RecordBatch does not have the capability to attach user-defined
> >> metadata (KeyValue attributes):
> >> https://github.com/apache/arrow/blob/master/format/Message.fbs#L83
> >> 2. Schema does have this capability, but it would not work to pass
> >> per-batch input files, as the design indicates that the Schema object
> >> would be passed once and then a series of interleaved DictionaryBatch or
> >> RecordBatch messages must meet the Schema:
> >> https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#ipc-streaming-format
> >>
> >> In the Rust implementation each RecordBatch embeds the Schema so that
> >> each schema can have different metadata (like filename in this case). I
> >> think this will have to be implemented in Rust as a memory-only attribute
> >> which does not get persisted unless more significant changes to the
> >> protocol are made.
> >>
> >> Thanks
> >> Mike
> >>
> >> On Thu, Feb 25, 2021 at 11:14 AM Micah Kornfield <emkornfi...@gmail.com>
> >> wrote:
> >>
> >>> The process would be to create a PR proposal to update the custom
> >>> metadata specification [1] to reserve a new word and describe its use.
> >>> Then send a [DISCUSS] email on this list. Once there is consensus we can
> >>> formally vote and merge the change.
> >>>
> >>> [1] https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst
> >>>
> >>> On Wed, Feb 24, 2021 at 3:47 PM Mike Seddon <seddo...@gmail.com> wrote:
> >>>
> >>> > Thanks for both of your comments.
> >>> >
> >>> > @Andrew Schema.metadata does look like a logical place to house the
> >>> > information so that would solve part of the problem. Do you have any
> >>> > thoughts on whether we change the function signature:
> >>> >
> >>> > From: <Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send +
> >>> > Sync>;
> >>> > To:   <Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) ->
> >>> > Result<ColumnarValue> + Send + Sync>;
> >>> >
> >>> > @Micah It would be nice to have a reserved metadata key so this could be
> >>> > shared, but I am not sure of the admin process for the Arrow project to
> >>> > agree on something like that. Is there a forum?
> >>> >
> >>> > On Thu, Feb 25, 2021 at 8:58 AM Micah Kornfield <emkornfi...@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > At least in C++ (and the IPC format), a schema can be shared across
> >>> > > many RecordBatches, which might have different sources.
> >>> > >
> >>> > >  It might be useful to define a reserved metadata key (similar to
> >>> > > extension types) so that the data can be interpreted consistently.
> >>> > >
> >>> > > On Wed, Feb 24, 2021 at 11:29 AM Andrew Lamb <al...@influxdata.com>
> >>> > > wrote:
> >>> > >
> >>> > > > I wonder if you could add the file_name as metadata on the `Schema` of
> >>> > > > the RecordBatch rather than the RecordBatch itself? Since every
> >>> > > > RecordBatch has a schema, I don't fully understand the need to add
> >>> > > > something additional to the RecordBatch
> >>> > > >
> >>> > > > https://docs.rs/arrow/3.0.0/arrow/datatypes/struct.Schema.html#method.new_with_metadata
> >>> > > >
> >>> > > > On Wed, Feb 24, 2021 at 1:20 AM Mike Seddon <seddo...@gmail.com> wrote:
> >>> > > >
> >>> > > > > Hi,
> >>> > > > >
> >>> > > > > One of Apache Spark's very useful SQL functions is the
> >>> > > > > 'input_file_name' SQL function which provides a simple API for
> >>> > > > > identifying the source of a row of data when sourced from a
> >>> > > > > file-based source like Parquet or CSV. This is particularly useful
> >>> > > > > for identifying which chunk/partition of a Parquet the row came
> >>> > > > > from and is used heavily by the DeltaLake format to determine
> >>> > > > > which files are impacted for MERGE operations.
> >>> > > > >
> >>> > > > > I have built a functional proof-of-concept for DataFusion but it
> >>> > > > > requires modifying the RecordBatch struct to include a 'metadata'
> >>> > > > > struct (RecordBatchMetadata) to carry the source file name
> >>> > > > > attached to each batch.
> >>> > > > >
> >>> > > > > It also requires changing the ScalarFunctionImplementation
> >>> > > > > signature to support exposing the metadata (and therefore all the
> >>> > > > > functions).
> >>> > > > >
> >>> > > > > From: <Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send +
> >>> > > > > Sync>;
> >>> > > > > To:   <Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) ->
> >>> > > > > Result<ColumnarValue> + Send + Sync>;
> >>> > > > >
> >>> > > > > These changes have been made in a personal feature branch and are
> >>> > > > > available for review (still needs cleaning) but conceptually does
> >>> > > > > anyone have a problem with this API change or have a better
> >>> > > > > proposal?
> >>> > > > >
> >>> > > > > Thanks
> >>> > > > > Mike
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
>
