Hi all,

This issue has been discussed before in this thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html

Our use case requires an event-time join of two streams, for which we use
ConnectedStreams. Within the CoProcessFunction, we buffer records until the
watermark passes them and then perform the join and business logic. The
problem is that if one stream is slower than the other, the buffer (RocksDB
state) fills up unnecessarily because we keep reading from the fast stream.
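To make the buffer growth concrete, here is a minimal plain-Java simulation with no Flink dependency. The class and method names are hypothetical. It relies on one assumption: a two-input operator's event-time clock is the minimum of its two input watermarks, so buffered records are only released once both inputs have advanced past them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free simulation of the buffer-until-watermark join described above.
final class BufferingJoinSimulation {
    private final List<Long> buffer1 = new ArrayList<>();
    private final List<Long> buffer2 = new ArrayList<>();
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    void onElement1(long ts) { buffer1.add(ts); }
    void onElement2(long ts) { buffer2.add(ts); }

    // Watermarks only move forward.
    void onWatermark1(long wm) { watermark1 = Math.max(watermark1, wm); flush(); }
    void onWatermark2(long wm) { watermark2 = Math.max(watermark2, wm); flush(); }

    // The operator's event-time clock is the minimum of the two input watermarks.
    long combinedWatermark() { return Math.min(watermark1, watermark2); }

    // Stand-in for "join and emit": remove every buffered record whose
    // timestamp is at or below the combined watermark.
    private void flush() {
        long wm = combinedWatermark();
        buffer1.removeIf(ts -> ts <= wm);
        buffer2.removeIf(ts -> ts <= wm);
    }

    int bufferedRecords() { return buffer1.size() + buffer2.size(); }
}
```

For example, if stream 2's watermark stalls at 0 while stream 1 delivers records with timestamps 1 to 1000, all 1000 stream-1 records remain buffered. They are released only when stream 2's watermark catches up.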

I took inspiration from a response by Elias Levy on the same thread
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-tp24489p24564.html>:

The idea I was suggesting is not for operators to block an input.  Rather,
it is that they selectively choose from which input to process the next
message from based on their timestamp, so long as there are buffered
messages waiting to be processed.  That is a best-effort alignment
strategy.  Seems to work relatively well in practice, at least within Kafka
Streams.

E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
its inputs.  Instead, it could keep them separate and selectively consume
from the one that had a buffer available, and if both have buffers
available, from the buffer with the messages with a lower timestamp.
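The quoted selection rule can be sketched in plain Java as follows. This is an illustration of the idea only, not Flink's StreamTwoInputProcessor. The class TimestampInputSelector and its methods are hypothetical, and each buffered record is reduced to its timestamp. The selector never blocks an input. It only decides which non-empty input to read next.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalInt;

// Hypothetical best-effort alignment: read from whichever input has data,
// and if both do, from the one whose next record has the lower timestamp.
final class TimestampInputSelector {
    private final Deque<Long> input1 = new ArrayDeque<>();
    private final Deque<Long> input2 = new ArrayDeque<>();

    // input == 1 selects the first input; any other value selects the second.
    void offer(int input, long timestamp) {
        (input == 1 ? input1 : input2).addLast(timestamp);
    }

    // Returns 1 or 2 for the input to consume next, or empty if nothing is buffered.
    OptionalInt selectNext() {
        if (input1.isEmpty() && input2.isEmpty()) return OptionalInt.empty();
        if (input1.isEmpty()) return OptionalInt.of(2);
        if (input2.isEmpty()) return OptionalInt.of(1);
        // Both inputs have buffered data: prefer the lower head timestamp.
        return OptionalInt.of(input1.peekFirst() <= input2.peekFirst() ? 1 : 2);
    }

    // Removes and returns the timestamp of the record chosen by selectNext().
    long poll() {
        int chosen = selectNext().orElseThrow(() -> new IllegalStateException("no buffered records"));
        return (chosen == 1 ? input1 : input2).pollFirst();
    }
}
```

With records 5 and 7 buffered on input 1 and 6 on input 2, repeated calls to poll() return 5, 6, 7. When both inputs have data, the two are consumed in timestamp order. When only one input has data, that input is consumed without waiting.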

I then attempted a POC implementation of CoBackpressure for whenever two
streams are connected. It is committed to a branch in my own fork:
https://github.com/apache/flink/commit/65eef7a5baeec9db588a6ba1f8fe339b3b436fb1

The approach is as follows:

   1. Provide a new method, setCoBackpressureThreshold, on ConnectedStreams.
   2. Pass the user-provided CoBackpressureThreshold through the
   intermediate classes down to StreamTwoInputProcessorFactory.
   3. Implement a WatermarkHandler in StreamTwoInputProcessorFactory that
   pauses input1 or input2 when the difference between their watermarks
   exceeds the threshold. In other words, it selectively reads from the
   input that is lagging behind.
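The threshold check in step 3 might look roughly like the sketch below. It is not the code from the POC branch. The class CoBackpressureWatermarkHandler, its Decision enum, and its methods are hypothetical. The sketch also assumes the threshold is expressed in the same time unit as the watermarks (for example, milliseconds).

```java
// Hypothetical sketch of the step-3 decision: pause the input whose watermark
// is ahead of the other by more than the configured threshold.
final class CoBackpressureWatermarkHandler {
    enum Decision { READ_BOTH, PAUSE_INPUT1, PAUSE_INPUT2 }

    private final long threshold; // max allowed watermark gap, same unit as the watermarks
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    CoBackpressureWatermarkHandler(long threshold) {
        if (threshold < 0) throw new IllegalArgumentException("threshold must be >= 0");
        this.threshold = threshold;
    }

    // Records a watermark for input 1 or 2 (watermarks only move forward) and returns the new decision.
    Decision onWatermark(int input, long watermark) {
        if (input == 1) watermark1 = Math.max(watermark1, watermark);
        else watermark2 = Math.max(watermark2, watermark);
        return decide();
    }

    Decision decide() {
        // Until both inputs have reported a watermark there is nothing to compare.
        if (watermark1 == Long.MIN_VALUE || watermark2 == Long.MIN_VALUE) return Decision.READ_BOTH;
        long diff = watermark1 - watermark2; // positive: input 1 is ahead
        if (diff > threshold) return Decision.PAUSE_INPUT1;
        if (-diff > threshold) return Decision.PAUSE_INPUT2;
        return Decision.READ_BOTH;
    }
}
```

With a threshold of 100, watermarks of 700 on input 1 and 450 on input 2 pause input 1. Once input 2 reaches 650, both inputs are read again. One design question this sketch leaves open: if the lagging input goes idle and its watermark stops advancing, the leading input stays paused, so some form of idleness handling would likely be needed.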

Some key points:

   1. One benefit of this approach is that a user can configure the
   CoBackpressureThreshold per join rather than as a global config.
   2. Interval joins use ConnectedStreams internally, so this will work
   for interval joins as well.
   3. Window joins, on the other hand, do not use ConnectedStreams; they
   use unioned streams. I will have to find a solution for those.
   4. I believe MultipleInputStreams will also need similar functionality.
   5. Important: this approach does not solve event-time skew between
   different partitions/shards of the same input source. It only addresses
   event-time alignment between different sources.

Looking forward to your input on this. If this seems like a feasible
approach, I can take it forward and implement it, including fixes for the
identified gaps and appropriate test cases.

Thanks,
Robin
