Hi all,

This issue has been discussed before here: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
Our use case requires an event-time join of two streams, and we use ConnectedStreams for this. Within the CoProcessFunction, we buffer records until the watermark advances, then perform the join and business logic based on the watermark. The issue is that if one stream is slower than the other, the buffer (a RocksDB state) is filled unnecessarily by continuously reading from the fast stream.

I took inspiration from a response on the same thread <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-tp24489p24564.html> by Elias Levy:

> The idea I was suggesting is not for operators to block an input. Rather, it is that they selectively choose from which input to process the next message from based on their timestamp, so long as there are buffered messages waiting to be processed. That is a best-effort alignment strategy. Seems to work relatively well in practice, at least within Kafka Streams.
>
> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both its inputs. Instead, it could keep them separate and selectively consume from the one that had a buffer available, and if both have buffers available, from the buffer with the messages with a lower timestamp.

Based on this, I attempted a POC implementation of CoBackpressure that applies whenever two streams are connected. This is committed in a branch in my own fork - https://github.com/apache/flink/commit/65eef7a5baeec9db588a6ba1f8fe339b3b436fb1

The approach is:
1. Provide a new method setCoBackpressureThreshold on ConnectedStreams.
2. Pass the user-provided CoBackpressureThreshold through various classes down to StreamTwoInputProcessorFactory.
3. Implement a WatermarkHandler in StreamTwoInputProcessorFactory that pauses input1 or input2 if the difference between their watermarks is greater than the threshold. In other words, it selects the lagging input as the next one to read from.

Some key points:
1. One benefit of this approach is that a user can configure CoBackpressureThreshold on any individual join; it is not a global config.
2. Interval joins internally use ConnectedStreams, so this will work for them as well.
3. Other window joins do not use ConnectedStreams but use UnionedStreams. We will have to find a solution for that.
4. I believe MultipleInputStreams will also need similar functionality.
5. Important: this approach does not solve the problem of event-time skew between different partitions/shards of the same input source. It only addresses event-time alignment between different sources.

Looking forward to your input on this. If this seems like a feasible approach, I can take it forward and implement the code with fixes for the identified gaps and appropriate test cases.

Thanks,
Robin
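
P.S. To make step 3 of the approach a bit more concrete, here is a minimal, Flink-free sketch of the selection logic. It is not the code from the commit above. The class CoBackpressureSketch, the Selection enum and all method names are hypothetical and exist only for illustration. It also assumes watermarks are non-negative epoch milliseconds, so the subtraction cannot overflow.

```java
/**
 * Hypothetical sketch of watermark-based input selection for a two-input operator.
 * Not the POC code; all names here are illustrative only.
 */
class CoBackpressureSketch {

    /** Which input(s) the processor may read from next. */
    enum Selection { FIRST, SECOND, BOTH }

    /** Marker for "no watermark seen yet" on an input. */
    private static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long thresholdMillis;
    private long watermark1 = NO_WATERMARK;
    private long watermark2 = NO_WATERMARK;

    CoBackpressureSketch(long thresholdMillis) {
        if (thresholdMillis < 0) {
            throw new IllegalArgumentException("threshold must be non-negative");
        }
        this.thresholdMillis = thresholdMillis;
    }

    /** Record a watermark from input 1; watermarks never move backwards. */
    void onWatermark1(long timestamp) {
        watermark1 = Math.max(watermark1, timestamp);
    }

    /** Record a watermark from input 2; watermarks never move backwards. */
    void onWatermark2(long timestamp) {
        watermark2 = Math.max(watermark2, timestamp);
    }

    /** Decide which input to read next, pausing the input that is too far ahead. */
    Selection nextSelection() {
        // Until both inputs have reported a watermark there is nothing to compare,
        // so this sketch applies no backpressure in that case.
        if (watermark1 == NO_WATERMARK || watermark2 == NO_WATERMARK) {
            return Selection.BOTH;
        }
        long diff = watermark1 - watermark2;
        if (diff > thresholdMillis) {
            return Selection.SECOND; // input 1 is ahead: pause it, read only input 2
        }
        if (-diff > thresholdMillis) {
            return Selection.FIRST;  // input 2 is ahead: pause it, read only input 1
        }
        return Selection.BOTH;       // skew is within the threshold
    }

    public static void main(String[] args) {
        CoBackpressureSketch selector = new CoBackpressureSketch(1_000L);
        selector.onWatermark1(6_000L);
        selector.onWatermark2(4_500L);
        System.out.println(selector.nextSelection()); // prints SECOND
    }
}
```

In the example, input 1 is 1,500 ms ahead of input 2 with a 1,000 ms threshold, so only input 2 is read until it catches up. Returning BOTH while one side has no watermark yet is a simplification of this sketch; a real implementation would need to decide how to treat an input that is idle or has not emitted a watermark.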