Hi Robin,

Thank you for bringing up this discussion. AFAIK, many users have similar requirements. However, relying on pausing one of the two inputs to align the watermarks might lead to a deadlock. After FLIP-27, Flink will introduce a new mechanism for aligning the watermarks of different sources. Maybe @Becket could share some input or plans on this.

Best,
Guowei

On Wed, Mar 24, 2021 at 1:46 PM Robin KC <arora.k.ro...@gmail.com> wrote:

> Hi all,
>
> The issue has been discussed before here -
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
>
> Our use case requires event time join of two streams and we use
> ConnectedStreams for the same. Within the CoProcessFunction, we buffer
> records until watermark and perform the join and business logic based on
> watermark. The issue is if one stream is slower than the other, the buffer
> (a rocksdb state) is unnecessarily filled by continuously reading from the
> fast stream.
>
> I took an inspiration from a response on the same thread
> <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-tp24489p24564.html>
> by Elias Levy -
>
> The idea I was suggesting is not for operators to block an input. Rather,
> it is that they selectively choose from which input to process the next
> message from based on their timestamp, so long as there are buffered
> messages waiting to be processed. That is a best-effort alignment
> strategy. Seems to work relatively well in practice, at least within Kafka
> Streams.
>
> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> its inputs. Instead, it could keep them separate and selectively consume
> from the one that had a buffer available, and if both have buffers
> available, from the buffer with the messages with a lower timestamp.
>
> And attempted a POC implementation of CoBackpressure whenever 2 streams are
> connected. This is committed in a branch in my own fork -
>
> https://github.com/apache/flink/commit/65eef7a5baeec9db588a6ba1f8fe339b3b436fb1
>
> The approach is
>
>    1. Provide a new method setCoBackpressureThreshold on ConnectedStream
>    2. Pass the user-provided CoBackpressureThreshold through various
>    classes till StreamTwoInputProcessorFactory.
>    3. Implement a WatermarkHandler in StreamTwoInputProcessorFactory that
>    pauses input1 or input2 if the diff between watermarks is greater than
>    the threshold. In other words, it selectively chooses the next input
>    which is lagging behind.
>
> Some key points
>
>    1. One benefit of this approach is that a user can configure
>    CoBackpressureThreshold at any join and it is not a global config.
>    2. IntervalJoins internally use ConnectedStreams and therefore this will
>    work for intervalJoins as well.
>    3. Other window joins do not use ConnectedStreams but use
>    UnionedStreams. Will have to find a solution for that.
>    4. I believe MultipleInputStreams will also need similar functionality.
>    5. IMP: This approach does not solve the problem of having event-time
>    skew within different partitions/shards of the same input source. It
>    only solves for event time alignment of different sources.
>
> Looking forward to inputs on the same. If this seems like a feasible
> approach, I can take it forward and implement code with fixes for
> identified gaps and appropriate test cases.
>
> Thanks,
> Robin
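
For readers following the thread, the two selection rules described in the quoted message (the watermark-threshold rule from step 3 and the best-effort rule attributed to Elias Levy) can be sketched in plain Java. This is only an illustration under stated assumptions: it uses no Flink classes, and the names CoBackpressureSketch, Choice, chooseByWatermark and chooseByBufferedTimestamp are hypothetical, not taken from the POC commit. Watermarks and timestamps are assumed to be long values such as epoch milliseconds.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical illustration only; none of these names exist in Flink. */
public class CoBackpressureSketch {

    /** Which input the two-input operator should read from next. */
    public enum Choice { FIRST, SECOND, EITHER }

    /** Computes a - b, clamped to the long range instead of overflowing. */
    private static long saturatingDiff(long a, long b) {
        long d = a - b;
        if (((a ^ b) & (a ^ d)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return d;
    }

    /**
     * Threshold rule: if one input's watermark is ahead of the other by more
     * than the threshold, read only from the lagging input (the leading one is
     * effectively paused). Otherwise either input may be read.
     */
    public static Choice chooseByWatermark(long watermark1, long watermark2, long threshold) {
        if (saturatingDiff(watermark1, watermark2) > threshold) {
            return Choice.SECOND; // input 1 is too far ahead
        }
        if (saturatingDiff(watermark2, watermark1) > threshold) {
            return Choice.FIRST; // input 2 is too far ahead
        }
        return Choice.EITHER;
    }

    /**
     * Best-effort rule: never block an input. Read from whichever input has
     * buffered records; if both do, read the one whose head record has the
     * lower timestamp. Each deque holds the timestamps of buffered records.
     */
    public static Choice chooseByBufferedTimestamp(Deque<Long> buffer1, Deque<Long> buffer2) {
        if (buffer1.isEmpty() && buffer2.isEmpty()) {
            return Choice.EITHER; // nothing buffered, take whatever arrives first
        }
        if (buffer2.isEmpty()) {
            return Choice.FIRST;
        }
        if (buffer1.isEmpty()) {
            return Choice.SECOND;
        }
        return buffer1.peekFirst() <= buffer2.peekFirst() ? Choice.FIRST : Choice.SECOND;
    }

    public static void main(String[] args) {
        // Input 1 is 50 ahead with a threshold of 20, so input 2 is chosen.
        System.out.println(chooseByWatermark(100L, 50L, 20L));

        Deque<Long> fast = new ArrayDeque<>();
        Deque<Long> slow = new ArrayDeque<>();
        fast.add(500L);
        slow.add(120L);
        // Both inputs have buffered records; the lower head timestamp wins.
        System.out.println(chooseByBufferedTimestamp(fast, slow));
    }
}
```

The practical difference between the two rules is that the threshold rule can leave the operator waiting on a single input, which is where the deadlock concern raised in the reply comes from, whereas the best-effort rule always makes progress when any record is buffered.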