The problem with the LATERAL JOIN (via
a LookupableTableSource + TableFunction, because I need to call that function
using the userId as a parameter) is that I cannot know the window
start/end. It's not clear to me how to get that from
TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
Can you provide a pseudo-code example of how to implement this?
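
For reference, a minimal sketch of the window-bound computation discussed in
this thread, assuming a tumbling window, processing time, and a zero offset.
The class and method names (WindowBounds, windowStart, windowEnd) are
hypothetical. The arithmetic is, to the best of my knowledge, the same as in
TimeWindow.getWindowStartWithOffset() in the Flink 1.x line, but it is
reimplemented here so that the snippet runs without Flink on the classpath.

```java
// Hypothetical helper, not part of Flink.
final class WindowBounds {

    // Start (inclusive) of the tumbling window that contains `timestamp`.
    // Assumes timestamp - offset + windowSize is non-negative.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // End (exclusive) of the window that begins at `windowStart`.
    static long windowEnd(long windowStart, long windowSize) {
        return windowStart + windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 60_000L; // 1 minute, as in the original job
        // Processing time; with event time, use the rowtime value instead.
        long timestamp = System.currentTimeMillis();
        long start = windowStart(timestamp, 0L, windowSize);
        long end = windowEnd(start, windowSize);
        System.out.println("window = [" + start + ", " + end + ")");
    }
}
```

With these two values computed per row, they could be passed to the table
function together with the userId, e.g. as extra arguments of the call used in
the LATERAL JOIN.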

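On the caching suggestion in the quoted reply below, one possible shape is a
small LRU cache keyed on (userId, window start), so that each REST service is
queried at most once per user and window while the entry stays cached. This is
only a sketch: WindowedLookupCache and its loader parameter are hypothetical
names, and the loader stands in for the actual REST call.

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical LRU cache; not thread-safe, intended for use from one thread.
final class WindowedLookupCache<V> {

    private final Map<Map.Entry<String, Long>, V> cache;
    private final BiFunction<String, Long, V> loader;

    WindowedLookupCache(final int maxEntries, BiFunction<String, Long, V> loader) {
        this.loader = loader;
        // accessOrder = true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<Map.Entry<String, Long>, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Map.Entry<String, Long>, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns the cached value, or calls the loader (e.g. the REST client) on a miss.
    V get(String userId, long windowStart) {
        Map.Entry<String, Long> key =
                new AbstractMap.SimpleImmutableEntry<>(userId, windowStart);
        V value = cache.get(key);
        if (value == null) {
            value = loader.apply(userId, windowStart);
            cache.put(key, value);
        }
        return value;
    }
}
```

Such a cache could be created when the table function is opened and consulted
from its eval method before contacting the endpoint.
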
On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi Flavio,
>
> Thanks for your information.
>
> From your description, it seems that you only use the window to get the
> start and end time; no aggregation happens. If this is the case,
> you can compute the start and end time yourself
> (`TimeWindow.getWindowStartWithOffset()` shows how to get the window start
> from a timestamp). To be more specific, if you use processing
> time, you can get your timestamp with System.currentTimeMillis() and then
> use it to get the window start and end
> with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get
> the timestamp from the rowtime field.
>
> With the start and end time, you can then perform a LATERAL JOIN to enrich
> the information. You can add a cache in your table function to avoid
> calling the REST endpoint too frequently.
>
> Best, Hequn
>
>
> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Hi Hequn, thanks for your answer.
>> What I'm trying to do is to read a stream of events that basically
>> contains a UserId field and, every X minutes (i.e. using a Time Window) and
>> for each different UserId key, query 3 different REST services to enrich my
>> POJOs*.
>> For the moment, what I do is use a ProcessWindowFunction after the
>> .keyBy().window(), as shown in the example in my previous mail, to contact
>> those 3 services and enrich my object.
>>
>> However, I don't like this solution because I'd like to use Flink to its
>> full potential, so I'd like to enrich my object using LATERAL TABLEs or
>> ASYNC IO.
>> The main problem I'm facing right now is that I can't find a way to pass
>> the tumbling window start/end to the LATERAL JOIN table functions (because
>> this is a parameter of the REST query).
>> Moreover, I don't know whether this use case is something that the Table API
>> aims to solve.
>>
>> * Of course this could kill the REST endpoint if the number of users is
>> very big. Because of this, I'd like to keep the external state of the source
>> tables as internal Flink state and then do a JOIN on the UserId. The
>> problem here is that I need to "materialize" them using Debezium (or
>> similar) via Kafka and dynamic tables. Is there any example of keeping
>> multiple tables synced with Flink state through Debezium (without the need
>> to rewrite all the logic for managing UPDATE/INSERT/DELETE)?
>>
>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> Nice to hear your ideas on Table API!
>>>
>>> Could you be more specific about your requirements? A detailed scenario
>>> would be quite helpful. For example, do you want to emit multiple records
>>> through the collector, or do you want to use the timer?
>>>
>>> BTW, the Table API recently introduced flatAggregate (both non-window
>>> flatAggregate and window flatAggregate), which will be included in the
>>> upcoming release-1.9. The flatAggregate can emit multiple records for a
>>> single group. More details here [1][2].
>>> Hope this can solve your problem.
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>>
>>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi to all,
>>>> From what I understood, a ProcessWindowFunction can only be used in the
>>>> Streaming API.
>>>> Is there any plan to port it to the Table API as well (in the near
>>>> future)?
>>>> I'd like to do with Table API the equivalent of:
>>>>
>>>> final DataStream<MyPojoEvent> events = env.addSource(src);
>>>> events.filter(e -> e.getCode() != null)
>>>>     .keyBy(event -> Integer.valueOf(event.getCode()))
>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>     .process(new ProcessWindowFunction<MyPojoEvent, MyPojoEvent, Integer, TimeWindow>() {.....});
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>
>>

-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809
