Speaking of KafkaIO, this is already possible: "apply(KafkaIO.<K, V>read())" returns a
"PCollection<KafkaRecord<K, V>>", where KafkaRecord contains the message metadata
(topic, partition, etc.).
Note that this works _only_ if "withoutMetadata()" was not used - in that case it
returns a plain KV<K, V>.
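
For example, a minimal (untested) sketch of reading the metadata - String keys/values,
broker address and topic name are just placeholders:

// Untested sketch - broker address and topic are placeholders.
PCollection<KafkaRecord<String, String>> records =
    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("my-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));

// KafkaRecord exposes the metadata directly on each element.
records.apply(MapElements.via(
    new SimpleFunction<KafkaRecord<String, String>, KV<String, String>>() {
      @Override
      public KV<String, String> apply(KafkaRecord<String, String> record) {
        String source = record.getTopic() + "/" + record.getPartition();
        return KV.of(source, record.getKV().getValue());
      }
    }));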

At the same time, I agree that it would be useful to have a general way to
obtain record metadata across all Beam IOs.
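
For the FileIO + Avro case discussed below, a rough (untested) sketch of the
match()/readMatches() workaround could look like this - the file pattern and the
Avro parsing details are only illustrative:

// Untested sketch - emits each Avro record keyed by the file it came from.
PCollection<KV<String, GenericRecord>> recordsWithFileName =
    pipeline
        .apply(FileIO.match().filepattern("gs://my-bucket/path/*.avro"))
        .apply(FileIO.readMatches())
        .apply(FlatMapElements.via(
            new SimpleFunction<FileIO.ReadableFile, Iterable<KV<String, GenericRecord>>>() {
              @Override
              public Iterable<KV<String, GenericRecord>> apply(FileIO.ReadableFile file) {
                String fileName = file.getMetadata().resourceId().toString();
                List<KV<String, GenericRecord>> output = new ArrayList<>();
                try (DataFileStream<GenericRecord> reader =
                    new DataFileStream<>(Channels.newInputStream(file.open()),
                                         new GenericDatumReader<>())) {
                  for (GenericRecord record : reader) {
                    output.add(KV.of(fileName, record));
                  }
                } catch (IOException e) {
                  throw new RuntimeException(e);
                }
                return output;
              }
            }));

You would also likely need to set a coder for GenericRecord (e.g. AvroCoder with the
schema), which is part of why a reusable transform in Beam itself would be nicer.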

> On 7 Feb 2019, at 18:25, Yi Pan <nickpa...@gmail.com> wrote:
> 
> Shouldn't this apply to a more generic scenario for any Beam IO? For example, I 
> am using KafkaIO and want to get the topic and partition from which each 
> message was received. Some IOContext associated with each data unit from a 
> Beam IO might be useful here?
> 
> -Yi
> 
> On Thu, Feb 7, 2019 at 6:29 AM Kenneth Knowles <k...@apache.org> wrote:
> This comes up a lot, wanting file names alongside the data that came from the 
> file. It is a historical quirk that none of our connectors used to have the 
> file names. What is the change needed for FileIO + parse Avro to be really 
> easy to use?
> 
> Kenn
> 
> On Thu, Feb 7, 2019 at 6:18 AM Jeff Klukas <jklu...@mozilla.com> wrote:
> I haven't needed to do this with Beam before, but I've definitely had similar 
> needs in the past. Spark, for example, provides an input_file_name function 
> that can be applied to a dataframe to add the input file as an additional 
> column. It's not clear to me how that's implemented, though.
> 
> Perhaps others have suggestions, but I'm not aware of a way to do this 
> conveniently in Beam today. To my knowledge, today you would have to use 
> FileIO.match() and FileIO.readMatches() to get a collection of ReadableFile. 
> You'd then have to FlatMapElements to pull out the metadata and the bytes of 
> the file, and you'd be responsible for parsing those bytes into Avro records. 
> You'd be able to output something like a KV<String, T> that groups the file 
> name together with the parsed Avro record.
> 
> Seems like something worth providing better support for in Beam itself if 
> this indeed doesn't already exist.
> 
> On Thu, Feb 7, 2019 at 7:29 AM Chaim Turkel <ch...@behalf.com> wrote:
> Hi,
>   I am working on a pipeline that listens to a Pub/Sub topic to get
> files that have changed in storage. I then read the Avro files and
> would like to write them to BigQuery based on the file name (to
> different tables).
>   My problem is that the transform that reads the Avro does not give
> me back the file name (as a tuple or something like that). I seem
> to run into this pattern a lot.
> Can you think of any solutions?
> 
> Chaim
> 