Hi, John

If you want to trigger the aggregation calculation of the window “earlier”,
I think you need to define a custom window trigger in the
DataStream API yourself.
You can find more detailed information in the docs [1].
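A minimal sketch of such a trigger, assuming the Flink 1.13 DataStream API. It combines EventTimeTrigger#onEventTime with a CountTrigger-style onElement, as Caizhi suggests further down the thread. The class name `EarlyCountEventTimeTrigger` and the policy "fire once as soon as the count exceeds the threshold" are illustrative assumptions, not something Flink ships:

```java
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Hypothetical trigger (not part of Flink): fires once early when the number of
// elements in a window first exceeds `threshold`, and fires again at window end.
public class EarlyCountEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private final long threshold;

    // Per-key, per-window element count kept in Flink-managed state.
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("early-count", Long::sum, Long.class);

    public EarlyCountEventTimeTrigger(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // As in EventTimeTrigger: an element for an already-complete window fires directly.
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        }
        // Make sure the regular end-of-window firing still happens.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        // Similar to CountTrigger#onElement, but the count is never reset,
        // so the early firing happens only once per key and window.
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        return count.get() == threshold + 1 ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Same as EventTimeTrigger#onEventTime: fire when the window ends.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).clear();
    }

    @Override
    public String toString() {
        return "EarlyCountEventTimeTrigger(" + threshold + ")";
    }
}
```

It could be attached roughly as `keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(new EarlyCountEventTimeTrigger(5))`. Because it returns `FIRE` rather than `FIRE_AND_PURGE`, the window contents are kept: an id that crosses the threshold is reported once early (with a count of 6) and again when the window ends, and ids with 5 or fewer messages are still emitted at the end of the window, so they would need to be filtered downstream.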

If you think customizing the trigger is a bit harder, you could instead
use a `ProcessFunction` in the DataStream API.
You could extend `ProcessFunction` to simulate the behavior of a window
with an early trigger. There is a good example [2] in the docs.
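A minimal sketch of that approach, loosely modeled on the docs example and assuming the Flink 1.13 API, a stream keyed by a String `id`, and event-time timestamps/watermarks assigned upstream. The class `EarlyCountAlert` and its helper `windowStartFor` are hypothetical names; the function emits `(id, windowStart, count)` the moment an id's count in a 10-minute-style tumbling window exceeds the threshold, and uses an event-time timer to drop that window's state once it is complete:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical function emulating a tumbling count window with an early result.
public class EarlyCountAlert<IN>
        extends KeyedProcessFunction<String, IN, Tuple3<String, Long, Long>> {

    private final long windowSizeMillis;
    private final long threshold;

    // windowStart -> number of elements seen so far for the current key (id)
    private transient MapState<Long, Long> counts;

    public EarlyCountAlert(long windowSizeMillis, long threshold) {
        this.windowSizeMillis = windowSizeMillis;
        this.threshold = threshold;
    }

    // Start of the offset-free tumbling window that contains `ts`.
    public static long windowStartFor(long ts, long windowSizeMillis) {
        return ts - Math.floorMod(ts, windowSizeMillis);
    }

    @Override
    public void open(Configuration parameters) {
        counts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counts-per-window", Long.class, Long.class));
    }

    @Override
    public void processElement(IN value, Context ctx,
                               Collector<Tuple3<String, Long, Long>> out) throws Exception {
        Long ts = ctx.timestamp();
        if (ts == null) {
            throw new IllegalStateException("Event-time timestamps must be assigned upstream");
        }
        long windowStart = windowStartFor(ts, windowSizeMillis);
        long windowMaxTs = windowStart + windowSizeMillis - 1;
        if (windowMaxTs <= ctx.timerService().currentWatermark()) {
            return; // window already complete: drop the late element
        }
        Long previous = counts.get(windowStart);
        long count = (previous == null ? 0L : previous) + 1;
        counts.put(windowStart, count);
        // Timers are de-duplicated per key and timestamp, so re-registering is cheap.
        ctx.timerService().registerEventTimeTimer(windowMaxTs);
        // Emit immediately, exactly once, when the count first exceeds the threshold.
        if (count == threshold + 1) {
            out.collect(Tuple3.of(ctx.getCurrentKey(), windowStart, count));
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple3<String, Long, Long>> out) throws Exception {
        // The timer fired at windowMaxTs, so derive the window start and drop its state.
        counts.remove(timestamp + 1 - windowSizeMillis);
    }
}
```

Assuming the Table is first converted with `StreamTableEnvironment#toDataStream` (which, per the 1.13 integration docs, should carry the rowtime over as the record timestamp) and `id` is a string column, it could be applied roughly as `tEnv.toDataStream(sourceTable).keyBy(row -> row.<String>getFieldAs("id")).process(new EarlyCountAlert<>(10 * 60 * 1000L, 5L))`. Unlike the trigger approach, this emits only the early result; if the final per-window counts are also needed, the existing Table API window aggregation could keep running alongside it.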

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example

Best,
Guowei


On Sat, Sep 4, 2021 at 2:16 AM John Smith <mylearningemail2...@gmail.com> wrote:

> Thanks Guowei and Caizhi.
> As Guowei noted, I am using Table API and it seems that it does not
> support triggers at the moment. Is there a plan to support custom triggers
> in Table API/SQL too?
> Also, if I follow Guowei's suggestion, should I use DataStream for other
> parts of the aggregate computation too or is there a way to create a
> GroupedWindowedTable from the DataStream?
>
> Thanks,
>
> On Thu, Sep 2, 2021 at 9:24 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, John
>>
>> I agree with Caizhi that you might need to customize a window trigger.
>> One small addition: you need to convert the Table to a DataStream
>> first, and then customize the trigger of the window, because as far as I
>> know the Table API does not support custom window triggers yet. For details
>> on how to convert, you can refer to [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> You might want to use a custom trigger to achieve this.
>>>
>>> Tumbling event-time windows use EventTimeTrigger by default. Flink has another
>>> built-in trigger called CountTrigger, but it only fires every X records,
>>> ignoring event time completely. You might want to create your own
>>> trigger that combines the two, or more specifically, combines
>>> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>>>
>>> For more about custom triggers see
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>>>
>>> John Smith <mylearningemail2...@gmail.com> wrote on Fri, Sep 3, 2021 at 2:00 AM:
>>>
>>>> Hi,
>>>>
>>>> Sorry if this has been answered before, but I couldn't find an
>>>> answer to this question and would appreciate any help.
>>>> Context:
>>>> Let's say I have a log stream in Kafka where message values have an
>>>> *id* field along with a few other fields. I want to count the number
>>>> of messages for each id in a tumbling window of *ten minutes*, and if
>>>> the count for any id in a given window is higher than 5, I want to write
>>>> a message to the sink topic. However, I don't want to wait until the
>>>> end of the 10-minute window; I want to emit the result as soon as the
>>>> count for an id in the window exceeds 5.
>>>> For example, if I see 6 messages for an id in the first minute, I want to
>>>> immediately write the count of 6 to the sink topic instead of
>>>> waiting the whole 10 minutes.
>>>> The following code does the aggregation but emits the results at the
>>>> end of the window. How can I trigger emitting the results earlier?
>>>>
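>>>> import static org.apache.flink.table.api.Expressions.$;
>>>> import static org.apache.flink.table.api.Expressions.lit;
>>>>
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.Tumble;
>>>>
>>>> final Table resultTable = sourceTable
>>>>         // "timestamp" has to be selected as well (and be an event-time
>>>>         // attribute of sourceTable), otherwise the window cannot use it
>>>>         .select($("id"), $("key"), $("timestamp"))
>>>>         .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
>>>>         .groupBy($("w"), $("id"))
>>>>         .select(
>>>>                 $("w").start().as("WindowStart"),
>>>>                 $("id"),
>>>>                 $("key").count().as("count"));
>>>>
>>>> resultTable.execute().print();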
>>>>
>>>>
>>>> Thanks in advance!
>>>>
>>>>
