On Wed, Mar 21, 2018 at 1:57 PM OrielResearch Eila Arich-Landkof <[email protected]> wrote:
> yes. You were right, I had to put back the pc.
> I am working on the partition function and trying to debug it without
> running a pipeline on Dataflow (Dataflow execution takes at least 8
> minutes for any size of data), based on this link:
> https://medium.com/google-cloud/quickly-experiment-with-dataflow-3d5a0da8d8e9
>
> my code & questions:
> # Is it possible to use a string as the partition name? If not, what
> would be the simplest solution?
> # Generating another PCollection with the samples (index) and merging it
> with the data table? If yes,
> # how would I extract that information when I am writing to a text file?
>
> def partition_fn(element, num_partitions):
>     return(element['sample'])

I believe the partition function has to return an integer that is the
partition number for a given element. Please see the following code snippet
for an example usage:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1155
Also, that example shows how you can write the resulting PCollections to
text files.

> # debug input, no pipeline
> d = [{u'sample': u'GSM2313641', u'SNRPCP14': 0},{u'sample':
> u'GSM2316666', u'SNRPCP14': 0},{u'sample': u'GSM2312355', u'SNRPCP14': 0}]
> | beam.Flatten()
>
> d
> # output:
> [(u'sample', u'GSM2312355'),
>  (u'SNRPCP14', 0),
>  (u'sample', u'GSM2313641'),
>  (u'SNRPCP14', 0),
>  (u'sample', u'GSM2316666'),
>  (u'SNRPCP14', 0)]
>
> d | beam.Partition(partition_fn,3)
> # output, error:
> TypeError: tuple indices must be integers, not str [while running
> 'Partition(CallableWrapperPartitionFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']

Please see above.

> # writing to dynamic output - how do I extract the partition name that
> the element is assigned to?
> for i, partition in enumerate(partitions):
>     # The partition path is using the partition str name
>     pathi = str('gs://bucket/output/'+x["sample"]+'/')
>     partition | label >> beam.io.WriteToText(pathi)
>
> Please also let me know if there is a better way to debug the code before
> running on the Dataflow runner.

You can test code using DirectRunner before using DataflowRunner. Both
runners should produce the same result.
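To make the partition idea concrete, here is a rough, untested sketch along
the lines of the snippet linked above. It keeps each element as a dict, maps
the string sample id to an integer partition number, and reuses that mapping
to build a per-partition output path. The sample ids and the bucket path are
placeholders taken from your examples:

import apache_beam as beam

# Placeholder sample ids - in practice these would come from your data.
samples = [u'GSM2313641', u'GSM2316666', u'GSM2312355']
# Map each string sample id to an integer partition number.
sample_to_partition = {s: i for i, s in enumerate(samples)}

def partition_fn(element, num_partitions):
    # A partition function must return an integer in [0, num_partitions).
    return sample_to_partition[element['sample']]

with beam.Pipeline() as p:
    rows = p | 'CreateDebugInput' >> beam.Create([
        {u'sample': u'GSM2313641', u'SNRPCP14': 0},
        {u'sample': u'GSM2316666', u'SNRPCP14': 0},
        {u'sample': u'GSM2312355', u'SNRPCP14': 0},
    ])

    partitions = rows | 'PartitionBySample' >> beam.Partition(
        partition_fn, len(samples))

    for i, partition in enumerate(partitions):
        # Partition i holds the rows for samples[i], so the path can be
        # derived from the same mapping (placeholder bucket).
        path = 'gs://bucket/output/%s/' % samples[i]
        partition | 'Write_%s' % samples[i] >> beam.io.WriteToText(path)

Note that the elements stay dicts here. In your debug output above, each
element had become a (key, value) tuple rather than a dict, which is why
element['sample'] raised the tuple-index TypeError.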
>
> Many thanks,
> Eila
>
> On Wed, Mar 21, 2018 at 12:43 PM, Chamikara Jayalath <[email protected]>
> wrote:
>
>> On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
>> [email protected]> wrote:
>>
>>> Hi Cham,
>>>
>>> all_data = pcollections | beam.Flatten()
>>>
>>> raises an error:
>>>
>>> TypeError: 'Read' object is not iterable
>>>
>>> pcollections is the following list:
>>>
>>> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>>
>> Did you omit "p | " in "p | beam.io.Read" by any chance? Not sure how
>> you ended up with a list of Read PTransforms otherwise.
>>
>> Also, follow everything with a "p.run().wait_until_finish()" for the
>> pipeline to execute.
>>
>> Can you paste the code that you are running?
>>
>>> Based on the following, I converted the list to a tuple
>>> (tuple(pcollections)), with the same error for the tuple.
>>>
>>> # Flatten takes a tuple of PCollection objects.
>>> # Returns a single PCollection that contains all of the elements in the
>>> # PCollection objects in that tuple.
>>> merged = (
>>>     (pcoll1, pcoll2, pcoll3)
>>>     # A list of tuples can be "piped" directly into a Flatten transform.
>>>     | beam.Flatten())
>>>
>>> Any advice?
>>>
>>> Many thanks,
>>> Eila
>>>
>>> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
>>> [email protected]> wrote:
>>>
>>>> very helpful!!! I will keep you posted if I have any issue / question.
>>>> Best,
>>>> Eila
>>>>
>>>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Cham,
>>>>>>
>>>>>> Please see inline. If possible, code / pseudocode will help a lot.
>>>>>> Thanks,
>>>>>> Eila
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Eila,
>>>>>>>
>>>>>>> Please find my comments inline.
>>>>>>>
>>>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> It was nice to meet you last week!!!
>>>>>>>
>>>>>>> It was nice to meet you as well :)
>>>>>>>
>>>>>>>> I am writing a genomic PCollection that is created from BigQuery to
>>>>>>>> a folder. Following is the code with output, so you can run it with
>>>>>>>> any small BQ table and let me know what your thoughts are:
>>>>>>
>>>>>> This init is only for debugging. In production I will use the
>>>>>> pipeline syntax.
>>>>>>
>>>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>>>
>>>>>>>> rows[1].keys()
>>>>>>>> # output: [u'index', u'SNRPCP14']
>>>>>>>>
>>>>>>>> # you can change `archs4.results_20180308_*` to any other table
>>>>>>>> # name with an index column
>>>>>>>> queries2 = rows | beam.Map(lambda x:
>>>>>>>>     (beam.io.Read(beam.io.BigQuerySource(project='orielresearch-188115',
>>>>>>>>         use_standard_sql=False,
>>>>>>>>         query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>>>      str('gs://archs4/output/'+x["index"]+'/')))
>>>>>>>
>>>>>>> I don't think the above code will work (not portable across runners
>>>>>>> at least). BigQuerySource (along with the Read transform) has to be
>>>>>>> applied to a Pipeline object. So probably change this to a for loop
>>>>>>> that creates a set of read transforms and use Flatten to create a
>>>>>>> single PCollection.
>>>>>>
>>>>>> For debug, I am running on the local runner in Datalab. For
>>>>>> production, I will be running only on the Dataflow runner. I think
>>>>>> that I was able to query the tables that way; I will double check it.
>>>>>> The indexes could go to millions - my concern is that I will not be
>>>>>> able to leverage Beam's distribution capability when I use the loop
>>>>>> option. Any thoughts on that?
>>>>>
>>>>> You mean you'll have millions of queries. That will not be scalable.
>>>>> My suggestion was to loop on queries. Can you reduce to one or a small
>>>>> number of queries and perform further processing in Beam?
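As an illustration of that suggestion, here is a rough, untested sketch; the
project, table, and column names are taken from the earlier snippet and may
need adjusting. A single broad query replaces the per-index queries, with the
per-sample work then done inside Beam:

import apache_beam as beam

with beam.Pipeline() as p:
    # One query over the whole table instead of one query per index value.
    all_rows = p | 'ReadAll' >> beam.io.Read(beam.io.BigQuerySource(
        project='orielresearch-188115',
        use_standard_sql=True,  # standard SQL for the backtick-quoted wildcard table
        query='SELECT * FROM `archs4.results_20180308_*`'))

    # Key each row by its index value and group, so per-sample processing
    # happens in Beam rather than in separate BigQuery queries.
    by_index = (all_rows
                | 'KeyByIndex' >> beam.Map(lambda row: (row['index'], row))
                | 'GroupByIndex' >> beam.GroupByKey())

From there the grouped rows can be formatted and written out, or partitioned
as in the pseudocode further down the thread.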
>>>>>>>> queries2
>>>>>>>> # output: a list of PCollections and the paths to write the
>>>>>>>> # PCollection data to
>>>>>>>>
>>>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>>>
>>>>>>> What you got here is a PCollection of PTransform objects, which is
>>>>>>> not useful.
>>>>>>>
>>>>>>>> # this is my challenge
>>>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND COLUMN")
>>>>>>>
>>>>>>> Once you update the above code you will get a proper PCollection of
>>>>>>> elements read from BigQuery. You can transform and write this (to
>>>>>>> files, BQ, or any other sink) as needed.
>>>>>>
>>>>>> It is a list of tuples with the PCollection and the path to write to.
>>>>>> The path is not unique and I might have more than one PCollection
>>>>>> written to the same destination. How do I pass the path from the
>>>>>> tuple list as a parameter to the text file name? Could you please add
>>>>>> the code that you were thinking about?
>>>>>
>>>>> The Python SDK does not support writing to different files based on
>>>>> the values of data (dynamic writes). So you'll have to either
>>>>> partition data into separate PCollections or write all data into the
>>>>> same location.
>>>>>
>>>>> Here's pseudocode (untested) for reading from a few queries,
>>>>> partitioning into several PCollections, and writing to different
>>>>> destinations.
>>>>>
>>>>> queries = ['select * from A', 'select * from B', ...]
>>>>>
>>>>> p = Pipeline()
>>>>> pcollections = []
>>>>> for query in queries:
>>>>>     pc = p | beam.io.Read(beam.io.BigQuerySource(query=query))
>>>>>     pcollections.append(pc)
>>>>>
>>>>> all_data = pcollections | beam.Flatten()
>>>>> partitions = all_data | beam.Partition(my_partition_fn)
>>>>> for i, partition in enumerate(partitions):
>>>>>     partition | beam.io.WriteToText(<unique path for partition i>)
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>>> Please see the programming guide on how to write to text files
>>>>>>> (section 5.3 and click the Python tab):
>>>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>>>> tried a few other options and was stuck at the write transform.
>>>>>>>>
>>>>>>>> Any advice is very appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Eila
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
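One more note on the debugging question above, since a Dataflow round trip
takes several minutes: a small, untested harness like the following runs
entirely on the local DirectRunner, uses beam.Create for a handful of
in-memory rows, and prints elements as they flow through, which avoids
applying transforms to plain Python lists. The rows and the output path are
placeholders:

import apache_beam as beam

def log_element(element, label):
    # Print each element for local inspection only.
    print('%s: %s' % (label, element))
    return element

rows = [
    {u'sample': u'GSM2313641', u'SNRPCP14': 0},
    {u'sample': u'GSM2316666', u'SNRPCP14': 0},
    {u'sample': u'GSM2312355', u'SNRPCP14': 0},
]

# Runs on the DirectRunner by default; no Dataflow job is launched.
with beam.Pipeline() as p:
    (p
     | 'CreateRows' >> beam.Create(rows)
     | 'LogRows' >> beam.Map(log_element, 'row')
     | 'WriteLocal' >> beam.io.WriteToText('/tmp/debug_output'))

Once the logic looks right locally, the same pipeline code can be pointed at
the DataflowRunner through the pipeline options.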
