Collector out) throws Exception {
}
})
.keyBy(Enriched::getFilePath)
.map(Enriched::getData)
.sink()
>
> Greetings
>
> Thias
>
> *From:* Gopi Krishna M
> *Sent:* Monday, June 27, 2022 3:01 PM
> *To:*
: user@flink.apache.org
Subject: Re: Synchronizing streams in coprocessfunction
Thanks Qingsheng, that would definitely work. But I'm unable to figure out
how I can apply this with CoProcessFunction. One stream is windowed and
the trigger implementation uses the 2nd stream.
On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren wrote:
Hi Gopi,
What about using a window with a custom trigger? The window does nothing
but aggregate your input into a collection. The trigger accepts metadata from
the low input stream, so it can fire and purge the window (emitting all elements
in the window downstream) when metadata arrives.
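
To make the fire-and-purge behaviour concrete, here is a minimal plain-Java sketch of the semantics only. It does not use the Flink API: the class MetadataTriggeredBuffer and its methods onData, onMetadata and size are hypothetical names made up for illustration. In an actual Flink job the equivalent decisions would be a custom Trigger returning TriggerResult.CONTINUE for data elements and TriggerResult.FIRE_AND_PURGE for metadata, and this assumes both inputs have been combined into the single stream that the window operator sees.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models "collect until metadata arrives".
class MetadataTriggeredBuffer<T> {
    // Plays the role of the window contents.
    private List<T> buffer = new ArrayList<>();

    // A data element arrives: keep it and do not emit anything
    // (the analogue of a trigger answering CONTINUE).
    void onData(T element) {
        buffer.add(element);
    }

    // A metadata element arrives: hand back everything collected so far and
    // start again with an empty buffer (the analogue of FIRE_AND_PURGE).
    List<T> onMetadata() {
        List<T> fired = buffer;
        buffer = new ArrayList<>();
        return fired;
    }

    // Number of elements currently waiting for the next metadata element.
    int size() {
        return buffer.size();
    }
}
```

Calling onData for each record and onMetadata whenever the second stream delivers an element returns exactly the records seen since the previous metadata element, which is the behaviour described above.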