Thanks. Everything is working!
Eila

On Thu, Mar 22, 2018 at 3:38 AM, Chamikara Jayalath <[email protected]> wrote:

> On Wed, Mar 21, 2018 at 1:57 PM OrielResearch Eila Arich-Landkof <[email protected]> wrote:
>
>> Yes, you were right - I had to put back the pc.
>> I am working on the partition function and trying to debug it without
>> running a pipeline on Dataflow (a Dataflow execution takes at least 8
>> minutes for any size of data), based on this link:
>> https://medium.com/google-cloud/quickly-experiment-with-dataflow-3d5a0da8d8e9
>>
>> My code & questions:
>>
>> # Is it possible to use a string as the partition name? If not, what
>> # would be the simplest solution? Generating another PCollection with
>> # the samples (index) and merging it with the data table? If so, how
>> # would I extract that information when writing to a text file?
>> def partition_fn(element, num_partitions):
>>     return element['sample']
>>
>
> I believe the partition function has to return an integer that is the
> partition number for a given element. Please see the following code
> snippet for an example usage:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1155
>
> Also, that example shows how you can write the resulting PCollections to
> text files.
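>
> As a minimal sketch of what that could look like with your data (the
> fixed sample-to-index dict is just one illustrative way to turn your
> string keys into partition numbers):
>
> # Partition expects the function to return an integer in
> # [0, num_partitions).
> SAMPLE_TO_PARTITION = {u'GSM2313641': 0, u'GSM2316666': 1, u'GSM2312355': 2}
>
> def partition_fn(element, num_partitions):
>     # element is a dict like {u'sample': u'GSM2313641', u'SNRPCP14': 0};
>     # look up an integer index for its string sample ID, falling back to 0.
>     return SAMPLE_TO_PARTITION.get(element[u'sample'], 0) % num_partitions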
>
>> # debug input, no pipeline
>> d = [{u'sample': u'GSM2313641', u'SNRPCP14': 0},
>>      {u'sample': u'GSM2316666', u'SNRPCP14': 0},
>>      {u'sample': u'GSM2312355', u'SNRPCP14': 0}] | beam.Flatten()
>>
>> d
>> # output:
>> [(u'sample', u'GSM2312355'),
>>  (u'SNRPCP14', 0),
>>  (u'sample', u'GSM2313641'),
>>  (u'SNRPCP14', 0),
>>  (u'sample', u'GSM2316666'),
>>  (u'SNRPCP14', 0)]
>>
>> d | beam.Partition(partition_fn, 3)
>> # output, error:
>> TypeError: tuple indices must be integers, not str [while running
>> 'Partition(CallableWrapperPartitionFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
>>
>
> Please see above.
>
>> # writing to dynamic output - how do I extract the partition name that
>> # the element is assigned to?
>> for i, partition in enumerate(partitions):
>>     # The partition path uses the partition's string name
>>     pathi = str('gs://bucket/output/' + x["sample"] + '/')
>>     partition | label >> beam.io.WriteToText(pathi)
>>
>> Please also let me know if there is a better way to debug the code
>> before running it on the Dataflow runner.
>
> You can test your code using DirectRunner before switching to
> DataflowRunner. Both runners should produce the same result.
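>
> For instance, a rough local-debugging sketch (untested; note that an
> in-memory list becomes a PCollection via beam.Create, not beam.Flatten):
>
> import apache_beam as beam
>
> p = beam.Pipeline('DirectRunner')  # runs locally, no Dataflow round trip
> (p
>  | beam.Create([{u'sample': u'GSM2313641', u'SNRPCP14': 0},
>                 {u'sample': u'GSM2316666', u'SNRPCP14': 0}])
>  | beam.Map(lambda x: x[u'sample'])  # whatever transform you are debugging
>  | beam.io.WriteToText('/tmp/debug-output'))
> p.run().wait_until_finish()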
>
>> Many thanks,
>> Eila
>>
>> On Wed, Mar 21, 2018 at 12:43 PM, Chamikara Jayalath <[email protected]> wrote:
>>
>>> On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <[email protected]> wrote:
>>>
>>>> Hi Cham,
>>>>
>>>> all_data = pcollections | beam.Flatten()
>>>>
>>>> fires an error:
>>>>
>>>> TypeError: 'Read' object is not iterable
>>>>
>>>> pcollections is the following list:
>>>>
>>>> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>>>
>>> Did you omit "p | " in "p | beam.io.Read" by any chance? I am not sure
>>> how you would end up with a list of Read PTransforms otherwise.
>>>
>>> Also, follow everything with a "p.run().wait_until_finish()" for the
>>> pipeline to execute.
>>>
>>> Can you paste the code that you are running?
>>>
>>>> Based on the following, I converted the list to a tuple
>>>> (tuple(pcollections)) and got the same error for the tuple:
>>>>
>>>> # Flatten takes a tuple of PCollection objects.
>>>> # Returns a single PCollection that contains all of the elements in
>>>> # the PCollection objects in that tuple.
>>>> merged = (
>>>>     (pcoll1, pcoll2, pcoll3)
>>>>     # A list of tuples can be "piped" directly into a Flatten transform.
>>>>     | beam.Flatten())
>>>>
>>>> Any advice?
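>>>
>>> To illustrate the "p | " point, a sketch (with placeholder queries) of
>>> building the list so that Flatten receives PCollections rather than
>>> Read transforms:
>>>
>>> import apache_beam as beam
>>>
>>> p = beam.Pipeline()
>>> queries = ['SELECT ...', 'SELECT ...']  # your per-sample queries
>>>
>>> pcollections = []
>>> for i, q in enumerate(queries):
>>>     # "p | " applies the transform; each application needs a unique label.
>>>     pc = p | ('Read%d' % i) >> beam.io.Read(
>>>         beam.io.BigQuerySource(project='orielresearch-188115', query=q))
>>>     pcollections.append(pc)
>>>
>>> # Flatten takes a tuple/list of PCollections, not of PTransforms.
>>> all_data = tuple(pcollections) | beam.Flatten()
>>>
>>> p.run().wait_until_finish()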
>>>>
>>>> Many thanks,
>>>> Eila
>>>>
>>>> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <[email protected]> wrote:
>>>>
>>>>> Very helpful!!! I will keep you posted if I have any issue / question.
>>>>> Best,
>>>>> Eila
>>>>>
>>>>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <[email protected]> wrote:
>>>>>
>>>>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Cham,
>>>>>>>
>>>>>>> Please see inline. If possible, code / pseudocode will help a lot.
>>>>>>> Thanks,
>>>>>>> Eila
>>>>>>>
>>>>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Eila,
>>>>>>>>
>>>>>>>> Please find my comments inline.
>>>>>>>>
>>>>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> It was nice to meet you last week!!!
>>>>>>>>
>>>>>>>> It was nice to meet you as well :)
>>>>>>>>
>>>>>>>>> I am writing a genomic PCollection that is created from BigQuery
>>>>>>>>> to a folder. Following is the code with output, so you can run it
>>>>>>>>> with any small BQ table and let me know what your thoughts are:
>>>>>>>
>>>>>>> This init is only for debugging. In production I will use the
>>>>>>> pipeline syntax.
>>>>>>>
>>>>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},
>>>>>>>>>         {u'index': u'GSM2316666', u'SNRPCP14': 0},
>>>>>>>>>         {u'index': u'GSM2312355', u'SNRPCP14': 0},
>>>>>>>>>         {u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>>>>
>>>>>>>>> rows[1].keys()
>>>>>>>>> # output: [u'index', u'SNRPCP14']
>>>>>>>>>
>>>>>>>>> # you can change `archs4.results_20180308_*` to any other table
>>>>>>>>> # name with an index column
>>>>>>>>> queries2 = rows | beam.Map(lambda x: (
>>>>>>>>>     beam.io.Read(beam.io.BigQuerySource(
>>>>>>>>>         project='orielresearch-188115',
>>>>>>>>>         use_standard_sql=False,
>>>>>>>>>         query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>>>>     str('gs://archs4/output/' + x["index"] + '/')))
>>>>>>>>
>>>>>>>> I don't think the above code will work (it is not portable across
>>>>>>>> runners, at least). BigQuerySource (along with the Read transform)
>>>>>>>> has to be applied to a Pipeline object. So probably change this to
>>>>>>>> a for loop that creates a set of read transforms, and use Flatten
>>>>>>>> to create a single PCollection.
>>>>>>>
>>>>>>> For debugging, I am running on the local datalab runner. For
>>>>>>> production, I will be running only the Dataflow runner. I think that
>>>>>>> I was able to query the tables that way; I will double-check it. The
>>>>>>> indexes could go to millions - my concern is that I will not be able
>>>>>>> to leverage Beam's distribution capability when I use the loop
>>>>>>> option. Any thoughts on that?
>>>>>>
>>>>>> You mean you'll have millions of queries. That will not be scalable.
>>>>>> My suggestion was to loop on queries. Can you reduce this to one or a
>>>>>> small number of queries and perform further processing in Beam?
>>>>>>
>>>>>>>>> queries2
>>>>>>>>> # output: a list of PCollections and the paths to write the
>>>>>>>>> # PCollection data to
>>>>>>>>>
>>>>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>, 'gs://archs4/output/GSM2313641/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>, 'gs://archs4/output/GSM2316666/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>, 'gs://archs4/output/GSM2312355/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>, 'gs://archs4/output/GSM2312372/')]
>>>>>>>>
>>>>>>>> What you got here is a PCollection of PTransform objects, which is
>>>>>>>> not useful.
>>>>>>>>
>>>>>>>>> # this is my challenge
>>>>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND COLUMN")
>>>>>>>>
>>>>>>>> Once you update the above code, you will get a proper PCollection
>>>>>>>> of elements read from BigQuery. You can transform and write this
>>>>>>>> (to files, BQ, or any other sink) as needed.
>>>>>>>
>>>>>>> It is a list of tuples with a PCollection and the path to write to.
>>>>>>> The path is not unique, and I might have more than one PCollection
>>>>>>> written to the same destination. How do I pass the path from the
>>>>>>> tuple list as a parameter to the text file name? Could you please
>>>>>>> add the code that you were thinking about?
>>>>>>
>>>>>> The Python SDK does not support writing to different files based on
>>>>>> the values of the data (dynamic writes). So you'll have to either
>>>>>> partition the data into separate PCollections or write all the data
>>>>>> into the same location.
>>>>>>
>>>>>> Here's pseudocode (untested) for reading from a few queries,
>>>>>> partitioning into several PCollections, and writing to different
>>>>>> destinations:
>>>>>>
>>>>>> queries = ['select * from A', 'select * from B', ...]
>>>>>>
>>>>>> p = Pipeline()
>>>>>> pcollections = []
>>>>>> for i, query in enumerate(queries):
>>>>>>     pc = p | ('Read%d' % i) >> beam.io.Read(beam.io.BigQuerySource(query=query))
>>>>>>     pcollections.append(pc)
>>>>>>
>>>>>> all_data = pcollections | beam.Flatten()
>>>>>> # Partition also needs the number of partitions as its second argument.
>>>>>> partitions = all_data | beam.Partition(my_partition_fn, num_partitions)
>>>>>> for i, partition in enumerate(partitions):
>>>>>>     partition | ('Write%d' % i) >> beam.io.WriteToText(<unique path for partition i>)
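>>>>>>
>>>>>> And one possible my_partition_fn for that pseudocode - assuming the
>>>>>> sample IDs are known up front (hard-coded here for illustration) -
>>>>>> which also gives each partition its own unique output path:
>>>>>>
>>>>>> SAMPLES = [u'GSM2313641', u'GSM2316666', u'GSM2312355', u'GSM2312372']
>>>>>>
>>>>>> def my_partition_fn(row, num_partitions):
>>>>>>     # One partition per known sample; anything unexpected goes to
>>>>>>     # the last, catch-all partition.
>>>>>>     try:
>>>>>>         return SAMPLES.index(row['index'])
>>>>>>     except ValueError:
>>>>>>         return num_partitions - 1
>>>>>>
>>>>>> partitions = all_data | beam.Partition(my_partition_fn, len(SAMPLES) + 1)
>>>>>> for i, sample in enumerate(SAMPLES):
>>>>>>     partitions[i] | ('Write_%s' % sample) >> beam.io.WriteToText(
>>>>>>         'gs://archs4/output/%s/' % sample)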
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>>> Please see the programming guide for how to write to text files
>>>>>>>> (section 5.3; click the Python tab):
>>>>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>>>>> tried a few other options and was stuck at the write transform.
>>>>>>>>>
>>>>>>>>> Any advice is very appreciated.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Eila

--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
