Thanks Piotr and Guowei for the inputs. I understand that this can lead to
deadlocks and the right solution should be based on FLIP-27.
Will look further into FLIP-27 for this.

Thanks,
Robin


On Wed, Mar 24, 2021 at 2:21 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Robin,
>
> Flink has functionality to block reading from an input. It's not
> documented, as it's not fully working in Streaming. Take a look at the
> `org.apache.flink.streaming.api.operators.InputSelectable` class and how
> it's being used (you have to implement your own operator to use it). It has
> two limitations:
>
> 1. As Guowei Ma mentioned, if there is a diamond pattern in the job graph,
> this can lead to deadlocks in the job.
> 2. When blocking reads from an input, you are blocking checkpoints from
> progressing, so currently there is a safety check that disallows using
> `InputSelectable` when checkpointing is enabled.
>
> Having said that, and as Guowei mentioned, this doesn't seem to be the
> correct solution for your problem. Throttling FLIP-27 sources based on
> watermark progression is the correct approach here, as it avoids the
> issues above.
>
> Best,
> Piotrek
>
> On Wed, Mar 24, 2021 at 08:23, Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi Robin
> >
> > Thank you for bringing up this discussion. AFAIK there are many similar
> > requirements. But pausing one of the two inputs to align the watermarks
> > might lead to a deadlock.
> > After FLIP-27, Flink would introduce a new mechanism for aligning the
> > watermarks of different sources. Maybe @Becket could give some input on
> > this or share the plans for it.
> >
> > Best,
> > Guowei
> >
> >
> > On Wed, Mar 24, 2021 at 1:46 PM Robin KC <arora.k.ro...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > The issue has been discussed before here:
> > >
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
> > >
> > > Our use case requires an event-time join of two streams, and we use
> > > ConnectedStreams for it. Within the CoProcessFunction, we buffer
> > > records until the watermark arrives and then perform the join and
> > > business logic based on the watermark. The issue is that if one stream
> > > is slower than the other, the buffer (a RocksDB state) is filled
> > > unnecessarily by continuously reading from the fast stream.
> > >
> > > I took inspiration from a response on the same thread
> > > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-tp24489p24564.html>
> > > by Elias Levy:
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > > Rather, it is that they selectively choose from which input to process
> > > the next message from based on their timestamp, so long as there are
> > > buffered messages waiting to be processed.  That is a best-effort
> > > alignment strategy.  Seems to work relatively well in practice, at
> > > least within Kafka Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> > > both its inputs.  Instead, it could keep them separate and selectively
> > > consume from the one that had a buffer available, and if both have
> > > buffers available, from the buffer with the messages with a lower
> > > timestamp.
> > >
> > > I attempted a POC implementation of CoBackpressure that applies
> > > whenever two streams are connected. It is committed in a branch in my
> > > own fork:
> > >
> > > https://github.com/apache/flink/commit/65eef7a5baeec9db588a6ba1f8fe339b3b436fb1
> > >
> > > The approach is:
> > >
> > >    1. Provide a new method setCoBackpressureThreshold on
> > >    ConnectedStreams.
> > >    2. Pass the user-provided CoBackpressureThreshold through various
> > >    classes down to StreamTwoInputProcessorFactory.
> > >    3. Implement a WatermarkHandler in StreamTwoInputProcessorFactory
> > >    that pauses input1 or input2 if the difference between their
> > >    watermarks is greater than the threshold. In other words, it
> > >    selectively chooses the input that is lagging behind as the next
> > >    one to read.
> > >
> > > Some key points:
> > >
> > >    1. One benefit of this approach is that a user can configure
> > >    CoBackpressureThreshold at any join; it is not a global config.
> > >    2. IntervalJoins internally use ConnectedStreams, so this will
> > >    work for IntervalJoins as well.
> > >    3. Other window joins do not use ConnectedStreams but use
> > >    UnionedStreams. A solution will have to be found for those.
> > >    4. I believe MultipleInputStreams will also need similar
> > >    functionality.
> > >    5. Important: This approach does not solve the problem of
> > >    event-time skew across different partitions/shards of the same
> > >    input source. It only solves event-time alignment of different
> > >    sources.
> > >
> > > Looking forward to inputs on the same. If this seems like a feasible
> > > approach, I can take it forward and implement code with fixes for
> > > identified gaps and appropriate test cases.
> > >
> > > Thanks,
> > > Robin
> > >
> >
>
