I probably misunderstood the meaning of eval()... so is it called once for
every distinct key (which could be composed of a combination of fields)?
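
To check my understanding, here is a minimal illustration of how I read the
composite-key semantics (my own sketch, values made up):

import org.apache.flink.types.Row;

// Assumption (please correct me): eval() receives all fields of the joint
// lookup key for a single probe row, so Row.of(keys) builds ONE composite
// cache key rather than one key per vararg element.
public class CompositeKeyIllustration {
    public static void main(String[] args) {
        Object[] keys = {42L, "EU"};      // e.g. (user_id, region) of one row
        Row compositeKey = Row.of(keys);  // a single cache key: Row(42, EU)
        System.out.println(compositeKey); // prints something like "42,EU"
    }
}
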
So, the other question is: how do I enable Blink planner support? (My guess
is sketched below.)
Since when has LATERAL TABLE been available in Flink? Is it equivalent to
using temporal tables [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html
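
About the Blink planner question: here is my guess at the minimal setup (a
sketch only, assuming a Flink version where EnvironmentSettings exposes
useBlinkPlanner() and flink-table-planner-blink is on the classpath; package
names may differ between versions):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BlinkPlannerSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Explicitly select the Blink planner in streaming mode.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // ... register the lookup table source and run the temporal join with tEnv
    }
}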

Best,
Flavio

On Sat, Jun 29, 2019 at 3:16 AM JingsongLee <lzljs3620...@aliyun.com> wrote:

> The keys mean the joint primary key; it is not a list of separate keys. In
> your case, maybe there is a single key?
>
> Best, Jingsong Lee
>
>
> Sent from Alibaba Mail for iPhone
> <https://itunes.apple.com/us/app/a-li-yun-you/id923828102?l=zh&ls=1&mt=8>
>
> ------------------Original Mail ------------------
> *From:* Flavio Pompermaier <pomperma...@okkam.it>
> *Date:* 2019-06-28 22:53:31
> *Recipient:* JingsongLee <lzljs3620...@aliyun.com>
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: LookupableTableSource question
> Sorry, I copied and pasted the current eval method twice... I'd do this:
>
> public void eval(Object... keys) {
>     // look up each passed value as a separate key
>     for (Object key : keys) {
>         Row keyRow = Row.of(key);
>         if (cache != null) {
>             List<Row> cachedRows = cache.getIfPresent(keyRow);
>             if (cachedRows != null) {
>                 for (Row cachedRow : cachedRows) {
>                     collect(cachedRow);
>                 }
>                 return;
>             }
>         }
>     }
>  ...
>
> On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> This could be a good fit; I'll try to dig into it and see if it can be
>> adapted to a REST service.
>> The only strange thing I see is that the key of the local cache is built
>> from the whole block of keys... am I wrong?
>> Shouldn't it cycle over the list of passed keys?
>>
>> Right now it's the following:
>>
>> Cache<Row, List<Row>> cache;
>>
>> public void eval(Object... keys) {
>>     Row keyRow = Row.of(keys);
>>     if (cache != null) {
>>         List<Row> cachedRows = cache.getIfPresent(keyRow);
>>         if (cachedRows != null) {
>>             for (Row cachedRow : cachedRows) {
>>                 collect(cachedRow);
>>             }
>>             return;
>>         }
>>     }
>>  ...
>>
>> while I'd use the following (also for JDBC):
>>
>> Cache<Row, List<Row>> cache;
>>
>> public void eval(Object... keys) {
>>     Row keyRow = Row.of(keys);
>>     if (cache != null) {
>>         List<Row> cachedRows = cache.getIfPresent(keyRow);
>>         if (cachedRows != null) {
>>             for (Row cachedRow : cachedRows) {
>>                 collect(cachedRow);
>>             }
>>             return;
>>         }
>>     }
>>  ...
>>
>> public void eval(Object... keys) {
>>     // look up each passed value as a separate key
>>     for (Object key : keys) {
>>         Row keyRow = Row.of(key);
>>         if (cache != null) {
>>             List<Row> cachedRows = cache.getIfPresent(keyRow);
>>             if (cachedRows != null) {
>>                 for (Row cachedRow : cachedRows) {
>>                     collect(cachedRow);
>>                 }
>>                 return;
>>             }
>>         }
>>     }
>>  ...
>>
>> Am I missing something?
>>
>>
>> On Fri, Jun 28, 2019 at 4:18 PM JingsongLee <lzljs3620...@aliyun.com>
>> wrote:
>>
>>> Hi Flavio:
>>>
>>> I just implemented a JDBCLookupFunction [1]. You can use it as a table
>>> function [2], or use the Blink temporal table join [3] (this needs Blink
>>> planner support).
>>> I added a Google Guava cache in JDBCLookupFunction with a configurable
>>> cacheMaxSize (to avoid OOM) and cacheExpireMs (to keep the lookup data
>>> fresh); a simplified sketch of the caching pattern follows the links below.
>>> Is that what you want?
>>>
>>> [1]
>>> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
>>> [2]
>>> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
>>> [3]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75
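>>>
>>> Roughly, the caching pattern looks like the simplified sketch below (not
>>> the exact code; see [1] for the real implementation, and note that the
>>> real code may use Flink's shaded Guava, so import paths can differ):
>>>
>>> import java.util.Collections;
>>> import java.util.List;
>>> import java.util.concurrent.TimeUnit;
>>>
>>> import com.google.common.cache.Cache;
>>> import com.google.common.cache.CacheBuilder;
>>> import org.apache.flink.types.Row;
>>>
>>> public class CachedLookupSketch {
>>>     private final Cache<Row, List<Row>> cache;
>>>
>>>     public CachedLookupSketch(long cacheMaxSize, long cacheExpireMs) {
>>>         // Bounded size avoids OOM; expireAfterWrite keeps lookup data fresh.
>>>         this.cache = CacheBuilder.newBuilder()
>>>                 .maximumSize(cacheMaxSize)
>>>                 .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
>>>                 .build();
>>>     }
>>>
>>>     public List<Row> lookup(Row keyRow) {
>>>         List<Row> cached = cache.getIfPresent(keyRow);
>>>         if (cached != null) {
>>>             return cached;                        // cache hit
>>>         }
>>>         List<Row> fetched = queryBackend(keyRow); // the JDBC query goes here
>>>         cache.put(keyRow, fetched);
>>>         return fetched;
>>>     }
>>>
>>>     private List<Row> queryBackend(Row keyRow) {
>>>         return Collections.emptyList();           // placeholder for the real query
>>>     }
>>> }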
>>>
>>>  Best, JingsongLee
>>>
>>> ------------------------------------------------------------------
>>> From: Flavio Pompermaier <pomperma...@okkam.it>
>>> Send Time: 2019-06-28 (Friday) 21:04
>>> To: user <user@flink.apache.org>
>>> Subject: LookupableTableSource question
>>>
>>> Hi to all,
>>> I have a use case where I'd like to enrich a stream using a rarely
>>> updated lookup table.
>>> Basically, I'd like to be able to set a refresh policy that is triggered
>>> either when a key is not found (a new key has probably been added in the
>>> meantime) or when a configurable refresh period has elapsed.
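>>>
>>> Something along these lines is what I have in mind (just a sketch with
>>> made-up names, using a Guava LoadingCache to stand in for the refresh
>>> policy; the load would hit the actual lookup table):
>>>
>>> import java.util.concurrent.TimeUnit;
>>>
>>> import com.google.common.cache.CacheBuilder;
>>> import com.google.common.cache.CacheLoader;
>>> import com.google.common.cache.LoadingCache;
>>>
>>> public class RefreshingLookup {
>>>     private final LoadingCache<String, String> cache;
>>>
>>>     public RefreshingLookup(long refreshPeriodMs) {
>>>         this.cache = CacheBuilder.newBuilder()
>>>                 // entries older than the refresh period are reloaded on next access
>>>                 .expireAfterWrite(refreshPeriodMs, TimeUnit.MILLISECONDS)
>>>                 .build(new CacheLoader<String, String>() {
>>>                     @Override
>>>                     public String load(String key) {
>>>                         // unknown or expired key: go back to the lookup table
>>>                         return fetchFromLookupTable(key);
>>>                     }
>>>                 });
>>>     }
>>>
>>>     public String get(String key) throws Exception {
>>>         return cache.get(key); // serves the cached value or triggers load()
>>>     }
>>>
>>>     private String fetchFromLookupTable(String key) {
>>>         return ""; // placeholder: query the rarely updated lookup table
>>>     }
>>> }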
>>>
>>> Is there any suggested solution to this? The LookupableTableSource looks
>>> very similar to what I'd like to achieve, but I can't find a real-world
>>> example using it, and it lacks these two capabilities (key-values are not
>>> refreshed after a configurable timeout, and a key-not-found callback cannot
>>> be handled).
>>>
>>> Any help is appreciated,
>>> Flavio
>>>
>>>
>>>
>>
>
