The Dataflow implementation, when executing a batch pipeline, does not parallelize dependent fused segments irrespective of the windowing function, so #1 will fully execute before #2 starts.
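As background for the fix discussed downthread: fusion is broken by pairing each element with an arbitrary key, grouping, and flattening the groups again, because the GroupByKey forces a materialization barrier between the fused stages. A minimal pure-Python sketch of those mechanics (not Beam code; the function name and shard count are illustrative — in a Beam pipeline this is the AddRandomKey / GroupByKey / FlatMap pattern):

```python
import random
from collections import defaultdict

def break_fusion(records, num_shards=100):
    """Mimic the fusion-breaking pattern: pair each element with a
    random shard key, group by that key (a materialization barrier on
    Dataflow), then flatten the groups back into plain elements."""
    keyed = [(random.randrange(num_shards), rec) for rec in records]  # AddRandomKey
    groups = defaultdict(list)
    for key, rec in keyed:                                            # GroupByKey
        groups[key].append(rec)
    return [rec for shard in groups.values() for rec in shard]        # Ungroup

# → the output is a permutation of the input; on Dataflow each of the
#   num_shards groups can be processed by a different worker.
```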
On Sat, Jun 10, 2017 at 3:48 PM, Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
> Hi again,
>
> So it scales. Now my pipeline runs in two parts:
>
> 1. Reading file contents (~400) and then GroupByKey
> 2. From GroupByKey, transform and write to BigQuery (~50M)
>
> 2 scales as expected. 1 takes about 25 minutes on my files and 2 about 35 minutes, scaling. But what if I want to window so that the second part starts sooner and the process is more parallel?
>
> I tried adding a 60-second FixedWindow, but it's not working (job ID: 2017-06-06_04_36_01-9894155361321571250).
>
> Regards,
>
> *Sébastien MORAND*
> Team Lead Solution Architect
> Technology & Operations / Digital Factory
> Veolia - Group Information Systems & Technology (IS&T)
> Cell.: +33 7 52 66 20 81 / Direct: +33 1 85 57 71 08
> Bureau 0144C (Ouest)
> 30, rue Madeleine-Vionnet - 93300 Aubervilliers, France
> www.veolia.com
>
> On 6 June 2017 at 01:24, Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>> Fine, it scales ... Thank you very much.
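[Editor's note: the reason windowing doesn't help here is visible in the semantics of a batch GroupByKey itself — nothing can be emitted until every input pair has been seen, so the downstream stage cannot start early. A minimal pure-Python sketch of batch GroupByKey semantics (names are illustrative, not Beam code):]

```python
from collections import defaultdict

def group_by_key(pairs):
    """Batch GroupByKey semantics: every (key, value) pair must be
    consumed before any group can be emitted, so a stage downstream of
    the grouping cannot start until the upstream stage fully finishes."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

# e.g. group_by_key([("a", 1), ("b", 2), ("a", 3)]) → {"a": [1, 3], "b": [2]}
```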
>> On 6 June 2017 at 00:31, Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>> Thank you Eugene,
>>>
>>> I'm trying the Sourabh way (and yours, since it looks like the same idea) and will let you know if it's better.
>>>
>>> Regards,
>>>
>>> On 5 June 2017 at 23:31, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> I looked at the job ID you quoted, and yes, it suffers from excessive fusion. I wish we had tooling to automatically detect that and emit a warning, but we don't have that yet.
>>>> Here's an example of how you can break fusion:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L326-L339
>>>>
>>>> On Mon, Jun 5, 2017 at 1:27 PM Sourabh Bajaj <sourabhba...@google.com> wrote:
>>>>> Yes, you're correct.
>>>>>
>>>>> On Mon, Jun 5, 2017 at 1:23 PM Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>>>>> By the number in parentheses after each step I meant the number of records in its output.
>>>>>>
>>>>>> When I ungroup, I send the 200 records again, not the 20M?
>>>>>>
>>>>>> Shouldn't I do instead:
>>>>>> Read (200) -> GroupByKey (200) -> UnGroup (20M) -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>
>>>>>> On 5 June 2017 at 22:17, Sourabh Bajaj <sourabhba...@google.com> wrote:
>>>>>>> I think you need to do something like:
>>>>>>>
>>>>>>> Read (200) -> GroupByKey (200) -> UnGroup (200) [Note this can be on 200 different workers] -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>
>>>>>>> On Mon, Jun 5, 2017 at 1:14 PM Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>>>>>>> Yes, fusion looks like my problem. A job ID to look at: 2017-06-05_10_14_25-5856213199384263626.
>>>>>>>> The point is in your link:
>>>>>>>> << For example, one case in which fusion can limit Dataflow's ability to optimize worker usage is a "high fan-out" ParDo. In such an operation, you might have an input collection with relatively few elements, but the ParDo produces an output with hundreds or thousands of times as many elements, followed by another ParDo. >>
>>>>>>>>
>>>>>>>> This is exactly what I'm doing in the step transform-combine-7d5ad942 in the above job ID.
>>>>>>>>
>>>>>>>> As far as I understand, I should create a GroupByKey after transform-combine-7d5ad942 on a unique field and then ungroup the data? (Meaning I add two operations to the pipeline to help the workers?)
>>>>>>>>
>>>>>>>> Right now:
>>>>>>>> Read (200) -> combine (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>>
>>>>>>>> Will become:
>>>>>>>> Read (200) -> combine (20M) -> GroupByKey (20M) -> ungroup (20M) -> clean (20M) -> filter (20M) -> insert
>>>>>>>>
>>>>>>>> Is this the right way?
>>>>>>>>
>>>>>>>> On 5 June 2017 at 21:42, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>> Do you have a Dataflow job ID to look at?
>>>>>>>>> It might be due to fusion:
>>>>>>>>> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>>>>>>>>>
>>>>>>>>> On Mon, Jun 5, 2017 at 12:13 PM Prabeesh K. <prabsma...@gmail.com> wrote:
>>>>>>>>>> Please try using *--worker_machine_type* n1-standard-4 or n1-standard-8.
>>>>>>>>>>
>>>>>>>>>> On 5 June 2017 at 23:08, Morand, Sebastien <sebastien.mor...@veolia.com> wrote:
>>>>>>>>>>> I have a problem with my attempts to test scaling on Dataflow.
>>>>>>>>>>>
>>>>>>>>>>> My pipeline is pretty simple: I get a list of files from Pub/Sub, so the number of files feeding the flow is well known at the beginning. Let's say I have 200 files containing about 20,000,000 records. Here are my steps:
>>>>>>>>>>>
>>>>>>>>>>> - *First step:* Read file contents from storage. The files are .tar.gz archives, each containing 4 CSV files. I return the whole file content as one record.
>>>>>>>>>>> *OUT:* 200 records (one per archive, holding the data of all 4 files). Basically each is a dict: {file1: content_of_file1, file2: content_of_file2, ...}
>>>>>>>>>>>
>>>>>>>>>>> - *Second step:* Join the data of the 4 files into one record per line (the main file contains foreign keys to get information from the other files).
>>>>>>>>>>> *OUT:* 20,000,000 records, one for every line in the files. Each record is a list of strings.
>>>>>>>>>>>
>>>>>>>>>>> - *Third step:* Clean the data (convert it to prepare for loading into BigQuery) and turn each record into a dict whose keys are BigQuery column names.
>>>>>>>>>>> *OUT:* 20,000,000 records, one dict per record.
>>>>>>>>>>>
>>>>>>>>>>> - *Fourth step:* Insert into BigQuery.
>>>>>>>>>>>
>>>>>>>>>>> So the first step returns 200 records, but I have 20,000,000 records to insert. This takes about an hour and a half and always uses 1 worker.
>>>>>>>>>>>
>>>>>>>>>>> If I manually set the number of workers, it's not really faster. So for some unknown reason it doesn't scale. Any ideas how to fix it?
>>>>>>>>>>>
>>>>>>>>>>> Thanks for any help.
>>>>>>>>>>>
>>>>>>>>>>> --------------------------------------------------------------------------------
>>>>>>>>>>> This e-mail transmission (message and any attached files) may contain information that is proprietary, privileged and/or confidential to Veolia Environnement and/or its affiliates and is intended exclusively for the person(s) to whom it is addressed. If you are not the intended recipient, please notify the sender by return e-mail and delete all copies of this e-mail, including all attachments.
>>>>>>>>>>> Unless expressly authorized, any use, disclosure, publication, retransmission or dissemination of this e-mail and/or of its attachments is strictly prohibited.
>>>>>>>>>>> --------------------------------------------------------------------------------
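[Editor's note: the first step described in the original message — unpacking each .tar.gz archive into a {filename: content} dict of CSV rows — might be sketched as below. The helper name, member layout, and CSV format are assumptions for illustration, not the poster's actual code.]

```python
import csv
import io
import tarfile

def read_bundle(path):
    """Read a .tar.gz archive of CSV members into {member_name: rows},
    mirroring the {file1: content_of_file1, ...} dict described above."""
    contents = {}
    with tarfile.open(path, "r:gz") as tar:
        for member in tar.getmembers():
            if not member.isfile():
                continue
            # Wrap the binary member stream so csv can read it as text.
            text = io.TextIOWrapper(tar.extractfile(member), encoding="utf-8")
            contents[member.name] = [row for row in csv.reader(text)]
    return contents
```

Emitting one such dict per archive (200 records) and only fanning out to the 20M rows in a later step is exactly the high fan-out shape that triggers the fusion problem discussed above.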