Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Till Rohrmann
Yes, you would have to use operator state for this. This has the limitation that rescaling would probably not work properly. Also, if the assignment of shards to operators changes upon failure recovery, it can happen that some incorrect results are generated (some elements from shard 1 might …
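Till's caveat can be illustrated. Flink's managed operator `ListState` is redistributed with an even-split scheme on restore or rescale, so elements buffered for one shard can end up on a subtask other than the one that now reads that shard. A minimal plain-Java sketch of the effect (the round-robin split below is a simplification; Flink splits the list into contiguous ranges, but the consequence is the same):

```java
import java.util.ArrayList;
import java.util.List;

class EvenSplitDemo {
    // Simplified model of how a checkpointed operator ListState is
    // redistributed across subtasks on restore (even-split scheme).
    static <T> List<List<T>> redistribute(List<T> checkpointedState, int newParallelism) {
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < checkpointedState.size(); i++) {
            // Round-robin split; real Flink uses range splitting.
            perSubtask.get(i % newParallelism).add(checkpointedState.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // Buffered, not-yet-flushed elements from shard 1 at checkpoint time.
        List<String> state = List.of("shard1-a", "shard1-b", "shard1-c");
        List<List<String>> restored = redistribute(state, 2);
        // After restoring with 2 subtasks, shard 1's buffer is split between
        // them, while the Kinesis consumer may assign shard 1 to only one.
        System.out.println(restored);
    }
}
```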

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Yegor Roganov
Hi Raghavendar, thank you for your reply.

> stream.timeWindow(Time.seconds(10)).trigger(CustomTrigger.of(3)).apply(new TestWindow());

What would this stream be keyed on?

On Thu, Apr 29, 2021 at 11:58 AM Raghavendar T S wrote:
> Hi Yegor
> The trigger implementation in Flink does not support …

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Yegor Roganov
Hi Till, thank you for your reply.

> What you can do, though, is to create a custom operator or use a flatMap to build your own windowing operator.

Since my stream wouldn't be keyed, does this mean that I would need to use "Managed Operator State" (aka raw state)?

On Thu, Apr 29, 2021 at 10:34 AM …

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Raghavendar T S
Hi Yegor

The trigger implementation in Flink does not support triggering by event count and duration together. You can update the existing CountTrigger implementation to support your functionality. You can use the CustomTrigger.java (a minor enhancement of CountTrigger), which I have attached i…
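The attached CustomTrigger.java is not preserved in the archive, so here is a plain-Java sketch of the decision logic such a count-or-duration trigger would implement. The class name and methods are hypothetical; in a real Flink `Trigger`, firing on either condition corresponds to returning `TriggerResult.FIRE_AND_PURGE` from `onElement` or `onProcessingTime`:

```java
import java.time.Duration;

// Fires when either the element count reaches a limit or the
// wait duration has elapsed since the window started.
class CountOrTimeTrigger {
    private final long maxCount;
    private final Duration maxWait;
    private long count = 0;
    private long windowStartMillis;

    CountOrTimeTrigger(long maxCount, Duration maxWait, long nowMillis) {
        this.maxCount = maxCount;
        this.maxWait = maxWait;
        this.windowStartMillis = nowMillis;
    }

    // Mirrors Trigger.onElement: count the element, then check both conditions.
    boolean onElement(long nowMillis) {
        count++;
        return shouldFire(nowMillis);
    }

    // Mirrors the check a processing-time timer would perform.
    boolean shouldFire(long nowMillis) {
        return count >= maxCount || nowMillis - windowStartMillis >= maxWait.toMillis();
    }

    // Mirrors Trigger.clear / the purge after firing.
    void reset(long nowMillis) {
        count = 0;
        windowStartMillis = nowMillis;
    }
}
```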

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Till Rohrmann
Hi Yegor,

If you want to use Flink's keyed windowing logic, then you need to insert a keyBy/shuffle operation, because Flink currently cannot simply use the partitioning of the Kinesis shards. The reason is that Flink needs to group the keys into the correct key groups in order to support rescaling …
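Till's point about key groups can be made concrete. Flink hashes each key into one of `maxParallelism` key groups, and each subtask owns a contiguous range of groups; the key-to-group mapping is stable across rescaling, which is why Flink cannot simply adopt the shard partitioning. A simplified sketch (Flink's `KeyGroupRangeAssignment` additionally applies a murmur hash on top of `hashCode`, which is omitted here):

```java
class KeyGroups {
    // Key -> key group: depends only on maxParallelism, so it is
    // stable no matter how the job is rescaled.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> subtask index for the current parallelism.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        String shardId = "shardId-000000000001"; // hypothetical shard id used as key
        int group = keyGroupFor(shardId, maxParallelism);
        // The key group never changes on rescale; only the
        // group -> subtask mapping does.
        System.out.println("group " + group
                + " -> p=2: " + operatorIndexFor(group, maxParallelism, 2)
                + ", p=4: " + operatorIndexFor(group, maxParallelism, 4));
    }
}
```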

Key by Kafka partition / Kinesis shard

2021-04-28 Thread Yegor Roganov
Hello

To learn Flink, I'm trying to build a simple application where I want to save events coming from Kinesis to S3. I want to subscribe to each shard, and within each shard I want to batch for 30 seconds, or until 1000 events are observed. These batches should then be uploaded to S3. What I don't …
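The batching policy Yegor describes (flush after 30 seconds or 1000 events, whichever comes first) can be sketched independently of the Flink operator question. This is a plain-Java illustration only; the class name is hypothetical and the S3 upload is stubbed as a callback:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Per-shard batching: flush when maxEvents accumulate or maxAgeMillis pass.
// The upload itself (e.g. an S3 put) is stubbed as a Consumer callback.
class ShardBatcher<T> {
    private final int maxEvents;
    private final long maxAgeMillis;
    private final Consumer<List<T>> uploader;
    private final List<T> buffer = new ArrayList<>();
    private long batchStartMillis;

    ShardBatcher(int maxEvents, long maxAgeMillis, Consumer<List<T>> uploader, long nowMillis) {
        this.maxEvents = maxEvents;
        this.maxAgeMillis = maxAgeMillis;
        this.uploader = uploader;
        this.batchStartMillis = nowMillis;
    }

    void add(T event, long nowMillis) {
        buffer.add(event);
        maybeFlush(nowMillis);
    }

    // Called on every element and periodically by a timer.
    void maybeFlush(long nowMillis) {
        if (buffer.isEmpty()) {
            return;
        }
        if (buffer.size() >= maxEvents || nowMillis - batchStartMillis >= maxAgeMillis) {
            uploader.accept(new ArrayList<>(buffer)); // hand off a copy for upload
            buffer.clear();
            batchStartMillis = nowMillis;
        }
    }
}
```

In a real job this logic would live inside a custom operator or flatMap (as Till suggests later in the thread), with the timer driven by Flink's processing-time service rather than an explicit `nowMillis` argument.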