The problem with the LATERAL JOIN (via a LookupableTableSource+TableFunction, because I need to call that function using the userId as a parameter) is that I cannot know the window start/end. It's not clear to me how to get them from TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize). Can you provide a pseudo-code example of how to implement this?
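A minimal sketch of the computation Hequn describes in the reply below, assuming tumbling windows and epoch-millisecond timestamps. The start is aligned with a modulo. For non-negative timestamps this should match what Flink's `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)` returns. The end is `start + windowSize`, exclusive, as in `TimeWindow`, whose `maxTimestamp()` is `end - 1`. The class `WindowBounds` is hypothetical, and the arithmetic is reimplemented in plain Java so the sketch compiles without Flink. For processing time it uses `System.currentTimeMillis()`; for event time you would pass the rowtime value instead.

```java
/** Hypothetical helper: tumbling-window bounds for a given timestamp. */
final class WindowBounds {
    public final long start; // inclusive, epoch millis
    public final long end;   // exclusive, epoch millis (like Flink's TimeWindow)

    private WindowBounds(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Aligns a timestamp to the start of its window. For non-negative timestamps this
     * gives the same result as TimeWindow.getWindowStartWithOffset; Math.floorMod
     * keeps it correct for negative timestamps as well.
     */
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    /** Start and end of the window that contains the given timestamp. */
    public static WindowBounds of(long timestamp, long offset, long windowSize) {
        long start = windowStart(timestamp, offset, windowSize);
        return new WindowBounds(start, start + windowSize);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // Processing time: use "now". For event time, use the rowtime field instead.
        long now = System.currentTimeMillis();
        WindowBounds w = WindowBounds.of(now, 0L, oneMinute);
        System.out.println("window [" + w.start + ", " + w.end + ")");
    }
}
```

The two values can then be passed to the table function in the LATERAL JOIN together with the userId.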
On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng <chenghe...@gmail.com> wrote:
> Hi Flavio,
>
> Thanks for your information.
>
> From your description, it seems that you only use the window to get the start and end time; no aggregation happens. If this is the case, you can compute the start and end time yourself (`TimeWindow.getWindowStartWithOffset()` shows how to get the window start from a timestamp). To be more specific, if you use processing time, you can get your timestamp with System.currentTimeMillis() and then use it to get the window start and end with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get the timestamp from the rowtime field.
>
> With the start and end time, you can then perform a LATERAL JOIN to enrich the information. You can add a cache in your table function to avoid contacting the REST endpoint too frequently.
>
> Best, Hequn
>
>
> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Hi Hequn, thanks for your answer.
>> What I'm trying to do is read a stream of events that basically contain a UserId field and, every X minutes (i.e. using a time window) and for each distinct UserId key, query 3 different REST services to enrich my POJOs*.
>> For the moment, what I do is use a ProcessWindowFunction after .keyBy().window(), as shown in the example in my previous mail, to contact those 3 services and enrich my object.
>>
>> However, I don't like this solution because I'd like to use Flink to its full potential, so I'd like to enrich my object using LATERAL TABLEs or Async I/O.
>> The main problem I'm facing right now is that I can't find a way to pass the tumbling window start/end to the LATERAL JOIN table functions (because it is a parameter of the REST query).
>> Moreover, I don't know whether this use case is something that the Table API aims to solve.
>>
>> * Of course, this could kill the REST endpoint if the number of users is very big. Because of this, I'd like to keep the external state of the source tables as internal Flink state and then do a JOIN on the UserId. The problem here is that I need to "materialize" them using Debezium (or similar) via Kafka and dynamic tables. Is there any example of keeping multiple tables synced with Flink state through Debezium (without the need to rewrite all the logic for managing UPDATE/INSERT/DELETE)?
>>
>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> Nice to hear your ideas on the Table API!
>>>
>>> Could you be more specific about your requirements? A detailed scenario would be quite helpful. For example, do you want to emit multiple records through the collector, or do you want to use timers?
>>>
>>> BTW, the Table API recently introduced flatAggregate (both non-window flatAggregate and window flatAggregate), which will be included in the upcoming release 1.9. flatAggregate can emit multiple records for a single group. More details here [1][2].
>>> Hope this can solve your problem.
>>>
>>> Best, Hequn
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>>
>>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> from what I understood, a ProcessWindowFunction can only be used in the Streaming API.
>>>> Is there any plan to port them to the Table API as well (in the near future)?
>>>> I'd like to do the equivalent of the following with the Table API:
>>>>
>>>> final DataStream<MyPojoEvent> events = env.addSource(src);
>>>> events.filter(e -> e.getCode() != null)
>>>>     .keyBy(event -> Integer.valueOf(event.getCode()))
>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>     .process(new ProcessWindowFunction<MyPojoEvent, MyPojoEvent, Integer, TimeWindow>() {.....});
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>
>> --
>> Flavio Pompermaier
>> Development Department
>> OKKAM S.r.l.
>> Tel. +(39) 0461 041809
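Hequn's suggestion to add a cache in the table function can be sketched as follows. The sketch assumes the REST result depends only on the userId and the window start, so one call can serve every event of the same user in the same window. `CachedUserLookup` and its `restCall` function are hypothetical stand-ins for a real REST client. In a Flink job, this logic would be called from a `TableFunction`'s `eval(...)` method; that wiring is left out so the sketch compiles with the JDK alone. The size bound uses `LinkedHashMap` in access order as a simple LRU.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical LRU cache in front of a REST lookup keyed by (userId, windowStart). */
final class CachedUserLookup {
    private final BiFunction<String, Long, String> restCall; // hypothetical REST client
    private final Map<String, String> cache;
    int remoteCalls = 0; // how many times the endpoint was actually contacted

    CachedUserLookup(BiFunction<String, Long, String> restCall, int maxEntries) {
        this.restCall = restCall;
        // accessOrder = true makes iteration order least-recently-used first,
        // and removeEldestEntry evicts once the cache grows beyond maxEntries.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the enrichment for a user in the window that starts at windowStart. */
    String lookup(String userId, long windowStart) {
        String key = userId + "@" + windowStart;
        String cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit: no REST call
        }
        remoteCalls++;
        String value = restCall.apply(userId, windowStart);
        cache.put(key, value);
        return value;
    }

    public static void main(String[] args) {
        CachedUserLookup lookup = new CachedUserLookup(
                (user, start) -> "profile-of-" + user + "-from-" + start, 1_000);
        lookup.lookup("u1", 120_000L);
        lookup.lookup("u1", 120_000L); // same user, same window: served from the cache
        lookup.lookup("u1", 180_000L); // next window: a new REST call
        System.out.println("remote calls: " + lookup.remoteCalls); // prints "remote calls: 2"
    }
}
```

A size bound keeps memory predictable. A time-based expiry would work too, because with processing time the entries for past windows are not requested again once the clock has moved on.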