Thanks Hequn, I'll give it a try!

Best, Flavio

On Thu, Jul 11, 2019 at 3:38 AM Hequn Cheng <> wrote:

> Hi,
> > Can you provide a pseudo-code example of how to implement this?
> Processing time
> If you use a TumblingProcessingTimeWindows.of(Time.seconds(1)), for each
> record, you get the timestamp from System.currentTimeMillis(), say t, and
> w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000), and w_end =
> w_start + 1000.
> Event time
> If you use a TumblingEventTimeWindows.of(Time.seconds(1)), for each
> record, get the timestamp from the corresponding timestamp field, say t,
> and get w_start and w_end same as above.
> More examples can be found in TimeWindowTest[1].
> Best, Hequn
> [1]
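
For reference, the window arithmetic described above can be sketched in plain Java without a Flink dependency. This is only an illustration: the class name WindowBounds and the sample timestamp are made up, and windowStart reproduces the modulo arithmetic that TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize) is understood to use, assuming non-negative timestamps and an offset smaller than the window size.

```java
/**
 * Sketch: compute tumbling-window bounds for a timestamp by hand.
 * Hypothetical helper, not part of Flink. Assumes non-negative timestamps
 * and 0 <= offset < windowSize (all values in milliseconds).
 */
final class WindowBounds {

    private WindowBounds() {}

    /** Inclusive start of the tumbling window that contains the timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Exclusive end of the same window: start plus window size. */
    static long windowEnd(long timestamp, long offset, long windowSize) {
        return windowStart(timestamp, offset, windowSize) + windowSize;
    }

    public static void main(String[] args) {
        long size = 1000L; // 1-second tumbling window

        // Processing time: the timestamp is the current wall-clock time.
        long processingTime = System.currentTimeMillis();
        System.out.println("processing-time window: ["
                + windowStart(processingTime, 0L, size) + ", "
                + windowEnd(processingTime, 0L, size) + ")");

        // Event time: the timestamp comes from the record's rowtime field
        // (the value below is a made-up example).
        long eventTime = 1562809080123L;
        System.out.println("event-time window: ["
                + windowStart(eventTime, 0L, size) + ", "
                + windowEnd(eventTime, 0L, size) + ")");
    }
}
```

The only difference between the two cases is where the timestamp comes from; the start/end computation is the same.
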
> On Wed, Jul 10, 2019 at 8:46 PM Flavio Pompermaier <>
> wrote:
>> The problem with the LATERAL JOIN (via
>> a LookupableTableSource+TableFunction because I need to call that function
>> using the userId as a parameter) is that I cannot know the window
>> start/end. To me it's not clear how to get that from
>> TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)...
>> Can you provide a pseudo-code example of how to implement this?
>> On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng <> wrote:
>>> Hi Flavio,
>>> Thanks for your information.
>>> From your description, it seems that you only use the window to get the
>>> start and end time, and that no aggregation happens. If this is the case,
>>> you can compute the start and end time yourself (the
>>> `TimeWindow.getWindowStartWithOffset()` shows how to get window start
>>> according to the timestamp). To be more specific, if you use processing
>>> time, you can get your timestamp with System.currentTimeMillis(), and then
>>> use it to get the window start and end
>>> with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get
>>> the timestamp from the rowtime field.
>>> With the start and end time, you can then perform LATERAL JOIN to enrich
>>> the information. You can add a cache in your table function to avoid
>>> contacting the REST endpoint too frequently.
>>> Best, Hequn
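
The caching idea mentioned above could look roughly like the following. It is a minimal sketch in plain Java under stated assumptions: CachedEnricher, its loader callback and the key layout (userId plus window start) are hypothetical names chosen for illustration, and the loader stands in for the real REST call. In a Flink job this logic would sit inside the table function that performs the lookup.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Sketch of a small LRU cache in front of a slow lookup such as a REST call.
 * Hypothetical helper, not a Flink class. Not thread-safe: it assumes a single
 * thread calls lookup(). Null results from the loader are not cached.
 */
final class CachedEnricher<V> {

    // (userId, windowStart) -> enrichment result; stands in for the REST call.
    private final BiFunction<String, Long, V> loader;
    private final Map<String, V> cache;

    CachedEnricher(BiFunction<String, Long, V> loader, final int maxEntries) {
        this.loader = loader;
        // Access-ordered map that evicts the least recently used entry
        // once the size exceeds maxEntries.
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value for (userId, windowStart), loading it on a miss. */
    V lookup(String userId, long windowStart) {
        String key = userId + "@" + windowStart;
        V value = cache.get(key);
        if (value == null) {
            value = loader.apply(userId, windowStart);
            cache.put(key, value);
        }
        return value;
    }

    public static void main(String[] args) {
        final int[] calls = {0}; // counts how often the backend is hit
        CachedEnricher<String> enricher = new CachedEnricher<>(
                (userId, windowStart) -> {
                    calls[0]++;
                    return "profile-of-" + userId;
                },
                1000);

        enricher.lookup("u1", 60000L);  // miss: calls the backend
        enricher.lookup("u1", 60000L);  // hit: served from the cache
        enricher.lookup("u1", 120000L); // new window: calls the backend again
        System.out.println("backend calls: " + calls[0]); // prints "backend calls: 2"
    }
}
```

Keying on both the user and the window start means each user triggers at most one backend call per window, as long as the entry has not been evicted.
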
>>> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier <>
>>> wrote:
>>>> Hi Hequn, thanks for your answer.
>>>> What I'm trying to do is to read a stream of events that basically
>>>> contains a UserId field and, every X minutes (i.e. using a Time Window) and
>>>> for each different UserId key, query 3 different REST services to enrich my
>>>> POJOs*.
>>>> For the moment what I do is to use a ProcessWindowFunction after the
>>>> .keyBy().window() as shown in the previous mail example to contact those 3
>>>> services and enrich my object.
>>>> However I don't like this solution because I'd like to use Flink to
>>>> its full potential, so I'd like to enrich my object using LATERAL TABLEs or
>>>> ASYNC IO.
>>>> The main problem I'm facing right now is that I can't find a way to
>>>> pass the tumbling window start/end to the LATERAL JOIN table functions
>>>> (because this is a parameter of the REST query).
>>>> Moreover I don't know whether this use case is something that Table API
>>>> aims to solve.
>>>> * Of course this could kill the REST endpoint if the number of users is
>>>> very big. Because of this I'd like to keep the external state of source
>>>> tables as an internal Flink state and then do a JOIN on the UserId. The
>>>> problem here is that I need to "materialize" them using Debezium (or
>>>> similar) via Kafka and dynamic tables. Is there any example of keeping
>>>> multiple tables synched with Flink state through Debezium (without the need
>>>> of rewriting all the logic for managing UPDATE/INSERT/DELETE)?
>>>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng <>
>>>> wrote:
>>>>> Hi Flavio,
>>>>> Nice to hear your ideas on Table API!
>>>>> Could you be more specific about your requirements? A detailed
>>>>> scenario would be quite helpful. For example, do you want to emit multiple
>>>>> records through the collector, or do you want to use timers?
>>>>> BTW, the Table API recently introduced flatAggregate (both non-window
>>>>> flatAggregate and window flatAggregate), which will be included in the
>>>>> upcoming release-1.9. flatAggregate can emit multiple records for a single
>>>>> group. More details here[1][2].
>>>>> Hope this can solve your problem.
>>>>> Best, Hequn
>>>>> [1]
>>>>> [2]
>>>>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier <>
>>>>> wrote:
>>>>>> Hi to all,
>>>>>> from what I understood a ProcessWindowFunction can only be used in
>>>>>> the Streaming API.
>>>>>> Is there any plan to port it to the Table API as well (in the near
>>>>>> future)?
>>>>>> I'd like to do with Table API the equivalent of:
>>>>>> final DataStream<MyPojoEvent> events = env.addSource(src);
>>>>>> events.filter(e -> e.getCode() != null)
>>>>>>     .keyBy(event -> Integer.valueOf(event.getCode()))
>>>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>>     .process(new ProcessWindowFunction<MyPojoEvent, MyPojoEvent,
>>>>>>             Integer, TimeWindow>() {.....});
>>>>>> Best,
>>>>>> Flavio
>> --
>> Flavio Pompermaier
>> Development Department
>> OKKAM S.r.l.
>> Tel. +(39) 0461 041809
