I think this could happen, but I have very limited knowledge of how the input gates work internally. @Piotr could definitely provide more insight here.

D.

On Thu, Dec 2, 2021 at 5:54 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:

> I do have some logic with timers today, but it’s indeed not ideal. I guess I’ll have a look at TwoInputStreamOperator, but I do have related questions. You mentioned a sample scenario of "processing backlog" where windows fire very quickly; could it happen that, in such a situation, the framework calls the operator’s processElement1 continuously (even for several minutes) before calling processElement2 a single time? How does the framework decide when to switch from one stream to the other when the streams are connected?
>
> Regards,
> Alexis.
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, December 2, 2021 17:18
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
> Even with the TwoInputStreamOperator you cannot "halt" the processing. You need to buffer these elements, for example in ListState, for later processing. When the watermark of the second stream arrives, you can process all buffered elements that satisfy the condition.
>
> You could probably also implement a similar (less optimized) solution with a KeyedCoProcessFunction using event-time timers.
>
> Best,
> D.
>
> On Thu, Dec 2, 2021 at 5:12 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
>
> Yes, that sounds right, but with my current KeyedCoProcessFunction I can’t tell Flink to "halt" processElement1 and switch to the other stream depending on watermarks. I could look into TwoInputStreamOperator if you think that’s the best approach.
>
> Regards,
> Alexis.
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, December 2, 2021 16:59
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
> I think this would require using the lower-level API and implementing a custom `TwoInputStreamOperator`. Then you can hook into the `processWatermark{1,2}` methods.
>
> Let's also make sure we're on the same page about what the watermark is. You can think of the watermark as an event-time clock. It basically tells you that *no more events with a timestamp lower than the watermark should appear in your stream*.
>
> You simply delay emitting the window result from your "connect" operator until the watermark from the second (side output) stream passes the window's max timestamp (the maximum timestamp that is included in the window).
>
> Does that make sense?
>
> Best,
> D.
>
> On Thu, Dec 2, 2021 at 4:25 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
>
> Could you elaborate on what you mean by "synchronize"? Buffering in the state would be fine, but I haven’t been able to come up with a good way of ensuring that all data from the side stream for a given minute is processed by processElement2 before all data for the same (windowed) minute reaches processElement1, even when considering watermarks.
>
> Regards,
> Alexis.
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, December 2, 2021 15:45
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
> You cannot rely on the order of the two streams that easily. If you are, for example, processing a backlog and the windows fire quickly, the windowed branch can actually be faster than the second branch, even though the second branch has less work to do. This makes the pipeline non-deterministic.
>
> What you can do is "synchronize" the watermarks of both streams in your "connect" operator, but that of course involves buffering events in the state.
>
> Best,
> D.
>
> On Thu, Dec 2, 2021 at 3:02 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi David,
>
> A watermark step simply refers to assigning timestamps and watermarks; my source doesn’t do that.
>
> I have a test system with only a couple of data points per day, so there’s definitely no back pressure. I basically have a browser where I can see the results from the sink, and I found one result that should have been discarded but wasn’t. That makes me think the operator processed the "open" state but waited too long and didn’t process the "close" state before the window fired. I can also see that the closure (going from open to close) was triggered at second 17, and my windows are evaluated every minute, so it wasn’t a race condition.
>
> Regards,
> Alexis.
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, December 2, 2021 14:52
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
> Hi Alexis,
>
> I'm not sure what the "watermark" step refers to in your graph, but in general I'd say your intuition is correct.
>
> As for the "buffering" part: each sub-task that needs to send data via a data exchange (the last operator in a chain) has an output buffer. The operator that consumes this data (possibly on a different machine) has an input buffer. The buffer debloating feature can help mitigate excessive buffering in case of back pressure.
>
> > but I’m not sure if this actually happens
>
> How are you trying to verify this? Also, can you check whether the operators are back-pressured?
>
> Best,
> D.
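Two suggestions in this thread depend on how a two-input operator tracks event time: the "synchronize watermarks" idea in the 15:45 reply and the timer-based KeyedCoProcessFunction alternative in the 17:18 reply. Flink advances such an operator's watermark to the minimum of its two input watermarks. An event-time timer for time t fires once that combined watermark reaches t.

The plain-Java model below shows why a timer registered at a window's max timestamp only fires after the side stream's watermark has also passed it. It has no Flink dependency. `TwoInputClock` and its methods are hypothetical names, and this is a simplified sketch rather than Flink's internal implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of a two-input operator's event-time clock (not Flink code).
class TwoInputClock {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;
    // Pending event-time timers, smallest timestamp first.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Watermark arriving on the first (windowed) input.
    List<Long> advanceInput1(long watermark) {
        input1Watermark = Math.max(input1Watermark, watermark);
        return advance();
    }

    // Watermark arriving on the second (side output) input.
    List<Long> advanceInput2(long watermark) {
        input2Watermark = Math.max(input2Watermark, watermark);
        return advance();
    }

    long currentWatermark() {
        return combinedWatermark;
    }

    // The operator's clock is the minimum of both inputs; fire every timer it has reached.
    private List<Long> advance() {
        long candidate = Math.min(input1Watermark, input2Watermark);
        List<Long> fired = new ArrayList<>();
        if (candidate > combinedWatermark) {
            combinedWatermark = candidate;
            while (!timers.isEmpty() && timers.peek() <= combinedWatermark) {
                fired.add(timers.poll());
            }
        }
        return fired;
    }
}
```

For example, with a timer at 59999, advancing only the first input to 120000 fires nothing. Advancing the second input to 60000 then fires the timer. Because the slower input drives the clock, a lagging or idle side stream holds back every buffered result. That is the price of the determinism discussed above.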
>
> On Thu, Dec 2, 2021 at 12:27 PM Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote:
>
> Hello,
>
> I have a use case with event-time processing that ends up with a DAG roughly like this:
>
>     source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 -> connect (KeyedCoProcessFunction) -> sink
>                 |                                                               /
>                 (side output) -> keyBy -> watermark ---------------------------/
>
> (In case the text gets mangled in the e-mail, the side output comes from the filter and joins back with the connect operation.)
>
> The filter takes all data. Its main output is all *valid* data with state "open"; the side output is all *valid* data regardless of state.
>
> The goal of the KeyedCoProcessFunction is to check the results of the (sliding) window. The window only receives "open" states, but the KeyedCoProcessFunction receives all valid data and should discard results from the main stream if the state changed from "open" to something else before the window was evaluated.
>
> I would have expected all data from the side output to be processed more or less immediately in the KeyedCoProcessFunction’s processElement2, because there’s no windowing in the side stream, but I’m not sure if this actually happens. Maybe the side stream (or both streams) buffers some data before passing it to the connected stream? If so, is there any way I could tune this?
>
> Regards,
> Alexis.
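The discard rule from the original question can be combined with the buffering approach David describes. Below is a minimal plain-Java model with no Flink dependency. It assumes the side stream delivers (key, timestamp, state) changes and that each window result carries its window's max timestamp.

`SyncedConnect`, `WindowResult` and their methods are hypothetical stand-ins for processElement1, processElement2 and processWatermark2 of a custom TwoInputStreamOperator. The in-memory list and map stand in for keyed state such as ListState. A window result is held back until the side watermark reaches the window's max timestamp. It is then emitted only if the key's latest known state at that timestamp is still "open".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model (not Flink code) of a "connect" operator that delays window
// results until the side stream's watermark reaches the window's max timestamp.
class SyncedConnect {

    // A main-stream window result for one key.
    static final class WindowResult {
        final String key;
        final long windowMaxTs; // maximum timestamp included in the window
        final String payload;

        WindowResult(String key, long windowMaxTs, String payload) {
            this.key = key;
            this.windowMaxTs = windowMaxTs;
            this.payload = payload;
        }
    }

    // Stand-in for keyed ListState holding buffered window results.
    private final List<WindowResult> pending = new ArrayList<>();
    // key -> (event timestamp -> state such as "open" or "close")
    private final Map<String, TreeMap<Long, String>> sideStates = new HashMap<>();
    private long sideWatermark = Long.MIN_VALUE;

    // Main input (processElement1 role): never emit directly, only buffer.
    void onWindowResult(WindowResult result) {
        pending.add(result);
    }

    // Side input (processElement2 role): remember the state change at its event time.
    void onSideElement(String key, long timestamp, String state) {
        sideStates.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, state);
    }

    // Side watermark (processWatermark2 role): release results whose window end the
    // side stream has reached; emit unless the latest state at that point is not "open".
    List<String> onSideWatermark(long watermark) {
        sideWatermark = Math.max(sideWatermark, watermark);
        List<String> emitted = new ArrayList<>();
        Iterator<WindowResult> it = pending.iterator();
        while (it.hasNext()) {
            WindowResult r = it.next();
            if (r.windowMaxTs > sideWatermark) {
                continue; // side changes up to windowMaxTs may still arrive
            }
            it.remove();
            TreeMap<Long, String> states = sideStates.get(r.key);
            Map.Entry<Long, String> last =
                    states == null ? null : states.floorEntry(r.windowMaxTs);
            if (last == null || "open".equals(last.getValue())) {
                emitted.add(r.payload);
            }
        }
        return emitted;
    }
}
```

For example, suppose a window result for key k1 with max timestamp 59999 is buffered before the side stream's "close" change at timestamp 17000 arrives. Calling onSideWatermark(60000) still drops it, because nothing is released until the side watermark reaches the window's end. Cleanup of old side-state entries is omitted for brevity.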