Thanks Hequn, I'll give it a try!

Best, Flavio

On Thu, Jul 11, 2019 at 3:38 AM Hequn Cheng <chenghe...@gmail.com> wrote:

> Hi,
>
> > Can you provide a pseudo-code example of how to implement this?
> Processing time
> If you use TumblingProcessingTimeWindows.of(Time.seconds(1)), then for each
> record you take the timestamp from System.currentTimeMillis(), say t, and
> compute w_start = TimeWindow.getWindowStartWithOffset(t, 0, 1000) and w_end =
> w_start + 1000.
>
> Event time
> If you use TumblingEventTimeWindows.of(Time.seconds(1)), then for each
> record you take the timestamp from the corresponding timestamp field, say t,
> and compute w_start and w_end the same way as above.
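Since TimeWindow.getWindowStartWithOffset() is just modular arithmetic, the two cases above can be sketched without any Flink dependency. The class below is illustrative only (it mirrors the formula used by Flink's utility, but is not Flink's actual code):

```java
public class WindowBounds {

    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset:
    // align the timestamp down to the nearest window boundary.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // t could come from System.currentTimeMillis() (processing time)
        // or from the record's timestamp field (event time).
        long t = 1562842685123L;
        long windowSize = 1000L;  // TumblingProcessingTimeWindows.of(Time.seconds(1))
        long wStart = getWindowStartWithOffset(t, 0, windowSize);
        long wEnd = wStart + windowSize;  // the window end is exclusive
        System.out.println("window = [" + wStart + ", " + wEnd + ")");
    }
}
```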
>
> More examples can be found in TimeWindowTest[1].
>
> Best, Hequn
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTest.java
>
>
> On Wed, Jul 10, 2019 at 8:46 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> The problem with the LATERAL JOIN (via
>> a LookupableTableSource + TableFunction, because I need to call that function
>> using the userId as a parameter) is that I cannot know the window
>> start/end. To me it's not clear how to get that from
>> TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)...
>> Can you provide a pseudo-code example of how to implement this?
>>
>> On Tue, Jul 9, 2019 at 4:15 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> Thanks for your information.
>>>
>>> From your description, it seems that you only use the window to get the
>>> start and end time. No aggregations happen. If this is the case,
>>> you can get the start and end time by yourself (the
>>> `TimeWindow.getWindowStartWithOffset()` method shows how to get the window
>>> start according to the timestamp). To be more specific, if you use processing
>>> time, you can get your timestamp with System.currentTimeMillis(), and then
>>> use it to get the window start and end
>>> with `TimeWindow.getWindowStartWithOffset()`. For event time, you can get
>>> the timestamp from the rowtime field.
>>>
>>> With the start and end time, you can then perform a LATERAL JOIN to enrich
>>> the information. You can add a cache in your table function to avoid
>>> contacting the REST endpoint too frequently.
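The cache mentioned above can be as simple as a bounded memoization map wrapped around the REST call. This is only a sketch of the caching part in plain Java; the RestClient interface and the "info" values are hypothetical placeholders (inside a real TableFunction, lookup() would be called from eval(userId) and the result emitted via collect()):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CachedLookup {

    // Hypothetical stand-in for the real REST client.
    interface RestClient {
        String fetchUserInfo(String userId);
    }

    private final RestClient client;
    private final int maxEntries;

    // LRU cache: access-ordered LinkedHashMap that evicts the
    // least-recently-used entry once maxEntries is exceeded.
    private final Map<String, String> cache;

    CachedLookup(RestClient client, int maxEntries) {
        this.client = client;
        this.maxEntries = maxEntries;
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > CachedLookup.this.maxEntries;
            }
        };
    }

    // Hits the REST endpoint only on a cache miss.
    String lookup(String userId) {
        return cache.computeIfAbsent(userId, client::fetchUserInfo);
    }
}
```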
>>>
>>> Best, Hequn
>>>
>>>
>>> On Mon, Jul 8, 2019 at 10:46 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi Hequn, thanks for your answer.
>>>> What I'm trying to do is to read a stream of events that basically
>>>> contains a UserId field and, every X minutes (i.e. using a time window) and
>>>> for each distinct UserId key, query 3 different REST services to enrich my
>>>> POJOs*.
>>>> For the moment I use a ProcessWindowFunction after the
>>>> .keyBy().window(), as shown in the previous mail example, to contact those 3
>>>> services and enrich my objects.
>>>>
>>>> However I don't like this solution because I'd like to use Flink to
>>>> its full potential, so I'd like to enrich my objects using LATERAL TABLEs or
>>>> ASYNC IO..
>>>> The main problem I'm facing right now is that I can't find a way to
>>>> pass the tumbling window start/end to the LATERAL JOIN table functions
>>>> (because this is a parameter of the REST query).
>>>> Moreover I don't know whether this use case is something that the Table API
>>>> aims to solve..
>>>>
>>>> * Of course this could kill the REST endpoint if the number of users is
>>>> very big. Because of this I'd like to keep the external state of the source
>>>> tables as internal Flink state and then do a JOIN on the UserId. The
>>>> problem here is that I need to "materialize" them using Debezium (or
>>>> similar) via Kafka and dynamic tables.. Is there any example of keeping
>>>> multiple tables synced with Flink state through Debezium (without the need
>>>> to rewrite all the logic for managing UPDATE/INSERT/DELETE)?
>>>>
>>>> On Mon, Jul 8, 2019 at 3:55 PM Hequn Cheng <chenghe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> Nice to hear your ideas on Table API!
>>>>>
>>>>> Could you be more specific about your requirements? A detailed
>>>>> scenario would be quite helpful. For example, do you want to emit multiple
>>>>> records through the collector, or do you want to use the timer?
>>>>>
>>>>> BTW, the Table API recently introduced flatAggregate (both non-window
>>>>> flatAggregate and window flatAggregate), which will be included in the
>>>>> upcoming release 1.9. flatAggregate can emit multiple records for a
>>>>> single group. More details here[1][2].
>>>>> Hope this can solve your problem.
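The distinguishing feature of flatAggregate is that one group can produce several output rows (e.g. a Top-2 per key). The semantics can be illustrated in plain Java; this is only a sketch of the idea, not the Table API's actual TableAggregateFunction interface:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Top2PerGroup {

    // For each key, emit up to two records: the two largest values.
    // This mirrors what a Top2 table aggregate would emit per group,
    // where a plain aggregate could only emit a single row.
    static Map<String, List<Integer>> top2(Map<String, List<Integer>> grouped) {
        Map<String, List<Integer>> out = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            List<Integer> values = new ArrayList<>(e.getValue());
            values.sort(Comparator.reverseOrder());
            out.put(e.getKey(), values.subList(0, Math.min(2, values.size())));
        }
        return out;
    }
}
```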
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#row-based-operations
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-aggregation-functions
>>>>>
>>>>> On Mon, Jul 8, 2019 at 6:27 PM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Hi to all,
>>>>>> from what I understood, a ProcessWindowFunction can only be used in
>>>>>> the Streaming API.
>>>>>> Is there any plan to port it to the Table API as well (in the near
>>>>>> future)?
>>>>>> I'd like to do with the Table API the equivalent of:
>>>>>>
>>>>>> final DataStream<MyPojoEvent> events = env.addSource(src);
>>>>>> events.filter(e -> e.getCode() != null)
>>>>>>     .keyBy(event -> Integer.valueOf(event.getCode()))
>>>>>>     .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>>>     .process(new ProcessWindowFunction<MyPojoEvent, MyPojoEvent,
>>>>>>         Integer, TimeWindow>() { ..... });
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>
>>>>
>>>>
>>
>> --
>> Flavio Pompermaier
>> Development Department
>>
>> OKKAM S.r.l.
>> Tel. +(39) 0461 041809
>>
>
