I think this would require using the lower-level API and implementing a custom
`TwoInputStreamOperator`. Then you can hook into the `processWatermark1` and
`processWatermark2` methods.
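
To make the hook points concrete, here is a dependency-free Java model. It is not Flink code. The class `TwoInputWatermarks` and its methods are hypothetical names that only mirror `processWatermark1`/`processWatermark2`. It assumes the usual behaviour of a two-input operator: the watermark of each input is tracked separately, and the operator's own event-time clock is the minimum of the two.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of per-input watermark tracking in a
// two-input operator (NOT the Flink API; names only mirror
// processWatermark1 / processWatermark2).
final class TwoInputWatermarks {
    private long input1 = Long.MIN_VALUE;   // latest watermark seen on input 1
    private long input2 = Long.MIN_VALUE;   // latest watermark seen on input 2
    private long combined = Long.MIN_VALUE; // latest watermark forwarded downstream
    private final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long ts) {
        input1 = Math.max(input1, ts);
        advance();
    }

    void processWatermark2(long ts) {
        input2 = Math.max(input2, ts);
        advance();
    }

    long input1() { return input1; }
    long input2() { return input2; }
    List<Long> emitted() { return emitted; }

    // The operator's event-time clock is the minimum of both inputs and
    // only ever moves forward.
    private void advance() {
        long min = Math.min(input1, input2);
        if (min > combined) {
            combined = min;
            emitted.add(min);
        }
    }
}
```

The point of overriding the per-input methods is that you see the side stream's watermark on its own (`input2()` above), not only the combined minimum.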

Let's also make sure we're on the same page about what the watermark is. You
can think of the watermark as an event-time clock. It tells you that *no more
events with a timestamp lower than the watermark should appear in your
stream*.

You simply delay emitting the window result from your "connect" operator
until the watermark from the second (side output) stream passes the window's
max timestamp (the maximum timestamp that is included in the window).
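
A rough sketch of that delay, again as plain Java that only models the logic. Window results are buffered by the window's max timestamp and released once the side stream's watermark reaches it, which matches how Flink's event-time trigger fires a window once the watermark reaches its max timestamp. The class and method names are hypothetical. In a real operator the buffer would live in Flink state so it survives failures. Because a watermark travels behind the elements of its input, the side elements up to that timestamp (apart from late ones) have already gone through `processElement2` by the time it arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model (NOT the Flink API): results from input 1 are held back
// until the watermark of input 2 (the side stream) reaches the window's max
// timestamp.
final class DelayedWindowEmitter {
    // Buffered results ordered by window max timestamp; in Flink this
    // would be kept in state.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private long sideWatermark = Long.MIN_VALUE;

    // A window result arrives on input 1.
    void onWindowResult(long windowMaxTimestamp, String result) {
        if (windowMaxTimestamp <= sideWatermark) {
            output.add(result); // side stream already covers this window
        } else {
            pending.computeIfAbsent(windowMaxTimestamp, k -> new ArrayList<>())
                   .add(result);
        }
    }

    // The side stream's watermark advances (input 2).
    void onSideWatermark(long watermark) {
        sideWatermark = Math.max(sideWatermark, watermark);
        // View of all buffered windows whose max timestamp <= sideWatermark.
        Map<Long, List<String>> ready = pending.headMap(sideWatermark, true);
        for (List<String> results : ready.values()) {
            output.addAll(results);
        }
        ready.clear(); // the view is backed by 'pending', so this removes them
    }

    List<String> output() { return output; }
    int pendingCount() { return pending.size(); }
}
```

Keying the buffer by max timestamp in a sorted map means each watermark advance only touches the windows that have become ready.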

Does that make sense?

Best,
D.

On Thu, Dec 2, 2021 at 4:25 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Could you elaborate on what you mean by synchronize? Buffering in the
> state would be fine, but I haven’t been able to come up with a good way of
> ensuring that all data from the side stream for a given minute is processed
> by processElement2 before all data for the same (windowed) minute reaches
> processElement1, even when considering watermarks.
>
> Regards,
>
> Alexis.
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, 2 December 2021 15:45
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
> You cannot rely on the order of the two streams that easily. For example,
> if you are processing a backlog and the windows fire quickly, the windowed
> branch can end up faster than the second branch, even though the latter
> has less work to do. This makes the pipeline non-deterministic.
>
> What you can do is to "synchronize" watermarks of both streams in your
> "connect" operator, but that of course involves buffering events in the
> state.
>
> Best,
>
> D.
>
> On Thu, Dec 2, 2021 at 3:02 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi David,
>
> A watermark step simply refers to assigning timestamps and watermarks; my
> source doesn’t do that.
>
> I have a test system with only a couple of data points per day, so there’s
> definitely no back pressure. I basically have a browser where I can see the
> results from the sink, and I found one result that should have been
> discarded but wasn’t, which makes me think that the operator processed the
> "open" state but waited too long and didn’t process the "close" state
> before the window fired. I can also see that the closure (going from open
> to close) triggered on second 17, and my windows are evaluated every
> minute, so it wasn’t a race condition.
>
> Regards,
>
> Alexis.
>
> *From:* David Morávek <d...@apache.org>
> *Sent:* Thursday, 2 December 2021 14:52
> *To:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Buffering when connecting streams
>
> Hi Alexis,
>
> I'm not sure what the "watermark" step refers to in your graph, but in
> general I'd say your intuition is correct.
>
> For the "buffering" part, each sub-task that needs to send data via a data
> exchange (the last operator in a chain) has an output buffer, and the
> operator that consumes this data (possibly on a different machine) has an
> input buffer (the buffer debloating feature can help mitigate excessive
> buffering in case of back-pressure).
>
> > but I’m not sure if this actually happens
>
> How are you trying to verify this? Also, can you check whether the
> operators are back-pressured?
>
> Best,
>
> D.
>
> On Thu, Dec 2, 2021 at 12:27 PM Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Hello,
>
> I have a use case with event-time processing that ends up with a DAG
> roughly like this:
>
> source -> filter -> keyBy -> watermark -> window -> process -> keyBy_2 -> connect (KeyedCoProcessFunction) -> sink
>             |                                                               ^
>             +--> (side output) -> keyBy -> watermark -----------------------+
>
> (In case the text gets mangled in the e-mail, the side output comes from
> the filter and joins back with the connect operation)
>
> The filter takes all data and its main output is all *valid* data with
> state "open"; the side output is all *valid* data regardless of state.
>
> The goal of the KeyedCoProcessFunction is to check the results of the
> (sliding) window. The window only receives open states, but
> KeyedCoProcessFunction receives all valid data and should discard results
> from the main stream if states changed from "open" to something else before
> the window was evaluated.
>
> I would have expected all data from the side output to be processed
> roughly immediately in KeyedCoProcessFunction’s processElement2 because
> there’s no windowing in the side stream, but I’m not sure if this actually
> happens. Maybe the side stream (or both streams) buffers some data before
> passing it to the connected stream? If so, is there any way I could tune
> this?
>
> Regards,
>
> Alexis.