Thanks for the quick response and for clarifying this. I was stuck for a couple of hours and found no other way than modifying the Flink distribution. Knowing this makes the time spent worth it.
I'll keep an eye on the progress of this, and if I get a chance I'll share some of the changes that worked for me in a PR.

Thank you again.

On Tue, Sep 16, 2025, 5:52 a.m., Zakelly Lan <[email protected]> wrote:

> Hi Pablo Flores,
>
> I think there is a missing method `trigger()` taking an async trigger for
> WindowedStream. Will add this[1].
> And we also should add a converter from `ProcessingTimeoutTrigger` with
> any nested timer to the async version, like others do[2]. A ticket is
> tracking this[3].
>
> But currently I'm afraid there is no way to use the async version of
> `ProcessingTimeoutTrigger` without modification of flink distribution.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-38363
> [2]
> https://github.com/apache/flink/blob/3bc4f4fc17de1de758be7aca094146dfec844a37/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java#L625
> [3] https://issues.apache.org/jira/browse/FLINK-38364
>
> Best,
> Zakelly
>
> On Tue, Sep 16, 2025 at 3:34 AM Pablo Flores Hernández <
> [email protected]> wrote:
>
>> Hello,
>> I'm trying to find a way to read a stream of events from Kafka, key
>> them with a proper key function and then grouping records in windows
>> of a given size with the Count trigger or to buffer the window for a
>> couple seconds. Since the system could receive multiple GBs of data in
>> a very short period of time, being able to create windows of a certain
>> size is essential to avoid overwhelming the operators.
>> Since further down the pipeline I'm using the ForST backend and the V2
>> State APIs I cannot find a way to use the AsyncCountTrigger, as the
>> class expected in the `window().trigger()` call requires a subclass of
>> `Trigger`
>> A simple example using scala is as follows
>>
>> // kafkaSource is just a datastream with a wrapper over raw kafka records
>>
>> kafkaSource.keyBy(new KeySelector[KafkaRecord,String] {
>> override def getKey(in: KafkaRecord): String = new String(in.key)
>> }).enableAsyncState()
>> .window(GlobalWindows.create())
>> .trigger(AsyncProcessingTimeTrigger.create())
>> .process(new WindowMatcherWithStateV2())
>>
>> If I use the ProcessingTimeoutTrigger with the regular CountTrigger, I
>> get the following error:
>> Trigger is for state V1 APIs, window operator with async state enabled
>> only accept state V2 APIs.
>> Any hints on how to achieve a similar behavior like the one obtained
>> combining ProcessingTimeTrigger and Count trigger? I don't mind
>> writing my own trigger, but it seems I cannot pass an async trigger to
>> the current APIs exposed in WindowedStream.
>>
>> Thanks in advance and let me know if you need more details,
>>
>> Regards,
>>
>> Pablo Flores.
>>
>
