Hi Andreas,

Julian already offered a good explanation, so here is one possible
solution: you could run the whole first subpipeline with parallelism
X and the second with P-X. However, you will most likely need to run with
P > X to finish in time.

Another way is to use the DataStream API (your program is not doing any
aggregation/join, so streaming is indeed a good fit) with STREAMING
execution mode (the only execution mode for DataStream in Flink < 1.12).
There, all tasks are active at the same time and records are streamed
through as you expect. Since we plan to phase out the DataSet API
eventually, it's also the more future-proof solution.
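As a toy illustration of the scheduling difference (plain Python generators, not Flink code): with a blocking "batch" exchange the sink only starts after the upstream operator has processed every record, while with a pipelined "streaming" exchange each record reaches the sink as soon as it is mapped.

```python
# Toy model (NOT Flink API): event logs show when the map and sink
# "tasks" actually process each record under the two exchange styles.

def mapper(records, log):
    for r in records:
        log.append(f"map {r}")
        yield r

def run_batch(records):
    log = []
    buffered = list(mapper(records, log))  # blocking: drain upstream fully
    for r in buffered:
        log.append(f"sink {r}")
    return log

def run_streaming(records):
    log = []
    for r in mapper(records, log):         # pipelined: sink runs per record
        log.append(f"sink {r}")
    return log

print(run_batch(range(2)))      # ['map 0', 'map 1', 'sink 0', 'sink 1']
print(run_streaming(range(2)))  # ['map 0', 'sink 0', 'map 1', 'sink 1']
```

Both variants produce the same records; only the interleaving differs, which is why the sinks sit in CREATED in batch mode but emit immediately in streaming mode.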

Best,

Arvid

On Tue, Feb 16, 2021 at 11:37 PM Jaffe, Julian <julianja...@activision.com>
wrote:

> Hey Andreas,
>
>
>
> Have a read through
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_execution_mode.html#task-scheduling-and-network-shuffle
> and in particular the BATCH Execution Mode section. Your intuition is
> mostly correct – because your operators can’t be chained due to the
> rebalancing, if you execute your pipeline in batch mode downstream tasks
> will not begin processing data until the upstream tasks have finished all
> of their processing. If you can forgo the higher resiliency and lower
> resource requirements of executing in batch mode, you could try running
> your pipeline in streaming mode over bounded data.
>
>
>
> Julian
>
>
>
> *From: *"Hailu, Andreas [Engineering]" <andreas.ha...@gs.com>
> *Date: *Tuesday, February 16, 2021 at 2:00 PM
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Understanding blocking behavior
>
>
>
> Hi folks, I’m trying to get a better understanding of what operations
> result in blocked partitions. I’ve got a batch-processing job that reads
> from 2 sources, and then performs a series of Maps/Filters/CoGroups all
> with the same parallelism to create a final DataSet to be written to two
> different Sinks.
>
>
>
> The kind of Sink a record in the DataSet is written to is dependent on the
> record’s properties, so we use a Map + Filter operation to just pull the
> desired records for the Sink. The latter portion of the graph looks like
> this:
>
>
>
> DataSet -> Map + FilterA (with parallelism P) -> SinkA (with parallelism X)
>
> DataSet -> Map + FilterB (with parallelism P) -> SinkB (with parallelism
> P-X)
>
>
>
> Parallelisms for the output into SinkA and SinkB are different than the
> parallelism used in the Map + Filter operation in order to control the
> resulting total number of output files. What I observe is that all of the
> records must first be sent to the Map + Filter operators, and only after
> all records have been received does the Sink begin to output records. This
> shows in the Flink Dashboard as the Sinks remaining in the ‘CREATED’ state
> while the Map + Filter operators are ‘RUNNING’. At scale, where the DataSet
> may contain billions of records, this ends up taking hours. Ideally, the
> records would be streamed through to the Sink as they pass through the
> Map + Filter.
>
>
>
> Is this blocking behavior due to the fact that the Map + Filter operators
> must re-distribute the records as they’re moving to an operator that has a
> lesser parallelism?
>
>
>
> ____________
>
>
>
> *Andreas Hailu*
>
> *Data Lake Engineering *| Goldman Sachs
>
>
>
>
