Thanks Piotr and Guowei for the inputs. I understand that this can lead to deadlocks and the right solution should be based on FLIP-27. Will look further into FLIP-27 for this.

Thanks,
Robin

On Wed, Mar 24, 2021 at 2:21 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Robin,
>
> Flink has a functionality to block reading from an input. It's not
> documented, as it's not fully working in Streaming. Take a look at the
> `org.apache.flink.streaming.api.operators.InputSelectable` class and how
> it's being used (you have to implement your own operator to use it). It has
> two limitations:
>
> 1. As Guowei Ma mentioned, if there is a diamond pattern in the job graph,
> this can lead to deadlocks in the job.
> 2. When blocking reads from an input, you are blocking checkpoints from
> progressing, so currently there is a safety check that disallows using
> `InputSelectable` when checkpointing is enabled.
>
> Having said that, and as Guowei mentioned, this doesn't seem to be a
> correct solution for your problem. Throttling FLIP-27 sources based on
> watermark progression is the correct approach here, as it avoids the above
> issues.
>
> Best,
> Piotrek
>
> On Wed, Mar 24, 2021 at 08:23 Guowei Ma <guowei....@gmail.com> wrote:
>
> > Hi Robin
> >
> > Thank you for bringing up this discussion. AFAIK there are many similar
> > requirements. But it might lead to a deadlock if we depend on pausing one
> > input of two to align the watermark.
> > After FLIP-27, Flink would introduce some new mechanism for aligning the
> > watermark of different sources. Maybe @Becket could give some inputs or
> > some plans for this.
> >
> > Best,
> > Guowei
> >
> > On Wed, Mar 24, 2021 at 1:46 PM Robin KC <arora.k.ro...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > The issue has been discussed before here -
> > >
> > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html
> > >
> > > Our use case requires event time join of two streams and we use
> > > ConnectedStreams for the same.
> > > Within the CoProcessFunction, we buffer records until watermark and
> > > perform the join and business logic based on watermark. The issue is
> > > that if one stream is slower than the other, the buffer (a RocksDB
> > > state) is unnecessarily filled by continuously reading from the fast
> > > stream.
> > >
> > > I took inspiration from a response on the same thread
> > > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-tp24489p24564.html>
> > > by Elias Levy -
> > >
> > > The idea I was suggesting is not for operators to block an input. Rather,
> > > it is that they selectively choose from which input to process the next
> > > message from based on their timestamp, so long as there are buffered
> > > messages waiting to be processed. That is a best-effort alignment
> > > strategy. Seems to work relatively well in practice, at least within
> > > Kafka Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> > > its inputs. Instead, it could keep them separate and selectively consume
> > > from the one that had a buffer available, and if both have buffers
> > > available, from the buffer with the messages with a lower timestamp.
> > >
> > > And attempted a POC implementation of CoBackpressure whenever 2 streams
> > > are connected. This is committed in a branch in my own fork -
> > >
> > > https://github.com/apache/flink/commit/65eef7a5baeec9db588a6ba1f8fe339b3b436fb1
> > >
> > > The approach is
> > >
> > >    1. Provide a new method setCoBackpressureThreshold on ConnectedStreams
> > >    2. Pass the user-provided CoBackpressureThreshold through various
> > >    classes till StreamTwoInputProcessorFactory.
> > >    3. Implement a WatermarkHandler in StreamTwoInputProcessorFactory that
> > >    pauses input1 or input2 if the diff between watermarks is greater than
> > >    the threshold.
> > >    In other words, it selectively chooses the next input that is
> > >    lagging behind.
> > >
> > > Some key points
> > >
> > >    1. One benefit of this approach is that a user can configure
> > >    CoBackpressureThreshold at any join and it is not a global config.
> > >    2. IntervalJoins internally use ConnectedStreams and therefore this
> > >    will work for intervalJoins as well.
> > >    3. Other window joins do not use ConnectedStreams but use
> > >    UnionedStreams. Will have to find a solution for that.
> > >    4. I believe MultipleInputStreams will also need similar functionality.
> > >    5. IMP: This approach does not solve the problem of having event-time
> > >    skew within different partitions/shards of the same input source. It
> > >    only solves for event time alignment of different sources.
> > >
> > > Looking forward to inputs on the same. If this seems like a feasible
> > > approach, I can take it forward and implement code with fixes for
> > > identified gaps and appropriate test cases.
> > >
> > > Thanks,
> > > Robin
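
For readers following the thread, step 3 of the approach above (pause whichever input is ahead once the gap between the two watermarks exceeds the threshold) could be sketched in plain Java roughly as follows. This is only an illustration of the decision rule, not the code in the linked commit: `CoBackpressureSketch`, `Input`, `onWatermark` and `readableInputs` are hypothetical names rather than Flink APIs, and the wiring into StreamTwoInputProcessorFactory is not shown. It also assumes that nothing is paused until both inputs have reported a watermark.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of the threshold rule; none of these names exist in Flink. */
final class CoBackpressureSketch {

    enum Input { FIRST, SECOND }

    private final long thresholdMillis;
    // Long.MIN_VALUE marks "no watermark seen yet" for an input.
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;

    CoBackpressureSketch(long thresholdMillis) {
        if (thresholdMillis < 0) {
            throw new IllegalArgumentException("threshold must be non-negative");
        }
        this.thresholdMillis = thresholdMillis;
    }

    /** Records the latest watermark of one input; watermarks never move backwards. */
    void onWatermark(Input input, long timestamp) {
        if (input == Input.FIRST) {
            watermark1 = Math.max(watermark1, timestamp);
        } else {
            watermark2 = Math.max(watermark2, timestamp);
        }
    }

    /** Returns the inputs that may be read next under the threshold rule. */
    Set<Input> readableInputs() {
        // Assumption: with only one watermark known there is no skew to measure.
        if (watermark1 == Long.MIN_VALUE || watermark2 == Long.MIN_VALUE) {
            return EnumSet.allOf(Input.class);
        }
        if (aheadBy(watermark1, watermark2) > thresholdMillis) {
            return EnumSet.of(Input.SECOND); // first input is too far ahead: pause it
        }
        if (aheadBy(watermark2, watermark1) > thresholdMillis) {
            return EnumSet.of(Input.FIRST); // second input is too far ahead: pause it
        }
        return EnumSet.allOf(Input.class);
    }

    /** How far a is ahead of b, or 0 if it is not ahead; saturates on overflow. */
    private static long aheadBy(long a, long b) {
        if (a <= b) {
            return 0L;
        }
        long diff = a - b;
        return diff < 0 ? Long.MAX_VALUE : diff;
    }
}
```

With a threshold of 5000 ms, watermarks of 10000 on the first input and 2000 on the second leave only the second input readable, and both become readable again once the second input's watermark reaches 5000 or more. As Piotr and Guowei point out earlier in the thread, pausing an input like this can deadlock on diamond-shaped job graphs and holds back checkpoints, so the sketch shows the selection rule only.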