Thanks. Everything is working!
Eila

On Thu, Mar 22, 2018 at 3:38 AM, Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Wed, Mar 21, 2018 at 1:57 PM OrielResearch Eila Arich-Landkof <
> [email protected]> wrote:
>
>> yes. You were right, I had to put back the pc.
>> I am working on the partition function and try to debug it without
>> running a pipeline on dataflow (dataflow execution takes at list 8 minutes
>> for any size of data), based on the link: https://medium.com/google-
>> cloud/quickly-experiment-with-dataflow-3d5a0da8d8e9
>>
>> *my code & questions:*
>> # Is it possible to use string as partition name? if not, what would be
>> the simplest solution?
>> # generating another PCollection with the samples (index) and merge with
>> the data table? if yes,
>> # how would I extract that information when I am writing to text file
>> def partition_fn(element, num_partitions):
>>   return(element['sample'])
>>
>
> I believe partition function has to return an integer that is the
> partition number for a given element. Please see following code snippet for
> an example usage.
> https://github.com/apache/beam/blob/master/sdks/python/
> apache_beam/examples/snippets/snippets.py#L1155
>
> Also, that example shows how you can write resulting PCollections to text
> files.
>
>
>>
>> # debug input, no pipeline
>>
>> *d = [{u'sample': u'GSM2313641', u'SNRPCP14': 0},{u'sample':
>> u'GSM2316666', u'SNRPCP14': 0},{u'sample': u'GSM2312355', u'SNRPCP14': 0}]
>> | beam.Flatten()*
>>
>> *d*
>> # output:
>>
>> [(u'sample', u'GSM2312355'),
>>  (u'SNRPCP14', 0),
>>  (u'sample', u'GSM2313641'),
>>  (u'SNRPCP14', 0),
>>  (u'sample', u'GSM2316666'),
>>  (u'SNRPCP14', 0)]
>>
>>
>>
>> *d | beam.Partition(partition_fn,3)*
>> #output, error:
>>
>> TypeError: tuple indices must be integers, not str [while running 
>> 'Partition(CallableWrapperPartitionFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
>>
>>
> Please see above.
>
>
>>
>> *#writing to dynamic output - how do i extract the partition name that
>> the element is assigned to?*
>> *for i, partition in enumerate(partitions):*
>> *  # The partition path is using the partition str name*
>> *  pathi= str('gs://bucket/output/'+x["sample"]+'/')*
>> *  partition | label >> beam.io.WriteToText(pathi)*
>>
>>
>> Please also let me know if there is a better way to debug the code before
>> running on dataflow runner.
>>
>
> You can test code using DirectRunner before using DataflowRunner. Both
> runners should produce the same result.
>
>
>>
>> Many thanks,
>> Eila
>>
>>
>>
>> On Wed, Mar 21, 2018 at 12:43 PM, Chamikara Jayalath <
>> [email protected]> wrote:
>>
>>> On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
>>> [email protected]> wrote:
>>>
>>>> Hi Cham,
>>>>
>>>> *all_data = pcollections | beam.Flatten()*
>>>>
>>>> fires an error:
>>>>
>>>> TypeError: 'Read' object is not iterable
>>>>
>>>>
>>>> pcollections is the following list:
>>>>
>>>> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>>>>
>>>>
>>>>
>>> Did you omit "p | " in "p | beam.io.Read" by any chance ? Not sure how
>>> you ended up with a list of Read PTransforms otherwise.
>>>
>>> Also, follow everything with a "p.run.wait_until_finish()" for pipeline
>>> to execute.
>>>
>>> Can you paste the code that you are running ?
>>>
>>>
>>>> Based on the following, i converted the list to tuples 
>>>> (tuple(*pcollections)) with the same error for tuple.*
>>>>
>>>>
>>>> # Flatten takes a tuple of PCollection objects.# Returns a single 
>>>> PCollection that contains all of the elements in the PCollection objects 
>>>> in that tuple.merged = (
>>>>     (pcoll1, pcoll2, pcoll3)
>>>>     # A list of tuples can be "piped" directly into a Flatten transform.
>>>>     | beam.Flatten())
>>>>
>>>>
>>>> Any advice?
>>>>
>>>> Many thanks,
>>>> Eila
>>>>
>>>>
>>>> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
>>>> [email protected]> wrote:
>>>>
>>>>> very helpful!!! i will keep you posted if I have any issue / question
>>>>> Best,
>>>>> Eila
>>>>>
>>>>>
>>>>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Cham,
>>>>>>>
>>>>>>> Please see inline. If possible, code / pseudo code will help a lot.
>>>>>>> Thanks,
>>>>>>> Eila
>>>>>>>
>>>>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Eila,
>>>>>>>>
>>>>>>>> Please find my comments inline.
>>>>>>>>
>>>>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> It was nice to meet you last week!!!
>>>>>>>>>
>>>>>>>>>
>>>>>>>> It was nice to meet you as well :)
>>>>>>>>
>>>>>>>>
>>>>>>>>> I am writing genomic pCollection that is created from bigQuery to
>>>>>>>>> a folder. Following is the code with output so you can run it with any
>>>>>>>>> small BQ table and let me know what your thoughts are:
>>>>>>>>>
>>>>>>>>> This init is only for debugging. In production I will use the
>>>>>>> pipeline syntax
>>>>>>>
>>>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>>>>
>>>>>>>>> rows[1].keys()
>>>>>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>>>>>
>>>>>>>>> # you can change `archs4.results_20180308_ to any other table
>>>>>>>>> name with index column
>>>>>>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.
>>>>>>>>> BigQuerySource(project='orielresearch-188115',
>>>>>>>>> use_standard_sql=False, query=str('SELECT * FROM
>>>>>>>>> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>>>>                                str('gs://archs4/output/'+x["
>>>>>>>>> index"]+'/')))
>>>>>>>>>
>>>>>>>>
>>>>>>>> I don't think above code will work (not portable across runners at
>>>>>>>> least). BigQuerySource (along with Read transform) have to be applied 
>>>>>>>> to a
>>>>>>>> Pipeline object. So probably change this to a for loop that creates a 
>>>>>>>> set
>>>>>>>> of read transforms and use Flatten to create a single PCollection.
>>>>>>>>
>>>>>>> For debug, I am running on the local datalab runner. For the
>>>>>>> production, I will be running only dataflow runner. I think that I was 
>>>>>>> able
>>>>>>> to query the tables that way, I will double check it. The indexes could 
>>>>>>> go
>>>>>>> to millions - my concern is that I will not be able to leverage on Beam
>>>>>>> distribution capability when I use the the loop option. Any thoughts on
>>>>>>> that?
>>>>>>>
>>>>>>
>>>>>> You mean you'll have millions of queries. That will not be scalable.
>>>>>> My suggestion was to loop on queries. Can you reduce to one or a small
>>>>>> number of queries and perform further processing in Beam ?
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> queries2
>>>>>>>>> # output: a list of pCollection and the path to write the
>>>>>>>>> pCollection data to
>>>>>>>>>
>>>>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>>>>>
>>>>>>>>>
>>>>>>>> What you got here is a PCollection of PTransform objects which is
>>>>>>>> not useful.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> *# this is my challenge*
>>>>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>>>>>> COLUMN")
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Once you update above code you will get a proper PCollection of
>>>>>>>> elements read from BigQuery. You can transform and write this (to 
>>>>>>>> files,
>>>>>>>> BQ, or any other sink) as needed.
>>>>>>>>
>>>>>>>
>>>>>>> it is a list of tupples with PCollection and the path to write to.
>>>>>>> the path is not unique and I might have more than one PCollection 
>>>>>>> written
>>>>>>> to the same destination. How do I pass the path from the tupple list as 
>>>>>>> a
>>>>>>> parameter to the text file name? Could you please add the code that you
>>>>>>> were thinking about?
>>>>>>>
>>>>>>
>>>>>> Python SDK does not support writing to different files based on the
>>>>>> values of data (dynamic writes). So you'll have to either partition data
>>>>>> into separate PCollections  or write all data into the same location.
>>>>>>
>>>>>> Here's *pseudocode* (untested) for reading from few queries,
>>>>>> partitioning into several PCollections, and writing to different
>>>>>> destinations.
>>>>>>
>>>>>> *queries = ['select * from A', 'select * from B',....]*
>>>>>>
>>>>>> *p = Pipeline()*
>>>>>> *pcollections = []*
>>>>>> *for query in queries:*
>>>>>> *  pc = p | beam.io.Read(beam.io
>>>>>> <http://beam.io/>.BigQuerySource(query=query))*
>>>>>> * pcollections.append(pc)*
>>>>>>
>>>>>> *all_data = pcollections | beam.Flatten()*
>>>>>> *partitions = all_data | beam.Partition(my_partition_fn)*
>>>>>> *for i, partition in enumerate(partitions):*
>>>>>> *  partition | beam.io.WriteToText(<unique path for partition i>)*
>>>>>> Hope this helps.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>
>>>>>>> Please see programming guide on how to write to text files (section
>>>>>>>> 5.3 and click Python tab): https://beam.apache.org/
>>>>>>>> documentation/programming-guide/
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>>
>>>>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>>>>> tried few other options and was stuck at the write transform
>>>>>>>>>
>>>>>>>>> Any advice is very appreciated.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Eila
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Eila
>>>>>>>>> www.orielresearch.org
>>>>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Eila
>>>>>>> www.orielresearch.org
>>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eila
>>>>> www.orielresearch.org
>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Reply via email to