Hi Sebastien,
Can you tell me more about how your "step 1" works? I looked at the logs of
your job and it's taking suspiciously long (~20 minutes) to produce the
~400 elements, and it's doing that sequentially. Is it possible to
parallelize step 1?
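
For reference, here is a minimal plain-Python model of the key, group, ungroup
reshaping proposed in the quoted messages below, placed before the fan-out step
as Sourabh suggests. This is not Beam code: `fan_out`, `break_fusion`, the file
names, the row counts and the key count are all hypothetical. It only
illustrates that the reshaping leaves the data unchanged while producing
independent groups that a runner could hand to different workers.

```python
import random
from collections import defaultdict
from itertools import chain


def fan_out(file_record):
    """Hypothetical stand-in for the high fan-out step: one file -> many rows."""
    name, line_count = file_record
    return [f"{name}:{i}" for i in range(line_count)]


def break_fusion(elements, num_keys, seed=0):
    """Model of key -> group -> ungroup. Content is unchanged, only grouping differs."""
    rng = random.Random(seed)
    # 1. Pair every element with a random key.
    keyed = [(rng.randrange(num_keys), element) for element in elements]
    # 2. Group by key (in a real pipeline, the grouping step is the boundary
    #    that separates the upstream and downstream steps).
    groups = defaultdict(list)
    for key, element in keyed:
        groups[key].append(element)
    # 3. Ungroup: drop the keys and flatten back into a single collection.
    ungrouped = list(chain.from_iterable(groups.values()))
    return groups, ungrouped


# Toy sizes (assumption): 200 file records, 100 rows each.
files = [(f"file{i}", 100) for i in range(200)]

# Redistribute the 200 small records first, then fan out.
groups, redistributed = break_fusion(files, num_keys=64)
rows = list(chain.from_iterable(fan_out(f) for f in redistributed))
```

In an actual Beam pipeline the grouping would be a GroupByKey on a random key
followed by a step that drops the key. The model above shows only the data
shape, not how a runner schedules the work.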

On Sat, Jun 10, 2017 at 5:53 PM Lukasz Cwik <lc...@google.com> wrote:

> The Dataflow implementation when executing a batch pipeline does not
> parallelize dependent fused segments irrespective of the windowing function
> so #1 will fully execute before #2 starts.
>
> On Sat, Jun 10, 2017 at 3:48 PM, Morand, Sebastien <
> sebastien.mor...@veolia.com> wrote:
>
>> Hi again,
>>
>> So it scales. Now my pipeline is running in two parts:
>>
>>    1. Reading file contents (~400), then GroupByKey
>>
>>    2. After the GroupByKey, transform and write to BigQuery (~50M)
>>
>> Part 2 scales as expected. Part 1 takes about 25 minutes on my files and
>> part 2 about 35 minutes with scaling. But what if I want to window the data
>> so that the second part starts sooner and the process is more parallel?
>>
>> I tried to add a 60-second FixedWindow, but it's not working (Job ID:
>> 2017-06-06_04_36_01-9894155361321571250)
>>
>> Regards,
>>
>>
>> *Sébastien MORAND*
>> Team Lead Solution Architect
>> Technology & Operations / Digital Factory
>> Veolia - Group Information Systems & Technology (IS&T)
>> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
>> <+33%201%2085%2057%2071%2008>
>> Bureau 0144C (Ouest)
>> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
>> *www.veolia.com <http://www.veolia.com>*
>> <http://www.veolia.com>
>> <https://www.facebook.com/veoliaenvironment/>
>> <https://www.youtube.com/user/veoliaenvironnement>
>> <https://www.linkedin.com/company/veolia-environnement>
>> <https://twitter.com/veolia>
>>
>> On 6 June 2017 at 01:24, Morand, Sebastien <sebastien.mor...@veolia.com>
>> wrote:
>>
>>> Fine, it scales ... Thank you very much.
>>>
>>> On 6 June 2017 at 00:31, Morand, Sebastien <sebastien.mor...@veolia.com>
>>> wrote:
>>>
>>>> Thank you Eugene,
>>>>
>>>> I'm trying Sourabh's approach (and yours, since it looks like the same
>>>> idea) and will let you know if it's better.
>>>>
>>>> Regards,
>>>>
>>>> On 5 June 2017 at 23:31, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>
>>>>> I looked at the job ID you quoted, and yes, it suffers from excessive
>>>>> fusion. I wish we had tooling to automatically detect that and emit a
>>>>> warning, but we don't have that yet.
>>>>>
>>>>> Here's an example of how you can break fusion:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L326-L339
>>>>>
>>>>> On Mon, Jun 5, 2017 at 1:27 PM Sourabh Bajaj <sourabhba...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Yes you're correct.
>>>>>>
>>>>>> On Mon, Jun 5, 2017 at 1:23 PM Morand, Sebastien <
>>>>>> sebastien.mor...@veolia.com> wrote:
>>>>>>
>>>>>>> The number in parentheses after each step is the number of output
>>>>>>> records.
>>>>>>>
>>>>>>> When I ungroup, do I emit the 200 records again, and not the 20M?
>>>>>>>
>>>>>>> Shouldn't I do instead:
>>>>>>> Read (200)  -> GroupByKey (200) -> UnGroup(20M) -> combine (20M) ->
>>>>>>> clean (20M) -> filter (20M) -> insert
>>>>>>>
>>>>>>> On 5 June 2017 at 22:17, Sourabh Bajaj <sourabhba...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think you need to do something like:
>>>>>>>>
>>>>>>>> Read (200)  -> GroupByKey (200) -> UnGroup(200) [Note this can be on
>>>>>>>> 200 different workers] -> combine (20M) -> clean (20M) -> filter
>>>>>>>> (20M) -> insert
>>>>>>>>
>>>>>>>> On Mon, Jun 5, 2017 at 1:14 PM Morand, Sebastien <
>>>>>>>> sebastien.mor...@veolia.com> wrote:
>>>>>>>>
>>>>>>>>> Yes fusion looks like my problem. A job ID to look at:
>>>>>>>>> 2017-06-05_10_14_25-5856213199384263626.
>>>>>>>>>
>>>>>>>>> The point is in your link:
>>>>>>>>> <<
>>>>>>>>> For example, one case in which fusion can limit Dataflow's ability
>>>>>>>>> to optimize worker usage is a "high fan-out" ParDo. In such an
>>>>>>>>> operation, you might have an input collection with relatively few
>>>>>>>>> elements, but the ParDo produces an output with hundreds or thousands
>>>>>>>>> of times as many elements, followed by another ParDo
>>>>>>>>> >>
>>>>>>>>>
>>>>>>>>> This is exactly what I'm doing in the step
>>>>>>>>> transform-combine-7d5ad942 in the above job id.
>>>>>>>>>
>>>>>>>>> As far as I understand, I should add a GroupByKey on a unique field
>>>>>>>>> after transform-combine-7d5ad942 and then ungroup the data (meaning I
>>>>>>>>> add two operations to the pipeline to help the workers)?
>>>>>>>>>
>>>>>>>>> Right now:
>>>>>>>>> Read (200) -> combine (20M) -> clean (20M) -> filter (20M) ->
>>>>>>>>> insert
>>>>>>>>>
>>>>>>>>> Will become:
>>>>>>>>> Read (200) -> combine (20M) -> GroupByKey (20M) -> ungroup (20M)
>>>>>>>>> -> clean (20M) -> filter (20M) -> insert
>>>>>>>>>
>>>>>>>>> Is this the right way?
>>>>>>>>>
>>>>>>>>> On 5 June 2017 at 21:42, Eugene Kirpichov <kirpic...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Do you have a Dataflow job ID to look at?
>>>>>>>>>> It might be due to fusion
>>>>>>>>>> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>>>>>>>>>>
>>>>>>>>>> On Mon, Jun 5, 2017 at 12:13 PM Prabeesh K. <prabsma...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Please try using *--worker_machine_type* n1-standard-4 or
>>>>>>>>>>> n1-standard-8
>>>>>>>>>>>
>>>>>>>>>>> On 5 June 2017 at 23:08, Morand, Sebastien <
>>>>>>>>>>> sebastien.mor...@veolia.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I have a problem testing scaling on Dataflow.
>>>>>>>>>>>>
>>>>>>>>>>>> My pipeline is pretty simple: I get a list of files from
>>>>>>>>>>>> Pub/Sub, so the number of files that will feed the pipeline is
>>>>>>>>>>>> known at the beginning. Let's say I have 200 files containing
>>>>>>>>>>>> about 20,000,000 records. Here are my steps:
>>>>>>>>>>>>
>>>>>>>>>>>>    - *First step:* Read file contents from storage. Each file is
>>>>>>>>>>>>    a .tar.gz containing 4 CSV files. I return the whole file
>>>>>>>>>>>>    content as one record.
>>>>>>>>>>>>    *OUT:* 200 records (one per file, containing the data of all
>>>>>>>>>>>>    4 files). Basically it's a dict: {file1: content_of_file1,
>>>>>>>>>>>>    file2: content_of_file2, etc...}
>>>>>>>>>>>>
>>>>>>>>>>>>    - *Second step:* Join the data of the 4 files into one record
>>>>>>>>>>>>    (the main file contains foreign keys to look up information in
>>>>>>>>>>>>    the other files).
>>>>>>>>>>>>    *OUT:* 20,000,000 records, one for every line in the files.
>>>>>>>>>>>>    Each record is a list of strings.
>>>>>>>>>>>>
>>>>>>>>>>>>    - *Third step:* Clean the data (convert it to prepare for
>>>>>>>>>>>>    loading into BigQuery) and turn each record into a dict whose
>>>>>>>>>>>>    keys are BigQuery column names.
>>>>>>>>>>>>    *OUT:* 20,000,000 records, each a dict
>>>>>>>>>>>>
>>>>>>>>>>>>    - *Fourth step:* Insert into BigQuery
>>>>>>>>>>>>
>>>>>>>>>>>> So the first step returns 200 records, but I have 20,000,000
>>>>>>>>>>>> records to insert.
>>>>>>>>>>>> This takes about an hour and a half and always uses 1 worker ...
>>>>>>>>>>>>
>>>>>>>>>>>> If I manually set the number of workers, it's not really
>>>>>>>>>>>> faster. So for an unknown reason it doesn't scale. Any ideas how
>>>>>>>>>>>> to make it scale?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for any help.
>>>>>>>>>>>>
>>>>>>>>>>>> --------------------------------------------------------------------------------------------
>>>>>>>>>>>> This e-mail transmission (message and any attached files) may
>>>>>>>>>>>> contain information that is proprietary, privileged and/or
>>>>>>>>>>>> confidential to Veolia Environnement and/or its affiliates and is
>>>>>>>>>>>> intended exclusively for the person(s) to whom it is addressed. If
>>>>>>>>>>>> you are not the intended recipient, please notify the sender by
>>>>>>>>>>>> return e-mail and delete all copies of this e-mail, including all
>>>>>>>>>>>> attachments. Unless expressly authorized, any use, disclosure,
>>>>>>>>>>>> publication, retransmission or dissemination of this e-mail and/or
>>>>>>>>>>>> of its attachments is strictly prohibited.
>>>>>>>>>>>>
>>>>>>>>>>>> Ce message electronique et ses fichiers attaches sont strictement
>>>>>>>>>>>> confidentiels et peuvent contenir des elements dont Veolia
>>>>>>>>>>>> Environnement et/ou l'une de ses entites affiliees sont
>>>>>>>>>>>> proprietaires. Ils sont donc destines a l'usage de leurs seuls
>>>>>>>>>>>> destinataires. Si vous avez recu ce message par erreur, merci de le
>>>>>>>>>>>> retourner a son emetteur et de le detruire ainsi que toutes les
>>>>>>>>>>>> pieces attachees. L'utilisation, la divulgation, la publication, la
>>>>>>>>>>>> distribution, ou la reproduction non expressement autorisees de ce
>>>>>>>>>>>> message et de ses pieces attachees sont interdites.
>>>>>>>>>>>>
>>>>>>>>>>>> --------------------------------------------------------------------------------------------
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>
>
>
