yes. You were right, I had to put back the pc.
I am working on the partition function and try to debug it without running
a pipeline on dataflow (dataflow execution takes at list 8 minutes for any
size of data), based on the link:
https://medium.com/google-cloud/quickly-experiment-with-dataflow-3d5a0da8d8e9
*my code & questions:*
# Is it possible to use string as partition name? if not, what would be the
simplest solution?
# generating another PCollection with the samples (index) and merge with
the data table? if yes,
# how would I extract that information when I am writing to text file
def partition_fn(element, num_partitions):
return(element['sample'])
# debug input, no pipeline
*d = [{u'sample': u'GSM2313641', u'SNRPCP14': 0},{u'sample': u'GSM2316666',
u'SNRPCP14': 0},{u'sample': u'GSM2312355', u'SNRPCP14': 0}] |
beam.Flatten()*
*d*
# output:
[(u'sample', u'GSM2312355'),
(u'SNRPCP14', 0),
(u'sample', u'GSM2313641'),
(u'SNRPCP14', 0),
(u'sample', u'GSM2316666'),
(u'SNRPCP14', 0)]
*d | beam.Partition(partition_fn,3)*
#output, error:
TypeError: tuple indices must be integers, not str [while running
'Partition(CallableWrapperPartitionFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
*#writing to dynamic output - how do i extract the partition name that the
element is assigned to?*
*for i, partition in enumerate(partitions):*
* # The partition path is using the partition str name*
* pathi= str('gs://bucket/output/'+x["sample"]+'/')*
* partition | label >> beam.io.WriteToText(pathi)*
Please also let me know if there is a better way to debug the code before
running on dataflow runner.
Many thanks,
Eila
On Wed, Mar 21, 2018 at 12:43 PM, Chamikara Jayalath <[email protected]>
wrote:
> On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
> [email protected]> wrote:
>
>> Hi Cham,
>>
>> *all_data = pcollections | beam.Flatten()*
>>
>> fires an error:
>>
>> TypeError: 'Read' object is not iterable
>>
>>
>> pcollections is the following list:
>>
>> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>> <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>> <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>> <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>>
>>
>>
> Did you omit "p | " in "p | beam.io.Read" by any chance ? Not sure how
> you ended up with a list of Read PTransforms otherwise.
>
> Also, follow everything with a "p.run.wait_until_finish()" for pipeline to
> execute.
>
> Can you paste the code that you are running ?
>
>
>> Based on the following, i converted the list to tuples
>> (tuple(*pcollections)) with the same error for tuple.*
>>
>>
>> # Flatten takes a tuple of PCollection objects.# Returns a single
>> PCollection that contains all of the elements in the PCollection objects in
>> that tuple.merged = (
>> (pcoll1, pcoll2, pcoll3)
>> # A list of tuples can be "piped" directly into a Flatten transform.
>> | beam.Flatten())
>>
>>
>> Any advice?
>>
>> Many thanks,
>> Eila
>>
>>
>> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
>> [email protected]> wrote:
>>
>>> very helpful!!! i will keep you posted if I have any issue / question
>>> Best,
>>> Eila
>>>
>>>
>>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <
>>> [email protected]> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi Cham,
>>>>>
>>>>> Please see inline. If possible, code / pseudo code will help a lot.
>>>>> Thanks,
>>>>> Eila
>>>>>
>>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Eila,
>>>>>>
>>>>>> Please find my comments inline.
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> It was nice to meet you last week!!!
>>>>>>>
>>>>>>>
>>>>>> It was nice to meet you as well :)
>>>>>>
>>>>>>
>>>>>>> I am writing genomic pCollection that is created from bigQuery to a
>>>>>>> folder. Following is the code with output so you can run it with any
>>>>>>> small
>>>>>>> BQ table and let me know what your thoughts are:
>>>>>>>
>>>>>>> This init is only for debugging. In production I will use the
>>>>> pipeline syntax
>>>>>
>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>>
>>>>>>> rows[1].keys()
>>>>>>> # output: [u'index', u'SNRPCP14']
>>>>>>>
>>>>>>> # you can change `archs4.results_20180308_ to any other table name
>>>>>>> with index column
>>>>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.
>>>>>>> BigQuerySource(project='orielresearch-188115',
>>>>>>> use_standard_sql=False, query=str('SELECT * FROM
>>>>>>> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>> str('gs://archs4/output/'+x["
>>>>>>> index"]+'/')))
>>>>>>>
>>>>>>
>>>>>> I don't think above code will work (not portable across runners at
>>>>>> least). BigQuerySource (along with Read transform) have to be applied to
>>>>>> a
>>>>>> Pipeline object. So probably change this to a for loop that creates a set
>>>>>> of read transforms and use Flatten to create a single PCollection.
>>>>>>
>>>>> For debug, I am running on the local datalab runner. For the
>>>>> production, I will be running only dataflow runner. I think that I was
>>>>> able
>>>>> to query the tables that way, I will double check it. The indexes could go
>>>>> to millions - my concern is that I will not be able to leverage on Beam
>>>>> distribution capability when I use the the loop option. Any thoughts on
>>>>> that?
>>>>>
>>>>
>>>> You mean you'll have millions of queries. That will not be scalable. My
>>>> suggestion was to loop on queries. Can you reduce to one or a small number
>>>> of queries and perform further processing in Beam ?
>>>>
>>>>
>>>>>
>>>>>>
>>>>>>>
>>>>>>> queries2
>>>>>>> # output: a list of pCollection and the path to write the
>>>>>>> pCollection data to
>>>>>>>
>>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>> 'gs://archs4/output/GSM2313641/'),
>>>>>>> (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>> 'gs://archs4/output/GSM2316666/'),
>>>>>>> (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>> 'gs://archs4/output/GSM2312355/'),
>>>>>>> (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>> 'gs://archs4/output/GSM2312372/')]
>>>>>>>
>>>>>>>
>>>>>> What you got here is a PCollection of PTransform objects which is not
>>>>>> useful.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> *# this is my challenge*
>>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>>>> COLUMN")
>>>>>>>
>>>>>>>
>>>>>> Once you update above code you will get a proper PCollection of
>>>>>> elements read from BigQuery. You can transform and write this (to files,
>>>>>> BQ, or any other sink) as needed.
>>>>>>
>>>>>
>>>>> it is a list of tupples with PCollection and the path to write to. the
>>>>> path is not unique and I might have more than one PCollection written to
>>>>> the same destination. How do I pass the path from the tupple list as a
>>>>> parameter to the text file name? Could you please add the code that you
>>>>> were thinking about?
>>>>>
>>>>
>>>> Python SDK does not support writing to different files based on the
>>>> values of data (dynamic writes). So you'll have to either partition data
>>>> into separate PCollections or write all data into the same location.
>>>>
>>>> Here's *pseudocode* (untested) for reading from few queries,
>>>> partitioning into several PCollections, and writing to different
>>>> destinations.
>>>>
>>>> *queries = ['select * from A', 'select * from B',....]*
>>>>
>>>> *p = Pipeline()*
>>>> *pcollections = []*
>>>> *for query in queries:*
>>>> * pc = p | beam.io.Read(beam.io
>>>> <http://beam.io/>.BigQuerySource(query=query))*
>>>> * pcollections.append(pc)*
>>>>
>>>> *all_data = pcollections | beam.Flatten()*
>>>> *partitions = all_data | beam.Partition(my_partition_fn)*
>>>> *for i, partition in enumerate(partitions):*
>>>> * partition | beam.io.WriteToText(<unique path for partition i>)*
>>>> Hope this helps.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>
>>>>> Please see programming guide on how to write to text files (section
>>>>>> 5.3 and click Python tab): https://beam.apache.org/
>>>>>> documentation/programming-guide/
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>>> tried few other options and was stuck at the write transform
>>>>>>>
>>>>>>> Any advice is very appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Eila
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Eila
>>>>>>> www.orielresearch.org
>>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eila
>>>>> www.orielresearch.org
>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>
>>>>
>>>
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>
--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/