Hey Andreas,

Have a read through https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html#task-scheduling-and-network-shuffle, in particular the BATCH Execution Mode section.

Your intuition is mostly correct: because the rebalancing prevents your operators from being chained, when you execute your pipeline in batch mode the downstream tasks will not begin processing data until the upstream tasks have finished all of their processing. If you can forgo the higher resiliency and lower resource requirements of executing in batch mode, you could try running your pipeline in streaming mode over the bounded data.
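For example, a minimal, untested sketch of forcing streaming execution over bounded input, assuming the job is ported to the DataStream API (the source and class names here are placeholders):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StreamingOverBoundedInput {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Force pipelined execution even though the input is bounded, so
            // downstream tasks start consuming records as they are produced.
            env.setRuntimeExecutionMode(RuntimeExecutionMode.STREAMING);

            env.fromElements("a", "b", "c") // bounded placeholder source
               .map(String::toUpperCase)
               .print();

            env.execute("streaming over bounded input");
        }
    }

The same can also be set without a code change via the execution.runtime-mode configuration option when submitting the job.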
Julian From: "Hailu, Andreas [Engineering]" <andreas.ha...@gs.com> Date: Tuesday, February 16, 2021 at 2:00 PM To: "user@flink.apache.org" <user@flink.apache.org> Subject: Understanding blocking behavior Hi folks, I’m trying to get a better understanding of what operations result in blocked partitions. I’ve got a batch-processing job that reads from 2 sources, and then performs a series of Maps/Filters/CoGroups all with the same parallelism to create a final DataSet to be written to two different Sinks. The kind of Sink a record in the DataSet is written to is dependent on the record’s properties, so we use a Map + Filter operation to just pull the desired records for the Sink. The latter portion of the graph looks like this: DataSet -> Map + FilterA (with parallelism P) -> SinkA (with parallelism X) DataSet -> Map + FilterB (with parallelism P) -> SinkB (with parallelism P-X) Parallelisms for the output into SinkA and SinkB are different than the parallelism used in the Map + Filter operation in order to control the resulting total number of output files. What I observe is that all of the records must first be sent to the Map + Filter operators, and only once after all records are received, the Sink begins to output records. This shows in the Flink Dashboard as the Sinks remaining in ‘CREATED’ states while the Map + Filter operators are ‘RUNNING’. At scale, where the DataSet may contain billions of records, this ends up taking hours. Ideally, the records are streamed through to the Sink as they go through the Map + Filter. Is this blocking behavior due to the fact that the Map + Filter operators must re-distribute the records as they’re moving to an operator that has a lesser parallelism? ____________ Andreas Hailu Data Lake Engineering | Goldman Sachs ________________________________ Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<https://urldefense.proofpoint.com/v2/url?u=http-3A__www.gs.com_privacy-2Dnotices&d=DwMFAg&c=qE8EibqjfXM-zBfebVhd4gtjNZbrDcrKYXvb1gt38s4&r=zKznthi6OTKpoJID9dIcyiJ28NX59JIQ2bD246nnMac&m=Jv2k2wNGkO1uo3rmHmVGS_JzHaIN5ImVWGCtAC-R2qw&s=Gle60a8I-K0ybZirKKcun1OZyYPFPZg1I-61NIgTFiw&e=>
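For reference, a minimal, untested sketch of the pipeline shape described above, built on the legacy DataSet API the job uses; the function bodies, output paths, and the concrete values of P and X are placeholders:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ParallelismMismatchSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            int p = 4; // parallelism of the Map + Filter operators (placeholder)
            int x = 2; // parallelism of SinkA (placeholder)

            // Placeholder for the DataSet produced by the sources and CoGroups.
            DataSet<String> records = env.fromElements("a:1", "b:2", "a:3");

            // Path to SinkA: dropping from parallelism P to X forces a
            // redistribution of records between the Filter and the Sink.
            records
                .map(r -> "A|" + r).setParallelism(p)
                .filter(r -> r.contains("a")).setParallelism(p)
                .writeAsText("/tmp/sinkA").setParallelism(x);

            // Path to SinkB, written at parallelism P - X.
            records
                .map(r -> "B|" + r).setParallelism(p)
                .filter(r -> r.contains("b")).setParallelism(p)
                .writeAsText("/tmp/sinkB").setParallelism(p - x);

            env.execute("parallelism-mismatch sketch");
        }
    }

The change of parallelism between each Filter (P) and its Sink (X or P-X) is what introduces the redistribution the question asks about, since operators with different parallelism cannot be chained.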