On Wed, Mar 21, 2018 at 1:57 PM OrielResearch Eila Arich-Landkof <
[email protected]> wrote:

> yes. You were right, I had to put the pc back.
> I am working on the partition function and trying to debug it without
> running a pipeline on Dataflow (Dataflow execution takes at least 8
> minutes for any amount of data), based on this link:
> https://medium.com/google-cloud/quickly-experiment-with-dataflow-3d5a0da8d8e9
>
> *my code & questions:*
> # Is it possible to use a string as the partition name? If not, what
> would be the simplest solution?
> # Should I generate another PCollection with the samples (index) and
> merge it with the data table? If yes,
> # how would I extract that information when writing to a text file?
> def partition_fn(element, num_partitions):
>   return(element['sample'])
>

I believe the partition function has to return an integer that is the
partition number for a given element. Please see the following code snippet
for an example usage:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1155

Also, that example shows how you can write the resulting PCollections to
text files.
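
If the set of sample names is known up front, one simple option is to map
each string name to an integer partition index. Here's a rough sketch
(untested; the samples list, bucket path, and inline input data are
placeholders based on your snippet):

import apache_beam as beam

# Placeholder list of the expected sample names; the position of a name
# determines which partition its elements are routed to.
samples = [u'GSM2313641', u'GSM2316666', u'GSM2312355']

def partition_fn(element, num_partitions):
  # A partition function must return an int in [0, num_partitions).
  return samples.index(element['sample'])

with beam.Pipeline() as p:
  data = p | beam.Create([
      {u'sample': u'GSM2313641', u'SNRPCP14': 0},
      {u'sample': u'GSM2316666', u'SNRPCP14': 0},
      {u'sample': u'GSM2312355', u'SNRPCP14': 0}])
  partitions = data | beam.Partition(partition_fn, len(samples))
  for i, partition in enumerate(partitions):
    # The loop index maps back to the sample name, which is how you can
    # recover the "partition name" when writing.
    partition | 'write_%d' % i >> beam.io.WriteToText(
        'gs://bucket/output/%s/' % samples[i])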


>
> # debug input, no pipeline
>
> *d = [{u'sample': u'GSM2313641', u'SNRPCP14': 0},{u'sample':
> u'GSM2316666', u'SNRPCP14': 0},{u'sample': u'GSM2312355', u'SNRPCP14': 0}]
> | beam.Flatten()*
>
> *d*
> # output:
>
> [(u'sample', u'GSM2312355'),
>  (u'SNRPCP14', 0),
>  (u'sample', u'GSM2313641'),
>  (u'SNRPCP14', 0),
>  (u'sample', u'GSM2316666'),
>  (u'SNRPCP14', 0)]
>
>
>
> *d | beam.Partition(partition_fn,3)*
> #output, error:
>
> TypeError: tuple indices must be integers, not str [while running 
> 'Partition(CallableWrapperPartitionFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
>
>
Please see above regarding the partition function.
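
One more note on the debug snippet: after d = [...] | beam.Flatten(), your
elements have become (key, value) tuples rather than dicts (see your own
output above), which is why element['sample'] raises the TypeError. In a
real pipeline, beam.Create is the transform for turning an in-memory list
of dicts into a single PCollection whose elements stay dicts. A minimal
sketch (rows being your list of dicts):

d = p | beam.Create(rows)  # elements remain dicts, so element['sample'] works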


>
> *# writing to dynamic output - how do I extract the partition name that
> the element is assigned to?*
> *for i, partition in enumerate(partitions):*
> *  # The partition path is using the partition str name*
> *  pathi= str('gs://bucket/output/'+x["sample"]+'/')*
> *  partition | label >> beam.io.WriteToText(pathi)*
>
>
> Please also let me know if there is a better way to debug the code
> before running it on the Dataflow runner.
>

You can test your code using the DirectRunner before running it with the
DataflowRunner. Both runners should produce the same results.
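
For example, a minimal sketch (the project and bucket values are
placeholders) that switches runners through pipeline options:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Iterate locally first; the DirectRunner executes in-process.
local_options = PipelineOptions(['--runner=DirectRunner'])

# Then switch to Dataflow with the same pipeline code.
dataflow_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=your-project-id',
    '--temp_location=gs://your-bucket/tmp'])

with beam.Pipeline(options=local_options) as p:
  # ... your transforms, unchanged between runners ...
  pass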


>
> Many thanks,
> Eila
>
>
>
> On Wed, Mar 21, 2018 at 12:43 PM, Chamikara Jayalath <[email protected]
> > wrote:
>
>> On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
>> [email protected]> wrote:
>>
>>> Hi Cham,
>>>
>>> *all_data = pcollections | beam.Flatten()*
>>>
>>> fires an error:
>>>
>>> TypeError: 'Read' object is not iterable
>>>
>>>
>>> pcollections is the following list:
>>>
>>> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>>>
>>>
>>>
>> Did you omit "p | " in "p | beam.io.Read" by any chance? Not sure how
>> you ended up with a list of Read PTransforms otherwise.
>>
>> Also, follow everything with "p.run().wait_until_finish()" for the
>> pipeline to execute.
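>>
>> Roughly, the corrected loop would look like this (untested sketch;
>> queries is a placeholder for whatever list of query strings you build):
>>
>> import apache_beam as beam
>>
>> queries = ['select * from A', 'select * from B']  # placeholder queries
>>
>> p = beam.Pipeline()
>> pcollections = []
>> for i, query in enumerate(queries):
>>   # Reads must be applied to the pipeline object, and each applied
>>   # transform needs a unique label.
>>   pc = p | 'read_%d' % i >> beam.io.Read(
>>       beam.io.BigQuerySource(query=query))
>>   pcollections.append(pc)
>>
>> # Now the list holds real PCollections, so Flatten works.
>> all_data = pcollections | beam.Flatten()
>> p.run().wait_until_finish()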
>>
>> Can you paste the code that you are running?
>>
>>
>>> Based on the following, I converted the list to a tuple
>>> (tuple(pcollections)) and got the same error for the tuple.
>>>
>>>
>>> # Flatten takes a tuple of PCollection objects.
>>> # Returns a single PCollection that contains all of the elements in the
>>> # PCollection objects in that tuple.
>>> merged = (
>>>     (pcoll1, pcoll2, pcoll3)
>>>     # A list of tuples can be "piped" directly into a Flatten transform.
>>>     | beam.Flatten())
>>>
>>>
>>> Any advice?
>>>
>>> Many thanks,
>>> Eila
>>>
>>>
>>> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
>>> [email protected]> wrote:
>>>
>>>> Very helpful!!! I will keep you posted if I have any issues or questions.
>>>> Best,
>>>> Eila
>>>>
>>>>
>>>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Cham,
>>>>>>
>>>>>> Please see inline. If possible, code / pseudo code will help a lot.
>>>>>> Thanks,
>>>>>> Eila
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Eila,
>>>>>>>
>>>>>>> Please find my comments inline.
>>>>>>>
>>>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> It was nice to meet you last week!!!
>>>>>>>>
>>>>>>>>
>>>>>>> It was nice to meet you as well :)
>>>>>>>
>>>>>>>
>>>>>>>> I am writing a genomic PCollection that is created from BigQuery
>>>>>>>> to a folder. Following is the code with output, so you can run it
>>>>>>>> with any small BQ table and let me know what your thoughts are:
>>>>>>>>
>>>>>>>> This init is only for debugging. In production I will use the
>>>>>> pipeline syntax
>>>>>>
>>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>>>
>>>>>>>> rows[1].keys()
>>>>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>>>>
>>>>>>>> # you can change `archs4.results_20180308_*` to any other table
>>>>>>>> # name with an index column
>>>>>>>> queries2 = rows | beam.Map(lambda x: (
>>>>>>>>     beam.io.Read(beam.io.BigQuerySource(
>>>>>>>>         project='orielresearch-188115',
>>>>>>>>         use_standard_sql=False,
>>>>>>>>         query=str('SELECT * FROM `archs4.results_20180308_*` '
>>>>>>>>                   'where index=\'%s\'' % (x["index"])))),
>>>>>>>>     str('gs://archs4/output/'+x["index"]+'/')))
>>>>>>>>
>>>>>>>
>>>>>>> I don't think the above code will work (it's not portable across
>>>>>>> runners, at least). BigQuerySource (along with the Read transform)
>>>>>>> has to be applied to a Pipeline object. So probably change this to a
>>>>>>> for loop that creates a set of read transforms, then use Flatten to
>>>>>>> create a single PCollection.
>>>>>>>
>>>>>> For debugging, I am running on the local Datalab runner. For
>>>>>> production, I will be running only the Dataflow runner. I think I was
>>>>>> able to query the tables that way; I will double-check it. The
>>>>>> indexes could go to millions - my concern is that I will not be able
>>>>>> to leverage Beam's distribution capability when I use the loop
>>>>>> option. Any thoughts on that?
>>>>>>
>>>>>
>>>>> You mean you'll have millions of queries? That will not be scalable.
>>>>> My suggestion was to loop over the queries. Can you reduce this to one
>>>>> or a small number of queries and perform further processing in Beam?
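>>>>>
>>>>> For example, a rough sketch (untested; the project and table names
>>>>> come from your earlier snippet) that reads everything with one query
>>>>> and keys the rows by index for further processing in Beam:
>>>>>
>>>>> # p is your Pipeline object; one read instead of one read per index.
>>>>> all_rows = p | beam.io.Read(beam.io.BigQuerySource(
>>>>>     project='orielresearch-188115',
>>>>>     query='SELECT * FROM `archs4.results_20180308_*`',
>>>>>     use_standard_sql=True))
>>>>> # Grouping by index then happens inside Beam, in a distributed way.
>>>>> keyed = all_rows | beam.Map(lambda row: (row['index'], row))
>>>>> grouped = keyed | beam.GroupByKey()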
>>>>>
>>>>>
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> queries2
>>>>>>>> # output: a list of PCollections and the paths to write the
>>>>>>>> # PCollection data to
>>>>>>>>
>>>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>>>>
>>>>>>>>
>>>>>>> What you got here is a PCollection of PTransform objects, which is
>>>>>>> not useful.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> *# this is my challenge*
>>>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>>>>> COLUMN")
>>>>>>>>
>>>>>>>>
>>>>>>> Once you update the above code, you will get a proper PCollection
>>>>>>> of elements read from BigQuery. You can transform and write this (to
>>>>>>> files, BQ, or any other sink) as needed.
>>>>>>>
>>>>>>
>>>>>> It is a list of tuples with the PCollection and the path to write
>>>>>> to. The path is not unique, and I might have more than one
>>>>>> PCollection written to the same destination. How do I pass the path
>>>>>> from the tuple list as a parameter to the text file name? Could you
>>>>>> please add the code that you were thinking about?
>>>>>>
>>>>>
>>>>> The Python SDK does not support writing to different files based on
>>>>> the values of the data (dynamic writes). So you'll have to either
>>>>> partition the data into separate PCollections or write all the data
>>>>> to the same location.
>>>>>
>>>>> Here's *pseudocode* (untested) for reading from a few queries,
>>>>> partitioning into several PCollections, and writing to different
>>>>> destinations:
>>>>>
>>>>> queries = ['select * from A', 'select * from B', ...]
>>>>>
>>>>> p = Pipeline()
>>>>> pcollections = []
>>>>> for query in queries:
>>>>>   pc = p | beam.io.Read(beam.io.BigQuerySource(query=query))
>>>>>   pcollections.append(pc)
>>>>>
>>>>> all_data = pcollections | beam.Flatten()
>>>>> partitions = all_data | beam.Partition(my_partition_fn, num_partitions)
>>>>> for i, partition in enumerate(partitions):
>>>>>   partition | beam.io.WriteToText(<unique path for partition i>)
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>
>>>>>>> Please see the programming guide for how to write to text files
>>>>>>> (section 5.3; click the Python tab):
>>>>>>> https://beam.apache.org/documentation/programming-guide/
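>>>>>>>
>>>>>>> As a minimal sketch (the path is a placeholder), writing a
>>>>>>> PCollection to sharded text files looks like:
>>>>>>>
>>>>>>> # Produces shards like gs://bucket/output/results-00000-of-00010.txt
>>>>>>> pcoll | beam.io.WriteToText('gs://bucket/output/results',
>>>>>>>                             file_name_suffix='.txt')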
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>>
>>>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>>>> tried a few other options and got stuck at the write transform.
>>>>>>>>
>>>>>>>> Any advice is much appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Eila
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Eila
>>>>>>>> www.orielresearch.org
>>>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eila
>>>>>> www.orielresearch.org
>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>>>
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>
