Hi Robin

Thank you for bringing up this discussion. AFAIK many users have similar
requirements. But pausing one of the two inputs to align the watermarks
might lead to a deadlock.
After FLIP-27, Flink would introduce a new mechanism for aligning the
watermarks of different sources. Maybe @Becket could give some input on
the plans for this.

Best,
Guowei


On Wed, Mar 24, 2021 at 1:46 PM Robin KC <arora.k.ro...@gmail.com> wrote:

> Hi all,
>
> The issue has been discussed before here -
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
>
> Our use case requires event time join of two streams and we use
> ConnectedStreams for the same. Within the CoProcessFunction, we buffer
> records until watermark and perform the join and business logic based on
> watermark. The issue is if one stream is slower than the other, the buffer
> (a rocksdb state) is unnecessarily filled by continuously reading from the
> fast stream.
>
> I took inspiration from a response on the same thread
> <
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-tp24489p24564.html
> >
> by Elias Levy -
>
> The idea I was suggesting is not for operators to block an input.  Rather,
> it is that they selectively choose from which input to process the next
> message from based on their timestamp, so long as there are buffered
> messages waiting to be processed.  That is a best-effort alignment
> strategy.  Seems to work relatively well in practice, at least within Kafka
> Streams.
>
> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> its inputs.  Instead, it could keep them separate and selectively consume
> from the one that had a buffer available, and if both have buffers
> available, from the buffer with the messages with a lower timestamp.
>
> Based on this, I attempted a POC implementation of CoBackpressure that
> applies whenever 2 streams are connected. It is committed in a branch in
> my own fork -
>
> https://github.com/apache/flink/commit/65eef7a5baeec9db588a6ba1f8fe339b3b436fb1
>
> The approach is
>
>    1. Provide a new method setCoBackpressureThreshold on ConnectedStreams
>    2. Pass the user-provided CoBackpressureThreshold through various
>    classes till StreamTwoInputProcessorFactory.
>    3. Implement a WatermarkHandler in StreamTwoInputProcessorFactory that
>    pauses input1 or input2 if the diff between watermarks is greater than
>    the threshold. In other words, it selectively chooses the next input
>    which is lagging behind.
>
> Some key points
>
>    1. One benefit of this approach is that a user can configure
>    CoBackpressureThreshold at any join and it is not a global config.
>    2. IntervalJoins internally use ConnectedStreams and therefore this will
>    work for intervalJoins as well.
>    3. Other window joins do not use ConnectedStreams but use
>    UnionedStreams. Will have to find a solution for that.
>    4. I believe MultipleInputStreams will also need similar functionality.
>    5. IMP: This approach does not solve the problem of having event-time
>    skew within different partitions/shards of the same input source. It
>    only solves for event time alignment of different sources.
>
> Looking forward to inputs on the same. If this seems like a feasible
> approach, I can take it forward and implement code with fixes for
> identified gaps and appropriate test cases.
>
> Thanks,
> Robin
>
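
For readers following the thread, the selection rule in step 3 of the quoted
approach can be sketched roughly as below. This is only an illustration under
assumptions: the class CoBackpressureSelector, its Selection enum, and its
methods are hypothetical names, not Flink APIs and not the code in the linked
commit. It also pauses strictly, so it does not address the deadlock concern
raised in the reply above.

```java
/**
 * Hypothetical helper (not part of Flink): decides which of two inputs to
 * read next, based on the gap between their latest watermarks.
 */
class CoBackpressureSelector {

    /** FIRST/SECOND: read only that input. ANY: no restriction. */
    enum Selection { FIRST, SECOND, ANY }

    private final long thresholdMillis;
    // Long.MIN_VALUE means "no watermark seen yet" on that input.
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    CoBackpressureSelector(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    /** Records a watermark for input 1 or 2; watermarks never move backwards. */
    void onWatermark(int input, long timestamp) {
        if (input == 1) {
            watermark1 = Math.max(watermark1, timestamp);
        } else {
            watermark2 = Math.max(watermark2, timestamp);
        }
    }

    /** Picks the lagging input once the watermark gap exceeds the threshold. */
    Selection nextInput() {
        if (saturatingDiff(watermark1, watermark2) > thresholdMillis) {
            return Selection.SECOND; // input 1 is too far ahead, pause it
        }
        if (saturatingDiff(watermark2, watermark1) > thresholdMillis) {
            return Selection.FIRST; // input 2 is too far ahead, pause it
        }
        return Selection.ANY;
    }

    /** a - b, clamped to the long range instead of overflowing. */
    private static long saturatingDiff(long a, long b) {
        long r = a - b;
        if (((a ^ b) & (a ^ r)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return r;
    }
}
```

With a threshold of 1000 ms, watermarks of 5000 on input 1 and 3000 on input 2
select input 2 only. Once input 2 catches up to within the threshold, either
input may be read again.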
