Re: Scaling dataflow python SDK 2.0.0

2017-06-11 Thread Eugene Kirpichov
Oh sorry, I think I misunderstood you. Looks like the tar.gz file contains the actual data, not just filenames. Then my suggestion doesn't apply.

Re: Scaling dataflow python SDK 2.0.0

2017-06-11 Thread Eugene Kirpichov
Seems like there's a lot of parallelism you could exploit here and make the pipeline much, much faster. I suggest you: 1) in the main program, even before starting the pipeline, read the tar.gz file and extract the 4 filenames. 2) create the pipeline containing 1 TextIO.Read transform per filename
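A minimal Python SDK sketch of that suggestion, assuming the member filenames can be extracted (for example to GCS) before the pipeline is constructed; the paths below are hypothetical, and beam.io.ReadFromText is the Python counterpart of the Java TextIO.Read mentioned above:

    import apache_beam as beam

    # Hypothetical paths: the four files pulled out of the archive ahead of time.
    filenames = ['gs://my-bucket/staging/file1.csv',
                 'gs://my-bucket/staging/file2.csv',
                 'gs://my-bucket/staging/file3.csv',
                 'gs://my-bucket/staging/file4.csv']

    with beam.Pipeline() as p:
        # One read transform per file, so the reads can run in parallel,
        # then flatten everything into a single PCollection of lines.
        per_file = [p | 'Read %d' % i >> beam.io.ReadFromText(f)
                    for i, f in enumerate(filenames)]
        lines = tuple(per_file) | beam.Flatten()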

Re: Scaling dataflow python SDK 2.0.0

2017-06-11 Thread Morand, Sebastien
Hi, It gets a list of filenames while building the pipeline (*.tar.gz archives, each containing 4 files). The first step of the pipeline reads every line of each file and returns a PCollection where each value is a dict containing the content of each file in the archive. Example: - File file1.tar.gz con
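A rough sketch of one way such a first step could be written in the Python SDK. The helper class, the use of FileSystems.open, and the output shape (one dict per archive, mapping member name to content) are assumptions for illustration, not the actual code from this thread:

    import tarfile
    import apache_beam as beam
    from apache_beam.io.filesystem import CompressionTypes
    from apache_beam.io.filesystems import FileSystems

    class ReadTarGz(beam.DoFn):
        # Hypothetical DoFn: takes a *.tar.gz path and yields one dict per archive,
        # mapping each member filename to that member's text content.
        def process(self, path):
            contents = {}
            # Open the raw bytes ourselves so tarfile can handle the gzip layer.
            with FileSystems.open(path, compression_type=CompressionTypes.UNCOMPRESSED) as raw:
                with tarfile.open(fileobj=raw, mode='r|gz') as archive:
                    for member in archive:
                        extracted = archive.extractfile(member)
                        if extracted is None:  # skip directories and special entries
                            continue
                        contents[member.name] = extracted.read().decode('utf-8')
            yield contents

It would be applied to a PCollection of archive paths, e.g. paths | beam.ParDo(ReadTarGz()).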

Re: Scaling dataflow python SDK 2.0.0

2017-06-10 Thread Eugene Kirpichov
Hi Sebastien, Can you tell more about how your "step 1" works? I looked at the logs of your job and it's taking suspiciously long (~20 minutes) to produce the ~400 elements, and it's doing that sequentially. Is it possible to parallelize step 1?

Re: Scaling dataflow python SDK 2.0.0

2017-06-10 Thread Lukasz Cwik
The Dataflow implementation, when executing a batch pipeline, does not parallelize dependent fused segments irrespective of the windowing function, so #1 will fully execute before #2 starts.

Re: Scaling dataflow python SDK 2.0.0

2017-06-10 Thread Morand, Sebastien
Hi again, So it scales. Now my pipeline is running in two parts: 1. Reading the file contents (~400) and then GroupByKey 2. From the GroupByKey, transform and write to BigQuery (~50M) Part 2 is scaling as expected. Part 1 takes about 25 minutes on my files and part 2 about 35 minutes while scaling. But what if I want
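For the "write to BigQuery" part, a minimal sketch using beam.io.WriteToBigQuery as exposed by recent Python SDKs (the table name, schema and stand-in input are placeholders, and the exact BigQuery sink API shipped with SDK 2.0.0 may differ):

    import apache_beam as beam

    with beam.Pipeline() as p:
        records = p | beam.Create([{'source': 'file1', 'value': 1.0}])  # stand-in for the real upstream
        _ = records | beam.io.WriteToBigQuery(
            table='my-project:my_dataset.my_table',  # hypothetical table
            schema='source:STRING,value:FLOAT',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)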

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Morand, Sebastien
Fine, it scales ... Thank you very much.

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Morand, Sebastien
Thank you Eugene, I'm trying Sourabh's way (and yours, since it looks like it's the same idea) and will let you know if it's better. Regards,

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Eugene Kirpichov
I looked at the job ID you quoted, and yes, it suffers from excessive fusion. I wish we had tooling to automatically detect that and emit a warning, but we don't have that yet. Here's an example of how you can break fusion: https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java
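In the Python SDK, the general fusion-breaking idea is the same: pair each element with a key, run a GroupByKey so the runner materializes and redistributes the data, then drop the key. A rough sketch of that pattern (the transform name and key range are arbitrary; newer SDKs also provide a built-in beam.Reshuffle that serves this purpose):

    import random
    import apache_beam as beam

    class BreakFusion(beam.PTransform):
        # Sketch of a fusion-breaking transform: tag each element with a random key,
        # group by that key, then emit the values again without the key.
        def expand(self, pcoll):
            return (pcoll
                    | 'AddRandomKey' >> beam.Map(lambda x: (random.randint(0, 999), x))
                    | 'GroupByRandomKey' >> beam.GroupByKey()
                    | 'DropKey' >> beam.FlatMap(lambda kv: kv[1]))

It would sit between the small, high fan-out step and the heavy downstream steps, e.g. files | beam.ParDo(ExpandFileFn()) | BreakFusion() | ... (ExpandFileFn is a placeholder).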

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Sourabh Bajaj
Yes, you're correct.

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Morand, Sebastien
By the number in parentheses after each step I meant the number of records in the output. When I ungroup, am I sending the 200 records again, not the 20M? Shouldn't I do instead: Read (200) -> GroupByKey (200) -> UnGroup (20M) -> combine (20M) -> clean (20M) -> filter (20M) -> insert

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Sourabh Bajaj
I think you need to do something like: Read (200) -> GroupByKey (200) -> UnGroup (200) [Note: this can be on 200 different workers] -> combine (20M) -> clean (20M) -> filter (20M) -> insert
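A skeleton of that shape in the Python SDK, with placeholders for the steps named above (the file list, read_records, and the per-record functions are all hypothetical):

    import apache_beam as beam

    def read_records(path):
        # Placeholder: expand one file path into its ~100k records.
        return []

    with beam.Pipeline() as p:
        records = (p
             | 'Read' >> beam.Create(['gs://my-bucket/file-%03d' % i for i in range(200)])  # ~200 paths
             | 'KeyByFile' >> beam.Map(lambda path: (path, None))
             | 'GroupByKey' >> beam.GroupByKey()      # 200 keys, so the next step can spread over workers
             | 'UnGroup' >> beam.FlatMap(lambda kv: read_records(kv[0]))  # fan-out to ~20M records
             | 'Combine' >> beam.Map(lambda rec: rec)   # placeholder for the combine step
             | 'Clean' >> beam.Map(lambda rec: rec)     # placeholder for the clean step
             | 'Filter' >> beam.Filter(lambda rec: rec is not None))
        # The 'insert' step (the BigQuery write) would follow the Filter.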

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Morand, Sebastien
Yes, fusion looks like my problem. A job ID to look at: 2017-06-05_10_14_25-5856213199384263626. The point is in your link: "For example, one case in which fusion can limit Dataflow's ability to optimize worker usage is a 'high fan-out' ParDo. In such an operation, you might have an input collect

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Morand, Sebastien
I tried this; it's going fast but it doesn't scale ... still one worker.

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Eugene Kirpichov
Do you have a Dataflow job ID to look at? It might be due to fusion: https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion

Re: Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Prabeesh K.
Please try using *--worker_machine_type* n1-standard-4 or n1-standard-8.
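For reference, one way to pass that option (along with the Dataflow runner settings) from Python; the project, bucket and machine type below are placeholders:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',              # hypothetical project
        '--temp_location=gs://my-bucket/tmp',
        '--worker_machine_type=n1-standard-8',
    ])

    with beam.Pipeline(options=options) as p:
        _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)

The same flags can equally be passed on the command line when launching the pipeline.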

Scaling dataflow python SDK 2.0.0

2017-06-05 Thread Morand, Sebastien
I do have a problem with my attempts to test scaling on Dataflow. My dataflow is pretty simple: I get a list of files from Pub/Sub, so the number of files I'm going to use to feed the flow is well known at the beginning. Here are my steps: let's say I have 200 files containing about 20,000,000 recor