Hi Pablo Flores,

I think `WindowedStream` is missing a `trigger()` method that accepts an async trigger. I will add this [1]. We should also add a converter from `ProcessingTimeoutTrigger` with any nested trigger to its async version, as is already done for other triggers [2]. A ticket is tracking this [3].

But currently I'm afraid there is no way to use the async version of `ProcessingTimeoutTrigger` without modifying the Flink distribution.

[1] https://issues.apache.org/jira/browse/FLINK-38363
[2] https://github.com/apache/flink/blob/3bc4f4fc17de1de758be7aca094146dfec844a37/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java#L625
[3] https://issues.apache.org/jira/browse/FLINK-38364

Best,
Zakelly

On Tue, Sep 16, 2025 at 3:34 AM Pablo Flores Hernández <[email protected]> wrote:

> Hello,
>
> I'm trying to find a way to read a stream of events from Kafka, key
> them with a proper key function, and then group the records in windows
> of a given size with the Count trigger, or buffer the window for a
> couple of seconds. Since the system could receive multiple GBs of data
> in a very short period of time, being able to create windows of a
> certain size is essential to avoid overwhelming the operators.
>
> Since further down the pipeline I'm using the ForSt backend and the V2
> State APIs, I cannot find a way to use the AsyncCountTrigger, as the
> class expected in the `window().trigger()` call requires a subclass of
> `Trigger`.
>
> A simple example using Scala is as follows:
>
> // kafkaSource is just a datastream with a wrapper over raw kafka records
>
> kafkaSource.keyBy(new KeySelector[KafkaRecord,String] {
>     override def getKey(in: KafkaRecord): String = new String(in.key)
>   }).enableAsyncState()
>   .window(GlobalWindows.create())
>   .trigger(AsyncProcessingTimeTrigger.create())
>   .process(new WindowMatcherWithStateV2())
>
> If I use the ProcessingTimeoutTrigger with the regular CountTrigger, I
> get the following error:
>
> Trigger is for state V1 APIs, window operator with async state enabled
> only accept state V2 APIs.
>
> Any hints on how to achieve behavior similar to that obtained by
> combining ProcessingTimeTrigger and the Count trigger? I don't mind
> writing my own trigger, but it seems I cannot pass an async trigger to
> the current APIs exposed in WindowedStream.
>
> Thanks in advance and let me know if you need more details,
>
> Regards,
>
> Pablo Flores.
