Hi Andreas,

Julian already offered a good explanation, so here is one possible solution: you could try to run the whole first subpipeline (Map + FilterA and SinkA) with parallelism X and the whole second one with P-X. Since each subpipeline then uses one parallelism end to end, the connection into the sink is a pipelined forward rather than a blocking rebalance. However, most likely you will need a parallelism higher than X for the processing to finish in time.
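To illustrate, here is a minimal sketch of that layout in the DataSet API; the parallelism values, predicates, and output paths are placeholders, not taken from your actual job:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class SplitSubpipelines {
    public static void main(String[] args) throws Exception {
        final int X = 4;   // placeholder: parallelism of the first subpipeline
        final int P = 16;  // placeholder: your current Map + Filter parallelism

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Stand-in for your real DataSet built from the two sources.
        DataSet<String> records = env.fromElements("a1", "b1", "a2");

        // First subpipeline: the filter (standing in for Map + FilterA) and
        // SinkA share parallelism X, so records are forwarded straight into
        // the sink instead of going through a blocking rebalance.
        records.filter(r -> r.startsWith("a")).setParallelism(X)
               .writeAsText("/tmp/sinkA").setParallelism(X);

        // Second subpipeline: Map + FilterB and SinkB share parallelism P - X.
        records.filter(r -> !r.startsWith("a")).setParallelism(P - X)
               .writeAsText("/tmp/sinkB").setParallelism(P - X);

        env.execute("subpipeline-parallelism sketch");
    }
}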
Another way is to use the DataStream API (your program is not doing any aggregation/join, so streaming is indeed a good fit) with the STREAMING execution mode (the only execution mode for DataStream in Flink < 1.12). There, all tasks are active all the time and records are streamed through as you expect. Since we plan to phase out the DataSet API eventually, it's also the more future-proof solution (see the sketch below the quoted thread).

Best,

Arvid

On Tue, Feb 16, 2021 at 11:37 PM Jaffe, Julian <julianja...@activision.com> wrote:

> Hey Andreas,
>
> Have a read through
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html#task-scheduling-and-network-shuffle
> and in particular the BATCH Execution Mode section. Your intuition is
> mostly correct – because your operators can’t be chained due to the
> rebalancing, if you execute your pipeline in batch mode, downstream tasks
> will not begin processing data until the upstream tasks have finished all
> of their processing. If you can forgo the higher resiliency and lower
> resource requirements of executing in batch mode, you could try running
> your pipeline in streaming mode over bounded data.
>
> Julian
>
> *From: *"Hailu, Andreas [Engineering]" <andreas.ha...@gs.com>
> *Date: *Tuesday, February 16, 2021 at 2:00 PM
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Understanding blocking behavior
>
> Hi folks, I’m trying to get a better understanding of what operations
> result in blocked partitions. I’ve got a batch-processing job that reads
> from two sources and then performs a series of Maps/Filters/CoGroups, all
> with the same parallelism, to create a final DataSet to be written to two
> different Sinks.
>
> The kind of Sink a record in the DataSet is written to depends on the
> record’s properties, so we use a Map + Filter operation to pull just the
> desired records for each Sink. The latter portion of the graph looks like
> this:
>
> DataSet -> Map + FilterA (with parallelism P) -> SinkA (with parallelism X)
> DataSet -> Map + FilterB (with parallelism P) -> SinkB (with parallelism P-X)
>
> Parallelisms for the output into SinkA and SinkB are different from the
> parallelism used in the Map + Filter operation in order to control the
> resulting total number of output files. What I observe is that all of the
> records must first be sent to the Map + Filter operators, and only once
> all records are received does the Sink begin to output records. This
> shows in the Flink Dashboard as the Sinks remaining in the ‘CREATED’ state
> while the Map + Filter operators are ‘RUNNING’. At scale, where the DataSet
> may contain billions of records, this ends up taking hours. Ideally, the
> records would be streamed through to the Sink as they go through the
> Map + Filter.
>
> Is this blocking behavior due to the fact that the Map + Filter operators
> must re-distribute the records as they’re moving to an operator that has a
> lesser parallelism?
>
> *Andreas Hailu*
> *Data Lake Engineering* | Goldman Sachs
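To make the DataStream suggestion above concrete, here is a minimal sketch assuming Flink 1.12; the input path, predicates, and parallelism values are placeholders:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingOverBoundedInput {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // STREAMING is the default mode; setting it explicitly for clarity.
        // All tasks are deployed up front and shuffles are pipelined, so the
        // sinks start writing as soon as the first records arrive.
        env.setRuntimeExecutionMode(RuntimeExecutionMode.STREAMING);

        // Bounded stand-in for your real sources; path and type are placeholders.
        DataStream<String> records = env.readTextFile("/path/to/input");

        records.filter(r -> r.startsWith("a"))
               .writeAsText("/tmp/sinkA").setParallelism(4);   // hypothetical X

        records.filter(r -> !r.startsWith("a"))
               .writeAsText("/tmp/sinkB").setParallelism(12);  // hypothetical P - X

        env.execute("streaming over bounded data");
    }
}

With this setup the sinks leave the CREATED state and begin outputting immediately instead of waiting for the Map + Filter stage to finish, even though the input is bounded.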