Hi Fabian,

I see, thanks for the quick explanation.
Cheers,
Konstantin

On 04.01.2017 14:15, Fabian Hueske wrote:
> Hi Konstantin,
>
> the DataSet API tries to execute all operators as soon as possible.
>
> I assume that in your case, Flink does not do this because it tries to
> avoid a deadlock.
> A dataflow that replicates data from the same source and joins it again
> might get deadlocked, because all pipelines need to make progress in
> order for the source to finish.
>
> Think of a simple example like this:
>
>        /-- Map1 --\
> Src --<            >- Join
>        \-- Map2 --/
>
> If the join is executed as a hash join, one input (Map1) is used to
> build a hash table. Only once the hash table is built can the other
> input (Map2) be consumed.
> If both Map operators ran at the same time, Map2 would stall at some
> point because it could not emit any more data, due to the backpressure
> of the not-yet-opened probe input of the hash join.
> Once Map2 stalls, the source would stall, and Map1 could not continue
> to finish the build side. At this point we have a deadlock.
>
> Flink detects these situations and adds an artificial pipeline breaker
> to the dataflow to prevent deadlocks. Due to the pipeline breaker, the
> build side is completed before the probe side input is processed.
>
> This also answers the question of which operator is executed first: the
> operator on the build side of the first join. Hence, the join strategy
> chosen by the optimizer (BUILD_FIRST, BUILD_SECOND) decides.
> You can also give a manual JoinHint to control that. If you give a
> SORT_MERGE hint, all three operators should run concurrently, because
> both join inputs will be concurrently consumed for sorting.
>
> Best,
> Fabian
>
> 2017-01-04 13:30 GMT+01:00 Konstantin Knauf
> <konstantin.kn...@tngtech.com>:
>
>     Hi everyone,
>
>     I have a basic question regarding the scheduling of batch programs.
>     Let's take the following graph:
>
>                   -> Group Combine -> ...
>                  /
>     Source ----> Group Combine -> ...
>                  \
>                   -> Map -> ...
>     So, a source followed by three operators with ship strategy
>     "Forward" and exchange mode "pipelined".
>
>     The three flows are later joined again, so this results in a
>     single job.
>
>     When the job is started, at first only one of the operators
>     immediately receives the input read by the source and can
>     therefore run concurrently with the source. Once the source is
>     finished, the other two operators are scheduled.
>
>     Two questions about this:
>
>     1) Why doesn't the source forward the records to all three
>        operators while still running?
>     2) How does the jobmanager decide which of the three operators
>        receives the pipelined data first?
>
>     Cheers and thanks,
>
>     Konstantin
>
>     --
>     Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082

--
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
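P.S. for anyone finding this thread later: a minimal sketch of passing a manual JoinHint in the DataSet API, as Fabian suggested. The tiny example dataset and the two identity-map branches are made up for illustration; REPARTITION_SORT_MERGE is the DataSet API's sort-merge hint:

```java
import java.util.List;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> src =
                env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));

        // Two branches reading the same source, as in the diagrams above
        // (identity maps stand in for the real per-branch logic).
        DataSet<Tuple2<Integer, String>> branch1 =
                src.map(t -> t).returns(Types.TUPLE(Types.INT, Types.STRING));
        DataSet<Tuple2<Integer, String>> branch2 =
                src.map(t -> t).returns(Types.TUPLE(Types.INT, Types.STRING));

        // A sort-merge join consumes both inputs concurrently for sorting,
        // sidestepping the build/probe ordering of a hash join.
        List<?> joined = branch1
                .join(branch2, JoinHint.REPARTITION_SORT_MERGE)
                .where(0).equalTo(0)
                .collect();

        // Keys 1 and 2 each match once across the two branches.
        System.out.println(joined.size());
    }
}
```

With only two records and matching keys on both branches, the join produces two pairs.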