Regarding KafkaIO, this is already possible: "apply(KafkaIO.<K, V>read())" returns "PCollection<KafkaRecord<K, V>>", where KafkaRecord carries the message metadata (topic, partition, etc.). It works _only_ if "withoutMetadata()" was not applied, though; with it, the read returns plain "KV<K, V>" elements instead.
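As a rough illustration, here is a minimal sketch of reading with metadata kept. It assumes String keys and values; the broker address, topic name, class names, and the describe() helper are hypothetical and only there to make the example complete.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaMetadataExample {

  /** Hypothetical helper: renders a record's origin and value as one string. */
  static String describe(String topic, int partition, long offset, String value) {
    return topic + "/" + partition + "@" + offset + ": " + value;
  }

  /** Pulls topic, partition and offset out of each KafkaRecord. */
  static class DescribeFn extends SimpleFunction<KafkaRecord<String, String>, String> {
    @Override
    public String apply(KafkaRecord<String, String> record) {
      return describe(
          record.getTopic(),
          record.getPartition(),
          record.getOffset(),
          record.getKV().getValue());
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // No withoutMetadata() here, so the elements are KafkaRecord, not KV.
    PCollection<KafkaRecord<String, String>> records =
        p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092") // assumed broker address
                .withTopic("my-topic") // assumed topic name
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));

    // Each output string carries the metadata alongside the value.
    PCollection<String> described = records.apply(MapElements.via(new DescribeFn()));

    p.run().waitUntilFinish();
  }
}
```

Adding withoutMetadata() to the read would change the element type to KV<String, String>, and the topic and partition would no longer be reachable downstream.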
At the same time, I agree that it would be useful to have a general way to obtain the metadata of records across all Beam IOs.

> On 7 Feb 2019, at 18:25, Yi Pan <nickpa...@gmail.com> wrote:
>
> Shouldn't this apply to more generic scenario for any BeamIO? For example, I
> am using KafkaIO and wanted to get the topic and partition from which the
> message was received. Some IOContext associated with each data unit from
> BeamIO may be useful here?
>
> -Yi
>
> On Thu, Feb 7, 2019 at 6:29 AM Kenneth Knowles <k...@apache.org> wrote:
> This comes up a lot, wanting file names alongside the data that came from the
> file. It is a historical quirk that none of our connectors used to have the
> file names. What is the change needed for FileIO + parse Avro to be really
> easy to use?
>
> Kenn
>
> On Thu, Feb 7, 2019 at 6:18 AM Jeff Klukas <jklu...@mozilla.com> wrote:
> I haven't needed to do this with Beam before, but I've definitely had similar
> needs in the past. Spark, for example, provides an input_file_name function
> that can be applied to a dataframe to add the input file as an additional
> column. It's not clear to me how that's implemented, though.
>
> Perhaps others have suggestions, but I'm not aware of a way to do this
> conveniently in Beam today. To my knowledge, today you would have to use
> FileIO.match() and FileIO.readMatches() to get a collection of ReadableFile.
> You'd then have to FlatMapElements to pull out the metadata and the bytes of
> the file, and you'd be responsible for parsing those bytes into avro records.
> You'd be able to output something like a KV<String, T> that groups the file
> name together with the parsed avro record.
>
> Seems like something worth providing better support for in Beam itself if
> this indeed doesn't already exist.
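The FileIO approach described above could look roughly like the sketch below. It is not a tested recipe and makes several assumptions: the Avro library is on the classpath, a ParDo is used in place of FlatMapElements, each file is streamed through open() rather than read fully into bytes, and every record is emitted as its GenericRecord.toString() text so that no schema-specific coder is needed. The file pattern and class names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class AvroWithFileName {

  /** Emits one (file name, record text) pair per Avro record in each matched file. */
  static class ReadWithFileNameFn extends DoFn<FileIO.ReadableFile, KV<String, String>> {
    @ProcessElement
    public void processElement(
        @Element FileIO.ReadableFile file, OutputReceiver<KV<String, String>> out)
        throws IOException {
      // The resource id is the full path of the file the records came from.
      String fileName = file.getMetadata().resourceId().toString();

      // The writer schema is read from the Avro container file header.
      try (InputStream in = Channels.newInputStream(file.open());
          DataFileStream<GenericRecord> records =
              new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
        for (GenericRecord record : records) {
          out.output(KV.of(fileName, record.toString()));
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, String>> recordsWithFileName =
        p.apply(FileIO.match().filepattern("gs://my-bucket/input/*.avro")) // assumed path
            .apply(FileIO.readMatches())
            .apply(ParDo.of(new ReadWithFileNameFn()));

    p.run().waitUntilFinish();
  }
}
```

With the file name available as the key, a later step could choose a destination table per element, which is the use case in the original question below.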
>
> On Thu, Feb 7, 2019 at 7:29 AM Chaim Turkel <ch...@behalf.com> wrote:
> Hi,
> I am working on a pipeline that listens to a topic on pubsub to get
> files that have changes in the storage. Then i read avro files, and
> would like to write them to bigquery based on the file name (to
> different tables).
> My problem is that the transformer that reads the avro does not give
> me back the files name (like a tuple or something like that). I seem
> to have this pattern come back a lot.
> Can you think of any solutions?
>
> Chaim