Hi, John

If you want to trigger the window's aggregation "earlier", I think you need to define your own custom window trigger in the DataStream API. You can find more detailed information in the docs [1].
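To make the combined trigger more concrete, here is a minimal, dependency-free Java model of the decisions such a trigger would make for one id and one 10-minute window. It fires early the first time the element count exceeds 5, and fires again when the watermark reaches the window end. This is a sketch, not Flink code: `EarlyCountTriggerModel`, `Decision`, `onElement` and `onWatermark` are hypothetical stand-ins. In Flink the same logic would go into a subclass of `Trigger` (overriding `onElement` and `onEventTime` and returning `TriggerResult.FIRE` or `TriggerResult.CONTINUE`). The count would be kept in partitioned state from the trigger context rather than in a field, because one trigger instance serves many keys and windows.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Dependency-free model (NOT Flink API) of a trigger combining CountTrigger-style
 * early firing with EventTimeTrigger-style end-of-window firing.
 * Models a single key and a single window; all names are hypothetical.
 */
public class EarlyCountTriggerModel {
    /** Stand-in for Flink's TriggerResult, limited to the two outcomes used here. */
    enum Decision { CONTINUE, FIRE }

    private final long threshold;     // e.g. 5: fire when the 6th element arrives
    private final long windowEnd;     // exclusive end of the tumbling window (epoch millis)
    private long count = 0;           // elements seen for this key and window
    private boolean endFired = false; // end-of-window firing happens only once

    EarlyCountTriggerModel(long threshold, long windowEnd) {
        this.threshold = threshold;
        this.windowEnd = windowEnd;
    }

    /** Counts the element; fires exactly once, when the count first exceeds the threshold. */
    Decision onElement() {
        count++;
        return count == threshold + 1 ? Decision.FIRE : Decision.CONTINUE;
    }

    /** Fires once when the watermark reaches the window's last timestamp (windowEnd - 1). */
    Decision onWatermark(long watermark) {
        if (!endFired && watermark >= windowEnd - 1) {
            endFired = true;
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    long count() { return count; }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        EarlyCountTriggerModel trigger = new EarlyCountTriggerModel(5, tenMinutes);
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            log.add("element " + i + " -> " + trigger.onElement());
        }
        log.add("watermark at window end -> " + trigger.onWatermark(tenMinutes - 1));
        log.forEach(System.out::println);
    }
}
```

Because the end-of-window firing emits the aggregate again, a downstream filter on count > 5 would see such an id twice (once early with 6, then with the final count). Whether that second record is wanted is a design choice. Skipping the end-of-window firing for windows that already fired early would avoid it.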
If you find customizing the trigger too hard, you could use a `ProcessFunction` in the DataStream API instead. You could extend `ProcessFunction` to simulate the early-firing window behavior. There is a good example [2] in the docs.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example

Best,
Guowei


On Sat, Sep 4, 2021 at 2:16 AM John Smith <mylearningemail2...@gmail.com> wrote:

> Thanks Guowei and Caizhi.
> As Guowei noted, I am using the Table API, and it seems that it does not
> support triggers at the moment. Is there a plan to support custom triggers
> in the Table API/SQL too?
> Also, if I follow Guowei's suggestion, should I use DataStream for the other
> parts of the aggregate computation too, or is there a way to create a
> GroupWindowedTable from the DataStream?
>
> Thanks,
>
> On Thu, Sep 2, 2021 at 9:24 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, John
>>
>> I agree with Caizhi that you might need to customize a window trigger.
>> One small addition: you need to convert the Table to a DataStream first,
>> and then you can customize the window's trigger, because as far as I
>> know, the Table API does not support custom windows yet. For details on
>> how to convert, you can refer to [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> You might want to use a custom trigger to achieve this.
>>>
>>> Tumble windows use EventTimeTrigger by default. Flink has another
>>> built-in trigger called CountTrigger, but it only fires every X records,
>>> ignoring the event time completely.
>>> You might want to create your own trigger to combine the two, or more
>>> specifically, combine EventTimeTrigger#onEventTime and
>>> CountTrigger#onElement.
>>>
>>> For more about custom triggers, see
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>>>
>>> On Fri, Sep 3, 2021 at 2:00 AM John Smith <mylearningemail2...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Sorry if this has been answered previously, but I couldn't find any
>>>> answer to the question and would appreciate any help.
>>>>
>>>> Context:
>>>> Let's say I have a log stream in Kafka where message values have an
>>>> *id* field along with a few other fields. I want to count the number
>>>> of messages for each id over a tumbling window of *ten minutes*, and if
>>>> the count for any id in the given window is higher than 5, I want to
>>>> write the message into the sink topic. However, I don't want to wait
>>>> until the end of the 10-minute window to emit the result; I want to emit
>>>> the result immediately when the count exceeds 5 for an id in the window.
>>>> For example, if I see 6 messages in the first minute for an id, I want
>>>> to trigger a write with the count of 6 to the sink topic immediately and
>>>> not wait the whole 10 minutes.
>>>> The following code does the aggregation but emits the results at the
>>>> end of the window. How can I trigger emitting the result earlier?
>>>>
>>>> // The projection must keep "timestamp", since the window is defined on it.
>>>> final Table resultTable = sourceTable
>>>>     .select($("id"), $("key"), $("timestamp"))
>>>>     .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
>>>>     .groupBy($("w"), $("id"))
>>>>     .select($("w").start().as("WindowStart"), $("id"),
>>>>             $("key").count().as("count"));
>>>>
>>>> resultTable.execute().print();
>>>>
>>>> Thanks in advance!
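Guowei's alternative, simulating early firing with a `ProcessFunction`, boils down to keeping a counter per id and per 10-minute window, and emitting as soon as that counter crosses the threshold. Below is a minimal, dependency-free Java model of that logic. It assumes epoch-millisecond timestamps and the threshold of 5 from John's question. `EarlyEmitCounter`, `Alert` and `process` are hypothetical names, not Flink API. In a real job this would presumably live in a `KeyedProcessFunction` keyed by id. There, the map would become keyed state, and an event-time timer at the window end would clear it; both are omitted here (see example [2] in Guowei's reply).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Dependency-free model (NOT Flink API) of early emission: count events per
 * (id, 10-minute tumbling window) and emit once, the moment the count exceeds
 * the threshold. State cleanup at window end is intentionally left out.
 */
public class EarlyEmitCounter {
    /** Hypothetical output record: which id crossed the threshold, in which window. */
    static final class Alert {
        final String id;
        final long windowStart;
        final long count;

        Alert(String id, long windowStart, long count) {
            this.id = id;
            this.windowStart = windowStart;
            this.count = count;
        }

        @Override
        public String toString() {
            return "id=" + id + " windowStart=" + windowStart + " count=" + count;
        }
    }

    private static final long WINDOW_MS = 10 * 60 * 1000L; // tumbling window size
    private final long threshold;
    private final Map<List<Object>, Long> counts = new HashMap<>(); // (id, windowStart) -> count

    EarlyEmitCounter(long threshold) {
        this.threshold = threshold;
    }

    /** Returns an alert the first time (id, window) exceeds the threshold, otherwise null. */
    Alert process(String id, long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_MS);
        List<Object> key = List.of(id, windowStart);
        long count = counts.merge(key, 1L, Long::sum);
        return count == threshold + 1 ? new Alert(id, windowStart, count) : null;
    }

    public static void main(String[] args) {
        EarlyEmitCounter counter = new EarlyEmitCounter(5);
        long minute = 60 * 1000L;
        // Six events for "a" within the first minute: the sixth one produces an alert.
        for (int i = 0; i < 6; i++) {
            Alert alert = counter.process("a", i * 1000L);
            if (alert != null) {
                System.out.println("emit " + alert);
            }
        }
        // An event in the next window starts a fresh count, so nothing is emitted.
        System.out.println(counter.process("a", 10 * minute));
    }
}
```

Unlike the window-trigger approach, this emits only the record that crosses the threshold (count 6) and no final end-of-window count. That matches the "emit as soon as the count exceeds 5" requirement. If the final count were also needed, the end-of-window timer callback would be the natural place to emit it.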