Hi Pablo Flores,

I think there is a missing method `trigger()` taking an async trigger for
WindowedStream. Will add this[1].
And we also should add a converter from `ProcessingTimeoutTrigger` with any
nested timer to the async version, like others do[2]. A ticket is tracking
this[3].

But currently I'm afraid there is no way to use the async version of
`ProcessingTimeoutTrigger` without modification of flink distribution.


[1] https://issues.apache.org/jira/browse/FLINK-38363
[2]
https://github.com/apache/flink/blob/3bc4f4fc17de1de758be7aca094146dfec844a37/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java#L625
[3] https://issues.apache.org/jira/browse/FLINK-38364

Best,
Zakelly

On Tue, Sep 16, 2025 at 3:34 AM Pablo Flores Hernández <
[email protected]> wrote:

> Hello,
> I'm trying to find a way to read a stream of events from Kafka, key
> them with a proper key function and then grouping records in windows
> of a given size with the Count trigger or to buffer the window for a
> couple seconds. Since the system could receive multiple GBs of data in
> a very short period of time, being able to create windows of a certain
> size is essential to avoid overwhelming the operators.
> Since further down the pipeline I'm using the ForST backend and the V2
> State APIs I cannot find a way to use the AsyncCountTrigger, as the
> class expected in the `window().trigger()` call requires a subclass of
> `Trigger`
> A simple example using scala is as follows
>
> // kafkaSource is just a datastream with a wrapper over raw kafka records
>
> kafkaSource.keyBy(new KeySelector[KafkaRecord,String] {
>     override def getKey(in: KafkaRecord): String = new String(in.key)
>   }).enableAsyncState()
>   .window(GlobalWindows.create())
>   .trigger(AsyncProcessingTimeTrigger.create())
>   .process(new WindowMatcherWithStateV2())
>
> If I use the ProcessingTimeoutTrigger with the regular CountTrigger, I
> get the following error:
> Trigger is for state V1 APIs, window operator with async state enabled
> only accept state V2 APIs.
> Any hints on how to achieve a similar behavior like the one obtained
> combining ProcessingTimeTrigger and Count trigger? I don't mind
> writing my own trigger, but it seems I cannot pass an async trigger to
> the current APIs exposed in WindowedStream.
>
> Thanks in advance and let me know if you need more details,
>
> Regards,
>
> Pablo Flores.
>

Reply via email to