Thanks Hequn, I'll give it a try!

Best,
Flavio

On Thu, Jul 11, 2019 at 3:38 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi,
>
> > Can you provide a pseudo-code example of how to implement this?
>
> Processing time
> If you use TumblingProcessingTimeWindows.of(Time.seconds(1)), then for each
> record you get the timestamp from System.currentTimeMillis(), say t, and
> w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000), and w_end =
> w_start + 1000.
>
> Event time
> If you use TumblingEventTimeWindows.of(Time.seconds(1)), then for each
> record you get the timestamp from the corresponding timestamp field, say t,
> and compute w_start and w_end the same way as above.
>
> More examples can be found in TimeWindowTest[1].
>
> Best, Hequn
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
>
>
> On Wed, Jul 10, 2019 at 8:46 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> The problem with the LATERAL JOIN (via
>> a LookupableTableSource+TableFunction, because I need to call that function
>> using the userId as a parameter) is that I cannot know the window
>> start/end. To me it's not clear how to get that from
>> TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)...
>> Can you provide a pseudo-code example of how to implement this?
>>
>> On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> Thanks for your information.
>>>
>>> From your description, it seems that you only use the window to get the
>>> start and end time. No aggregations happen. If this is the case,
>>> you can compute the start and end time yourself (the
>>> `TimeWindow.getWindowStartWithOffset()` shows how to get the window start
>>> according to the timestamp). To be more specific, if you use processing
>>> time, you can get your timestamp with System.currentTimeMillis(), and then
>>> use it to get the window start and end
>>> with `TimeWindow.getWindowStartWithOffset()`.
>>> For event time, you can get the timestamp from the rowtime field.
>>>
>>> With the start and end time, you can then perform a LATERAL JOIN to enrich
>>> the information. You can add a cache in your table function to avoid
>>> contacting the REST endpoint too frequently.
>>>
>>> Best, Hequn
>>>
>>>
>>> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi Hequn, thanks for your answer.
>>>> What I'm trying to do is to read a stream of events that basically
>>>> contains a UserId field and, every X minutes (i.e. using a Time Window) and
>>>> for each different UserId key, query 3 different REST services to enrich my
>>>> POJOs*.
>>>> For the moment what I do is to use a ProcessWindowFunction after the
>>>> .keyBy().window(), as shown in the previous mail example, to contact those 3
>>>> services and enrich my object.
>>>>
>>>> However, I don't like this solution because I'd like to use Flink to
>>>> its full potential, so I'd like to enrich my object using LATERAL TABLEs or
>>>> ASYNC IO.
>>>> The main problem I'm facing right now is that I can't find a way to
>>>> pass the tumbling window start/end to the LATERAL JOIN table functions
>>>> (because this is a parameter of the REST query).
>>>> Moreover, I don't know whether this use case is something that the Table API
>>>> aims to solve.
>>>>
>>>> * Of course this could kill the REST endpoint if the number of users is
>>>> very big. Because of this I'd like to keep the external state of source
>>>> tables as an internal Flink state and then do a JOIN on the UserId. The
>>>> problem here is that I need to "materialize" them using Debezium (or
>>>> similar) via Kafka and dynamic tables. Is there any example of keeping
>>>> multiple tables synced with Flink state through Debezium (without the need
>>>> of rewriting all the logic for managing UPDATE/INSERT/DELETE)?
>>>>
>>>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng <chenghe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> Nice to hear your ideas on Table API!
>>>>>
>>>>> Could you be more specific about your requirements? A detailed
>>>>> scenario would be quite helpful. For example, do you want to emit multiple
>>>>> records through the collector, or do you want to use the timer?
>>>>>
>>>>> BTW, Table API recently introduced flatAggregate (both non-window
>>>>> flatAggregate and window flatAggregate), which will be included in the
>>>>> upcoming release 1.9. The flatAggregate can emit multiple records for a
>>>>> single group. More details here[1][2].
>>>>> Hope this can solve your problem.
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>>>>
>>>>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Hi to all,
>>>>>> from what I understood, a ProcessWindowFunction can only be used in
>>>>>> the Streaming API.
>>>>>> Is there any plan to port it to the Table API as well (in the near
>>>>>> future)?
>>>>>> I'd like to do with the Table API the equivalent of:
>>>>>>
>>>>>> final DataStream<MyPojoEvent> events = env.addSource(src);
>>>>>> events.filter(e -> e.getCode() != null)
>>>>>>     .keyBy(event -> Integer.valueOf(event.getCode()))
>>>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>>     .process(new ProcessWindowFunction<MyPojoEvent, MyPojoEvent,
>>>>>>         Integer, TimeWindow>() {.....});
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>
>> --
>> Flavio Pompermaier
>> Development Department
>>
>> OKKAM S.r.l.
>> Tel. +(39) 0461 041809
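
For reference, the w_start / w_end recipe from Hequn's reply can be sketched as plain Java with no Flink dependency. This is only a minimal sketch: the class name WindowBounds and its methods are made up for illustration, and the arithmetic is my assumption of what TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize) computes for a tumbling window, so please check it against the Flink version you run. The window end is treated as exclusive.

```java
import java.util.concurrent.TimeUnit;

/**
 * Stand-alone sketch of tumbling-window bounds (hypothetical helper,
 * not part of Flink). All values are in milliseconds.
 */
public final class WindowBounds {

    private WindowBounds() {}

    /** Inclusive start of the tumbling window that contains timestampMs. */
    public static long windowStart(long timestampMs, long offsetMs, long windowSizeMs) {
        // floorMod keeps the remainder non-negative, so timestamps before
        // the offset still map to the window that contains them.
        return timestampMs - Math.floorMod(timestampMs - offsetMs, windowSizeMs);
    }

    /** Exclusive end of the window that starts at windowStartMs. */
    public static long windowEnd(long windowStartMs, long windowSizeMs) {
        return windowStartMs + windowSizeMs;
    }

    public static void main(String[] args) {
        long size = TimeUnit.SECONDS.toMillis(1);

        // Processing time: take the timestamp from the wall clock.
        // For event time, t would come from the record's rowtime field instead.
        long t = System.currentTimeMillis();

        long start = windowStart(t, 0L, size);
        long end = windowEnd(start, size);
        System.out.println("t=" + t + " window=[" + start + ", " + end + ")");
    }
}
```

The resulting start and end values could then be passed, together with the userId, as parameters to the table function used in the LATERAL JOIN.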