Re: Synchronizing streams in coprocessfunction

2022-06-28 Thread Gopi Krishna M
Collector out) throws Exception { } }) .keyBy(Enriched::getFilePath) .map(Enriched::getData) .sink() > > > Greetings > > > > Thias > > > > > > *From:* Gopi Krishna M > *Sent:* Monday, June 27, 2022 3:01 PM > *To:*

RE: Synchronizing streams in coprocessfunction

2022-06-27 Thread Schwalbe Matthias
: user@flink.apache.org Subject: Re: Synchronizing streams in coprocessfunction Thanks Quingsheng, that would definitely work. But I'm unable to figure out how I can apply this with CoProcessFunction. One stream is windowed and trigger implementation uses the 2nd stream. On Mon, Jun 27, 2022 at

Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Gopi Krishna M
Thanks Quingsheng, that would definitely work. But I'm unable to figure out how I can apply this with CoProcessFunction. One stream is windowed and trigger implementation uses the 2nd stream. On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren wrote: > Hi Gopi, > > What about using a window with a cus

Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Qingsheng Ren
Hi Gopi, What about using a window with a custom trigger? The window is doing nothing but aggregating your input to a collection. The trigger accepts metadata from the low input stream so it can fire and purge the window (emit all elements in the window to downstream) on arrival of metadata.