When executing a batch pipeline, Dataflow does not run dependent fused
stages in parallel, regardless of the windowing function, so #1 will fully
execute before #2 starts.
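
To make that concrete, here is a toy model in plain Python. It is not Beam or Dataflow code, only a sketch of the scheduling described above; the names `run_batch`, `files`, and `window_size` are made up for illustration. Stage 1 writes keyed records to a shuffle, and stage 2 is scheduled only once stage 1 has finished. Adding a window changes how records are grouped, but not when stage 2 starts.

```python
from collections import defaultdict


def run_batch(files, window_size=None):
    """Toy model of two dependent batch stages separated by a GroupByKey.

    `files` maps a file name to a list of (timestamp, value) records.
    `window_size` (seconds) only changes the grouping key, not the schedule.
    Returns a log of (stage, item) entries in execution order.
    """
    log = []
    shuffle = defaultdict(list)

    # Stage 1: read every file and write its keyed records to the shuffle.
    for name, records in files.items():
        for timestamp, value in records:
            window = timestamp // window_size if window_size else None
            shuffle[(name, window)].append(value)
        log.append(("stage1", name))

    # Barrier: in this model stage 2 is scheduled only after stage 1 has
    # completely finished, whether or not a window was requested.
    for key in shuffle:
        log.append(("stage2", key))

    return log


if __name__ == "__main__":
    sample = {"a.tar.gz": [(0, "x"), (70, "y")], "b.tar.gz": [(10, "z")]}
    for entry in run_batch(sample, window_size=60):
        print(entry)
```

With or without `window_size`, every `stage1` entry in the log comes before the first `stage2` entry; windowing only increases the number of groups that stage 2 later processes.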

On Sat, Jun 10, 2017 at 3:48 PM, Morand, Sebastien <
sebastien.mor...@veolia.com> wrote:

> Hi again,
>
> So it scales. Now my pipeline is running in two parts:
>
>    1. Reading files content (~400) and then GroupByKey
>
>    2. From GroupByKey transform and write in bigquery (~50M)
>
> 2 is scaling as expected. 1 takes about 25 minutes on my files and 2 takes
> about 35 minutes while scaling. But what if I want to window the data so
> that the second part starts sooner and the process is more parallel?
>
> I tried adding a 60-second FixedWindows but it's not working (Job ID:
> 2017-06-06_04_36_01-9894155361321571250)
>
> Regards,
>
>
> *Sébastien MORAND*
> Team Lead Solution Architect
> Technology & Operations / Digital Factory
> Veolia - Group Information Systems & Technology (IS&T)
> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
> <+33%201%2085%2057%2071%2008>
> Bureau 0144C (Ouest)
> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
> *www.veolia.com <http://www.veolia.com>*
> <http://www.veolia.com>
> <https://www.facebook.com/veoliaenvironment/>
> <https://www.youtube.com/user/veoliaenvironnement>
> <https://www.linkedin.com/company/veolia-environnement>
> <https://twitter.com/veolia>
>
> On 6 June 2017 at 01:24, Morand, Sebastien <sebastien.mor...@veolia.com>
> wrote:
>
>> Fine, it scales ... Thank you very much.
>>
>> On 6 June 2017 at 00:31, Morand, Sebastien <sebastien.mor...@veolia.com>
>> wrote:
>>
>>> Thank you Eugene,
>>>
>>> I'm trying Sourabh's approach (and yours, since it looks like the same
>>> idea) and will let you know if it's better.
>>>
>>> Regards,
>>>
>>> On 5 June 2017 at 23:31, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> I looked at the job ID you quoted, and yes, it suffers from excessive
>>>> fusion. I wish we had tooling to automatically detect that and emit a
>>>> warning, but we don't have that yet.
>>>>
>>>> Here's an example of how you can break fusion:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L326-L339
>>>>
>>>> On Mon, Jun 5, 2017 at 1:27 PM Sourabh Bajaj <sourabhba...@google.com>
>>>> wrote:
>>>>
>>>>> Yes you're correct.
>>>>>
>>>>> On Mon, Jun 5, 2017 at 1:23 PM Morand, Sebastien <
>>>>> sebastien.mor...@veolia.com> wrote:
>>>>>
>>>>>> The number in parentheses after each step is the number of records
>>>>>> in its output.
>>>>>>
>>>>>> When I ungroup, do I emit the 200 records again, and not the 20M?
>>>>>>
>>>>>> Shouldn't I do instead:
>>>>>> Read (200)  -> GroupByKey (200) -> UnGroup(20M) -> combine (20M) ->
>>>>>> clean (20M) -> filter (20M) -> insert
>>>>>>
>>>>>>
>>>>>> On 5 June 2017 at 22:17, Sourabh Bajaj <sourabhba...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I think you need to do something like:
>>>>>>>
>>>>>>> Read (200)  -> GroupByKey (200) -> UnGroup(200) [Note this can be on
>>>>>>> 200 different workers] -> combine (20M) -> clean (20M) -> filter
>>>>>>> (20M) -> insert
>>>>>>>
>>>>>>> On Mon, Jun 5, 2017 at 1:14 PM Morand, Sebastien <
>>>>>>> sebastien.mor...@veolia.com> wrote:
>>>>>>>
>>>>>>>> Yes fusion looks like my problem. A job ID to look at:
>>>>>>>> 2017-06-05_10_14_25-5856213199384263626.
>>>>>>>>
>>>>>>>> The point is in your link:
>>>>>>>> <<
>>>>>>>> For example, one case in which fusion can limit Dataflow's ability
>>>>>>>> to optimize worker usage is a "high fan-out" ParDo. In such an
>>>>>>>> operation, you might have an input collection with relatively few
>>>>>>>> elements, but the ParDo produces an output with hundreds or thousands
>>>>>>>> of times as many elements, followed by another ParDo
>>>>>>>> >>
>>>>>>>>
>>>>>>>> This is exactly what I'm doing in the step
>>>>>>>> transform-combine-7d5ad942 in the above job id.
>>>>>>>>
>>>>>>>> As far as I understand, I should create a GroupByKey after the
>>>>>>>> transform-combine-7d5ad942 on a unique field and then ungroup the data
>>>>>>>> (meaning I add two operations in the pipeline to help the worker)?
>>>>>>>>
>>>>>>>> Right now:
>>>>>>>> Read (200) -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>>
>>>>>>>> Will become:
>>>>>>>> Read (200) -> combine (20M) -> GroupByKey (20M) -> ungroup (20M) ->
>>>>>>>> clean (20M) -> filter (20M) -> insert
>>>>>>>>
>>>>>>>> Is this the right way?
>>>>>>>>
>>>>>>>>
>>>>>>>> On 5 June 2017 at 21:42, Eugene Kirpichov <kirpic...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Do you have a Dataflow job ID to look at?
>>>>>>>>> It might be due to fusion:
>>>>>>>>> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>>>>>>>>>
>>>>>>>>> On Mon, Jun 5, 2017 at 12:13 PM Prabeesh K. <prabsma...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Please try using *--worker_machine_type* n1-standard-4 or
>>>>>>>>>> n1-standard-8
>>>>>>>>>>
>>>>>>>>>> On 5 June 2017 at 23:08, Morand, Sebastien <
>>>>>>>>>> sebastien.mor...@veolia.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I have a problem with my attempts to test scaling on Dataflow.
>>>>>>>>>>>
>>>>>>>>>>> My dataflow is pretty simple: I get a list of files from pubsub,
>>>>>>>>>>> so the number of files I'm going to use to feed the flow is well
>>>>>>>>>>> known at the beginning. Here are my steps.
>>>>>>>>>>> Let's say I have 200 files containing about 20,000,000 records
>>>>>>>>>>>
>>>>>>>>>>>    - *First step:* Read file contents from storage: files are
>>>>>>>>>>>    .tar.gz archives, each containing 4 files (CSV). I return the
>>>>>>>>>>>    whole file content in one record
>>>>>>>>>>>    *OUT:* 200 records (one for each file, containing the data of
>>>>>>>>>>>    all 4 files). Basically it's a dict: {file1: content_of_file1,
>>>>>>>>>>>    file2: content_of_file2, etc...}
>>>>>>>>>>>
>>>>>>>>>>>    - *Second step:* Joining the data of the 4 files into one
>>>>>>>>>>>    record (the main file contains foreign keys to get information
>>>>>>>>>>>    from the other files)
>>>>>>>>>>>    *OUT:* 20,000,000 records, one for every line in the files.
>>>>>>>>>>>    Each record is a list of strings
>>>>>>>>>>>
>>>>>>>>>>>    - *Third step:* cleaning data (converting it to prepare for
>>>>>>>>>>>    integration into BigQuery) and setting each record as a dict
>>>>>>>>>>>    whose keys are BigQuery column names.
>>>>>>>>>>>    *OUT:* 20,000,000 records, each as a dict
>>>>>>>>>>>
>>>>>>>>>>>    - *Fourth step:* insert into bigquery
>>>>>>>>>>>
>>>>>>>>>>> So the first step returns 200 records, but I have 20,000,000
>>>>>>>>>>> records to insert.
>>>>>>>>>>> This takes about an hour and a half and always uses 1 worker ...
>>>>>>>>>>>
>>>>>>>>>>> If I manually set the number of workers, it's not really faster.
>>>>>>>>>>> So for an unknown reason, it doesn't scale. Any ideas how to fix it?
>>>>>>>>>>>
>>>>>>>>>>> Thanks for any help.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>> --------------------------------
>>>>>>>>>>> This e-mail transmission (message and any attached files) may
>>>>>>>>>>> contain information that is proprietary, privileged and/or 
>>>>>>>>>>> confidential to
>>>>>>>>>>> Veolia Environnement and/or its affiliates and is intended 
>>>>>>>>>>> exclusively for
>>>>>>>>>>> the person(s) to whom it is addressed. If you are not the intended
>>>>>>>>>>> recipient, please notify the sender by return e-mail and delete all 
>>>>>>>>>>> copies
>>>>>>>>>>> of this e-mail, including all attachments. Unless expressly 
>>>>>>>>>>> authorized, any
>>>>>>>>>>> use, disclosure, publication, retransmission or dissemination of 
>>>>>>>>>>> this
>>>>>>>>>>> e-mail and/or of its attachments is strictly prohibited.
>>>>>>>>>>>
>>>>>>>>>>> Ce message electronique et ses fichiers attaches sont
>>>>>>>>>>> strictement confidentiels et peuvent contenir des elements dont 
>>>>>>>>>>> Veolia
>>>>>>>>>>> Environnement et/ou l'une de ses entites affiliees sont 
>>>>>>>>>>> proprietaires. Ils
>>>>>>>>>>> sont donc destines a l'usage de leurs seuls destinataires. Si vous 
>>>>>>>>>>> avez
>>>>>>>>>>> recu ce message par erreur, merci de le retourner a son emetteur et 
>>>>>>>>>>> de le
>>>>>>>>>>> detruire ainsi que toutes les pieces attachees. L'utilisation, la
>>>>>>>>>>> divulgation, la publication, la distribution, ou la reproduction non
>>>>>>>>>>> expressement autorisees de ce message et de ses pieces attachees 
>>>>>>>>>>> sont
>>>>>>>>>>> interdites.
>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>> --------------------------------
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>>
>
>
>
