Hi,

> Can you provide a pseudo-code example of how to implement this?

Processing time
If you use TumblingProcessingTimeWindows.of(Time.seconds(1)), then for each record you take the timestamp from System.currentTimeMillis(), say t, and compute w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000) and w_end = w_start + 1000.
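A minimal self-contained sketch of the computation above (the formula mirrors the one used by Flink's TimeWindow.getWindowStartWithOffset(); the WindowBounds class name and the fixed timestamp are just for illustration):

```java
// Hypothetical helper mirroring the window-start formula used by
// Flink's TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
public class WindowBounds {

    // Returns the start of the tumbling window that `timestamp` falls into.
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // Processing time: t would come from System.currentTimeMillis();
        // a fixed value is used here so the output is deterministic.
        long t = 1562763000123L;
        long wStart = getWindowStartWithOffset(t, 0, 1000); // 1-second tumbling window
        long wEnd = wStart + 1000;
        System.out.println("window = [" + wStart + ", " + wEnd + ")");
        // For event time, t would instead come from the record's timestamp field.
    }
}
```

The same two lines (getWindowStartWithOffset plus `wStart + windowSize`) cover both the processing-time and event-time cases; only the source of t changes.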
Event time
If you use TumblingEventTimeWindows.of(Time.seconds(1)), then for each record you take the timestamp from the corresponding timestamp field, say t, and compute w_start and w_end in the same way as above. More examples can be found in TimeWindowTest[1].

Best, Hequn

[1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java

On Wed, Jul 10, 2019 at 8:46 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

> The problem with the LATERAL JOIN (via a LookupableTableSource+TableFunction, because I need to call that function using the userId as a parameter) is that I cannot know the window start/end... to me it's not clear how to get that from TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
> Can you provide a pseudo-code example of how to implement this?
>
> On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> Thanks for your information.
>>
>> From your description, it seems that you only use the window to get the start and end time. There are no aggregations happening. If this is the case, you can compute the start and end time yourself (TimeWindow.getWindowStartWithOffset() shows how to get the window start according to the timestamp). To be more specific, if you use processing time, you can get your timestamp with System.currentTimeMillis() and then use it to get the window start and end with TimeWindow.getWindowStartWithOffset(). For event time, you can get the timestamp from the rowtime field.
>>
>> With the start and end time, you can then perform a LATERAL JOIN to enrich the information. You can add a cache in your table function to avoid contacting the REST endpoint too frequently.
>>
>> Best, Hequn
>>
>> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Hi Hequn, thanks for your answer.
>>> What I'm trying to do is to read a stream of events that basically contains a UserId field and, every X minutes (i.e. using a time window) and for each distinct UserId key, query 3 different REST services to enrich my POJOs*.
>>> For the moment what I do is use a ProcessWindowFunction after the .keyBy().window(), as shown in the previous mail example, to contact those 3 services and enrich my object.
>>>
>>> However, I don't like this solution because I'd like to use Flink to its full potential, so I'd like to enrich my object using LATERAL TABLEs or ASYNC IO.
>>> The main problem I'm facing right now is that I can't find a way to pass the tumbling window start/end to the LATERAL JOIN table functions (because this is a parameter of the REST query).
>>> Moreover, I don't know whether this use case is something that the Table API aims to solve.
>>>
>>> * Of course this could kill the REST endpoint if the number of users is very big. Because of this, I'd like to keep the external state of the source tables as internal Flink state and then do a JOIN on the UserId. The problem here is that I need to "materialize" them using Debezium (or similar) via Kafka and dynamic tables. Is there any example of keeping multiple tables synched with Flink state through Debezium (without the need to rewrite all the logic for managing UPDATE/INSERT/DELETE)?
>>>
>>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Nice to hear your ideas on Table API!
>>>>
>>>> Could you be more specific about your requirements? A detailed scenario would be quite helpful. For example, do you want to emit multiple records through the collector, or do you want to use the timer?
>>>>
>>>> BTW, Table API recently introduced flatAggregate (both non-window flatAggregate and window flatAggregate), which will be included in the upcoming release 1.9.
The flatAggregate can emit multiple records for a single group. More details here[1][2].
>>>> Hope this can solve your problem.
>>>>
>>>> Best, Hequn
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>>>
>>>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> from what I understood, a ProcessWindowFunction can only be used in the Streaming API.
>>>>> Is there any plan to port them to the Table API as well (in the near future)?
>>>>> I'd like to do with the Table API the equivalent of:
>>>>>
>>>>> final DataStream<MyPojoEvent> events = env.addSource(src);
>>>>> events.filter(e -> e.getCode() != null)
>>>>>     .keyBy(event -> Integer.valueOf(event.getCode()))
>>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>     .process(new ProcessWindowFunction<MyPojoEvent, MyPojoEvent, Integer, TimeWindow>() {.....});
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>
>>>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809
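Hequn's suggestion earlier in the thread (add a cache in the table function so repeated user lookups in the same window don't all hit the REST endpoint) can be sketched in plain Java, independent of Flink. Everything here is illustrative: CachedUserEnricher is a hypothetical class, fetchFromRest is a stand-in for the real REST call, and the LinkedHashMap-based LRU bound is just one possible eviction policy.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of per-user, per-window memoization of REST lookups, as one might
// do inside a table function. All names here are hypothetical.
public class CachedUserEnricher {

    private static final int MAX_ENTRIES = 10_000;

    // Simple LRU cache: LinkedHashMap in access order, evicting the eldest
    // entry once the size bound is exceeded.
    private final Map<String, String> cache =
            new LinkedHashMap<String, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    return size() > MAX_ENTRIES;
                }
            };

    public int restCalls = 0; // exposed only to make the caching effect visible

    // Hypothetical stand-in for the real REST request (which would take the
    // userId plus the window start/end as query parameters).
    private String fetchFromRest(String userId, long wStart, long wEnd) {
        restCalls++;
        return "enriched(" + userId + "," + wStart + "," + wEnd + ")";
    }

    public String enrich(String userId, long wStart, long wEnd) {
        // Key the cache on user AND window start, so each new window refreshes.
        String key = userId + ":" + wStart;
        return cache.computeIfAbsent(key, k -> fetchFromRest(userId, wStart, wEnd));
    }

    public static void main(String[] args) {
        CachedUserEnricher enricher = new CachedUserEnricher();
        enricher.enrich("user-1", 0, 60_000);
        enricher.enrich("user-1", 0, 60_000); // served from cache
        enricher.enrich("user-2", 0, 60_000);
        System.out.println("REST calls: " + enricher.restCalls); // prints "REST calls: 2"
    }
}
```

Keying the cache on (userId, window start) means entries expire naturally when a new window begins, at the cost of one REST call per user per window.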