On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <[email protected]> wrote:
> Hi Cham,
>
> Please see inline. If possible, code / pseudocode will help a lot.
>
> Thanks,
> Eila
>
> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <[email protected]> wrote:
>
>> Hi Eila,
>>
>> Please find my comments inline.
>>
>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <[email protected]> wrote:
>>
>>> Hello all,
>>>
>>> It was nice to meet you last week!!!
>>
>> It was nice to meet you as well :)
>>
>>> I am writing a genomic PCollection that is created from BigQuery to a
>>> folder. Below is the code with its output, so you can run it with any
>>> small BQ table and let me know what your thoughts are:
>
> This init is only for debugging. In production I will use the pipeline
> syntax.
>
>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},
>>>         {u'index': u'GSM2316666', u'SNRPCP14': 0},
>>>         {u'index': u'GSM2312355', u'SNRPCP14': 0},
>>>         {u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>
>>> rows[1].keys()
>>> # output: [u'index', u'SNRPCP14']
>>>
>>> # you can change `archs4.results_20180308_*` to any other table name
>>> # with an index column
>>> queries2 = rows | beam.Map(lambda x: (
>>>     beam.io.Read(beam.io.BigQuerySource(
>>>         project='orielresearch-188115',
>>>         use_standard_sql=False,
>>>         query=str('SELECT * FROM `archs4.results_20180308_*` '
>>>                   'where index=\'%s\'' % (x["index"])))),
>>>     str('gs://archs4/output/' + x["index"] + '/')))
>>
>> I don't think the above code will work (it is not portable across
>> runners, at least). BigQuerySource (along with the Read transform) has to
>> be applied to a Pipeline object. So probably change this to a for loop
>> that creates a set of read transforms, and use Flatten to create a single
>> PCollection.
>
> For debugging, I am running on the local Datalab runner. For production, I
> will be running only on the Dataflow runner. I think that I was able to
> query the tables that way; I will double-check it. The indexes could go
> into the millions - my concern is that I will not be able to leverage
> Beam's distribution capability when I use the loop option. Any thoughts on
> that?

You mean you'll have millions of queries? That will not be scalable. My
suggestion was to loop over queries. Can you reduce this to one or a small
number of queries and perform further processing in Beam?

>>> queries2
>>> # output: a list of PCollections and the path to write each
>>> # PCollection's data to
>>>
>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>   'gs://archs4/output/GSM2313641/'),
>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>   'gs://archs4/output/GSM2316666/'),
>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>   'gs://archs4/output/GSM2312355/'),
>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>   'gs://archs4/output/GSM2312372/')]
>>
>> What you got here is a PCollection of PTransform objects, which is not
>> useful.
>>
>>> # this is my challenge
>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND COLUMN")
>>
>> Once you update the above code you will get a proper PCollection of
>> elements read from BigQuery. You can transform and write it (to files,
>> BQ, or any other sink) as needed.
>
> It is a list of tuples with a PCollection and the path to write to. The
> path is not unique, and I might have more than one PCollection written to
> the same destination. How do I pass the path from the tuple list as a
> parameter to the text file name? Could you please add the code that you
> were thinking about?
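
A rough, untested sketch of the "one or a small number of queries, then
further processing in Beam" idea mentioned above: read the wildcard table
once and key every row by its index column inside the pipeline, instead of
issuing millions of per-index queries. The project and table names are
taken from the snippet earlier in the thread; the keying step and the
use_standard_sql=True flag are assumptions, not something from the thread.

import apache_beam as beam

with beam.Pipeline() as p:
    keyed = (
        p
        # One read over the whole wildcard table instead of one read per index.
        | 'ReadAll' >> beam.io.Read(beam.io.BigQuerySource(
            project='orielresearch-188115',
            query='SELECT * FROM `archs4.results_20180308_*`',
            use_standard_sql=True))
        # Key each row dict by its index value for downstream grouping or
        # partitioning; writing per-key files still needs the partitioning
        # approach discussed below.
        | 'KeyByIndex' >> beam.Map(lambda row: (row['index'], row)))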

The Python SDK does not support writing to different files based on the
values of the data (dynamic writes). So you'll have to either partition the
data into separate PCollections or write all of the data to the same
location.

Here's *pseudocode* (untested) for reading from a few queries, partitioning
into several PCollections, and writing to different destinations:

queries = ['select * from A', 'select * from B', ...]

p = Pipeline()
pcollections = []
for query in queries:
    pc = p | beam.io.Read(beam.io.BigQuerySource(query=query))
    pcollections.append(pc)

all_data = pcollections | beam.Flatten()
partitions = all_data | beam.Partition(my_partition_fn, num_partitions)

for i, partition in enumerate(partitions):
    partition | beam.io.WriteToText(<unique path for partition i>)

Hope this helps.

Thanks,
Cham

>> Please see the programming guide on how to write to text files (section
>> 5.3, and click the Python tab):
>> https://beam.apache.org/documentation/programming-guide/
>>
>> Thanks,
>> Cham
>>
>>> Do you have any idea how to sink the data to a text file? I have tried a
>>> few other options and was stuck at the write transform.
>>>
>>> Any advice is very appreciated.
>>>
>>> Thanks,
>>> Eila
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
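
To make my_partition_fn in the pseudocode above concrete, here is a
minimal, untested sketch that routes each row to a partition based on its
index column and writes each partition under the matching
gs://archs4/output/... prefix from earlier in the thread. The indexes list,
the per-transform labels, the use_standard_sql=True flag, and the body of
my_partition_fn are illustrative assumptions; it also assumes the set of
indexes is small and known up front (with millions of indexes, a fixed
Partition would not be practical either).

import apache_beam as beam

# Known indexes, one output partition (and output prefix) per index.
indexes = ['GSM2313641', 'GSM2316666', 'GSM2312355', 'GSM2312372']

def my_partition_fn(row, num_partitions):
    # Each row goes to the partition matching its index column; this
    # assumes every row's index appears in the indexes list above.
    return indexes.index(row['index'])

p = beam.Pipeline()
pcollections = []
for n, idx in enumerate(indexes):
    query = ("SELECT * FROM `archs4.results_20180308_*` "
             "WHERE index = '%s'" % idx)
    # Each Read needs a unique label so the transforms do not collide.
    pc = p | 'Read %d' % n >> beam.io.Read(beam.io.BigQuerySource(
        project='orielresearch-188115', query=query, use_standard_sql=True))
    pcollections.append(pc)

# Merge the per-query reads, then split them back out by index.
all_data = tuple(pcollections) | beam.Flatten()
partitions = all_data | beam.Partition(my_partition_fn, len(indexes))

for i, partition in enumerate(partitions):
    # One WriteToText per partition, each with its own output prefix.
    partition | 'Write %d' % i >> beam.io.WriteToText(
        'gs://archs4/output/%s/part' % indexes[i])

p.run()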
