Hi,

> Can you provide a pseudo-code example of how to implement this?
Processing time
If you use TumblingProcessingTimeWindows.of(Time.seconds(1)), then for each
record you take the timestamp from System.currentTimeMillis(), say t, and
compute w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000) and
w_end = w_start + 1000.

Event time
If you use TumblingEventTimeWindows.of(Time.seconds(1)), then for each
record you take the timestamp from the record's timestamp (rowtime) field,
say t, and compute w_start and w_end the same way as above.
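
To make the arithmetic concrete, here is a small self-contained sketch. The formula mirrors the one in Flink's TimeWindow.getWindowStartWithOffset(); the class and method names here are illustrative, not Flink API:

```java
// Hypothetical sketch: reimplements the window-start arithmetic used by
// Flink's TimeWindow.getWindowStartWithOffset() for illustration only.
public class WindowBoundsSketch {

    // Same arithmetic as TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 1000L; // 1-second tumbling windows
        // t is System.currentTimeMillis() for processing time,
        // or the record's rowtime field for event time.
        long t = 1562761234567L;
        long wStart = windowStart(t, 0L, windowSize);
        long wEnd = wStart + windowSize;
        System.out.println(wStart + " " + wEnd); // prints "1562761234000 1562761235000"
    }
}
```

The same two lines work for both processing time and event time; only where t comes from differs.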

More examples can be found in TimeWindowTest[1].

Best, Hequn

[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
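
Regarding the cache suggested earlier in this thread (to avoid hitting the REST endpoint repeatedly from the table function), the idea can be sketched in plain Java, independent of the Flink API. All names below are hypothetical; in a real TableFunction this logic would sit inside eval():

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a per-key cache for a lookup table function, so that
// each (userId, windowStart) pair contacts the REST endpoint only once.
public class EnrichmentCache {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> restLookup; // stands in for the real REST call

    public EnrichmentCache(Function<String, String> restLookup) {
        this.restLookup = restLookup;
    }

    public String enrich(String userId, long windowStart) {
        // The cache key combines user and window start, so results from an
        // old window are not reused for a new one.
        return cache.computeIfAbsent(userId + "@" + windowStart,
                k -> restLookup.apply(userId));
    }
}
```

In production you would bound the cache (size limit or TTL, e.g. a Guava cache) so it does not grow without limit as windows advance.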


On Wed, Jul 10, 2019 at 8:46 PM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> The problem with the LATERAL JOIN (via
> a LookupableTableSource+TableFunction, because I need to call that function
> using the userId as a parameter) is that I cannot know the window
> start/end. To me it's not clear how to get that from
> TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)...
> Can you provide a pseudo-code example of how to implement this?
>
> On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> Thanks for your information.
>>
>> From your description, it seems that you only use the window to get the
>> start and end time. There are no aggregations happening. If this is the
>> case, you can get the start and end time yourself (the
>> `TimeWindow.getWindowStartWithOffset()` method shows how to get the window
>> start from a timestamp). To be more specific, if you use processing time,
>> you can get your timestamp with System.currentTimeMillis() and then use it
>> to get the window start and end
>> with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get
>> the timestamp from the rowtime field.
>>
>> With the start and end time, you can then perform a LATERAL JOIN to enrich
>> the information. You can add a cache in your table function to avoid
>> contacting the REST endpoint too frequently.
>>
>> Best, Hequn
>>
>>
>> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi Hequn, thanks for your answer.
>>> What I'm trying to do is to read a stream of events that basically
>>> contains a UserId field and, every X minutes (i.e. using a time window)
>>> and for each distinct UserId key, query 3 different REST services to
>>> enrich my POJOs*.
>>> For the moment what I do is use a ProcessWindowFunction after the
>>> .keyBy().window(), as shown in the previous mail example, to contact
>>> those 3 services and enrich my object.
>>>
>>> However I don't like this solution because I'd like to use Flink to its
>>> full potential, so I'd like to enrich my object using LATERAL TABLEs or
>>> async I/O.
>>> The main problem I'm facing right now is that I can't find a way to
>>> pass the tumbling window start/end to the LATERAL JOIN table functions
>>> (because this is a parameter of the REST query).
>>> Moreover I don't know whether this use case is something that the Table
>>> API aims to solve.
>>>
>>> * Of course this could kill the REST endpoint if the number of users is
>>> very big. Because of this I'd like to keep the external state of the
>>> source tables as internal Flink state and then do a JOIN on the UserId.
>>> The problem here is that I need to "materialize" them using Debezium (or
>>> similar) via Kafka and dynamic tables. Is there any example of keeping
>>> multiple tables synced with Flink state through Debezium (without the
>>> need of rewriting all the logic for managing UPDATE/INSERT/DELETE)?
>>>
>>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Nice to hear your ideas on Table API!
>>>>
>>>> Could you be more specific about your requirements? A detailed scenario
>>>> would be quite helpful. For example, do you want to emit multiple
>>>> records through the collector, or do you want to use the timer?
>>>>
>>>> BTW, the Table API recently introduced flatAggregate (both non-window
>>>> flatAggregate and window flatAggregate), which will be included in the
>>>> upcoming 1.9 release. flatAggregate can emit multiple records for a
>>>> single group. More details here[1][2].
>>>> Hope this can solve your problem.
>>>>
>>>> Best, Hequn
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>>>
>>>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier <pomperma...@okkam.it>
>>>> wrote:
>>>>
>>>>> Hi to all,
>>>>> from what I understood, a ProcessWindowFunction can only be used in
>>>>> the Streaming API.
>>>>> Is there any plan to port it to the Table API as well (in the near
>>>>> future)?
>>>>> I'd like to do the equivalent of the following with the Table API:
>>>>>
>>>>> final DataStream<MyPojoEvent> events = env.addSource(src);
>>>>> events.filter(e -> e.getCode() != null)
>>>>>     .keyBy(event -> Integer.valueOf(event.getCode()))
>>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>     .process(new ProcessWindowFunction<MyPojoEvent, MyPojoEvent,
>>>>>         Integer, TimeWindow>() { ... });
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>
>>>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
>
