Hi Raghavendar, thank you for your reply.

> stream.timeWindow(Time.seconds(10)).trigger(CustomTrigger.of(3)).apply(new TestWindow());

What would this stream be keyed on?

On Thu, Apr 29, 2021 at 11:58 AM Raghavendar T S <raghav280...@gmail.com> wrote:

> Hi Yegor
>
> The trigger implementation in Flink does not support triggering by event
> count and duration together. You can update the existing CountTrigger
> implementation to support your functionality.
> You can use CustomTrigger.java (a minor enhancement of CountTrigger) as is;
> I have attached it in this thread. TestWindow is the window
> function which lets you receive the grouped events. You can check the diff of
> CountTrigger and CustomTrigger for a better understanding.
>
> *Usage*
> stream.timeWindow(Time.seconds(10)).trigger(CustomTrigger.of(3)).apply(new TestWindow());
>
> Thank you
> Raghavendar T S
> merasplugins.com
> teknosrc.com
>
> On Thu, Apr 29, 2021 at 1:04 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Yegor,
>>
>> If you want to use Flink's keyed windowing logic, then you need to insert
>> a keyBy/shuffle operation because Flink currently cannot simply use the
>> partitioning of the Kinesis shards. The reason is that Flink needs to group
>> the keys into the correct key groups in order to support rescaling of the
>> state.
>>
>> What you can do, though, is to create a custom operator or use a flatMap
>> to build your own windowing operator. This operator could then use the
>> partitioning of the Kinesis shards by simply collecting the events until
>> either 30 seconds or 1000 events are observed.
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 28, 2021 at 11:12 AM Yegor Roganov <yegor....@gmail.com> wrote:
>>
>>> Hello
>>>
>>> To learn Flink I'm trying to build a simple application where I want to
>>> save events coming from Kinesis to S3.
>>> I want to subscribe to each shard, and within each shard I want to batch
>>> for 30 seconds, or until 1000 events are observed. These batches should
>>> then be uploaded to S3.
>>> What I don't understand is how to key my source on shard id, and do it
>>> in a way that doesn't induce unnecessary shuffling.
>>> Is this possible with Flink?
>
> --
> Raghavendar T S
> www.teknosrc.com
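
For reference, the batching rule discussed in this thread (emit a batch after 1000 events or 30 seconds, whichever comes first) can be sketched in plain Java with no Flink dependency. This is only an illustration of the logic a custom operator or flatMap might wrap: the class name CountOrTimeBatcher and its methods are hypothetical, not part of Flink and not the CustomTrigger.java attached earlier. The caller passes in the current time, and a batch is only checked when an event arrives or flush() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical helper (not a Flink class): buffers events and hands back a
 * batch once maxCount events are buffered, or once maxAgeMillis has passed
 * since the first buffered event, whichever happens first.
 */
final class CountOrTimeBatcher<T> {
    private final int maxCount;
    private final long maxAgeMillis;
    private List<T> buffer = new ArrayList<>();
    private long firstEventMillis;

    CountOrTimeBatcher(int maxCount, long maxAgeMillis) {
        if (maxCount < 1 || maxAgeMillis < 0) {
            throw new IllegalArgumentException("maxCount must be >= 1 and maxAgeMillis >= 0");
        }
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    /**
     * Adds one event observed at nowMillis. Returns the completed batch
     * (including this event) if either limit is reached, otherwise empty.
     */
    Optional<List<T>> add(T event, long nowMillis) {
        if (buffer.isEmpty()) {
            // The age of a batch is measured from its first event.
            firstEventMillis = nowMillis;
        }
        buffer.add(event);
        boolean full = buffer.size() >= maxCount;
        boolean expired = nowMillis - firstEventMillis >= maxAgeMillis;
        return (full || expired) ? Optional.of(drain()) : Optional.empty();
    }

    /** Returns whatever is buffered, e.g. from a timer or at end of input. */
    Optional<List<T>> flush() {
        return buffer.isEmpty() ? Optional.empty() : Optional.of(drain());
    }

    // Hands the current buffer to the caller and starts a fresh one.
    private List<T> drain() {
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        return batch;
    }
}
```

With new CountOrTimeBatcher<>(1000, 30_000L), one instance per shard would match the numbers in the original question. Because add() only checks the age when an event arrives, a shard that goes quiet would hold its last partial batch until something calls flush(), so a real operator would still need a timer for that case.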