The LookupTableSource is used when you join based on processing time, as
described in here[1]. Moreover it supports only equi lookups, therefore
it won't work with range queries as in your case.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#processing-time-temporal-join

On 15/01/2021 09:25, Marek Maj wrote:
> Hi Dawid,
> thanks for your answers!
>
> I guess CDC and Debezium will be the right choice for that case. Iit's
> integration with oracle however, is in incubating phase as
> documentation states [1], we will need to investigate further.
>
> I was hoping there will be some way to incorporate LookupCache [2] in
> that scenario and bypass the problem of finished tasks.
> JDBCDynamicTableSource, that you mentioned, implements
> both LookupTableSource and ScanTableSource, but.I could not force the
> planner to invoke getLookupRuntimeProvider [3] 
>
> Anyway, I will take a closer look at JDBCDynamicTableSource
> implementation and try to find a workaround
>
> best regards,
> Marek
>
> [1] https://debezium.io/documentation/reference/connectors/oracle.html
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache
> [3] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/connector/source/LookupTableSource.html#getLookupRuntimeProvider-org.apache.flink.table.connector.source.LookupTableSource.LookupContext-
>
>
> czw., 14 sty 2021 o 20:07 Dawid Wysakowicz <dwysakow...@apache.org
> <mailto:dwysakow...@apache.org>> napisał(a):
>
>     Hi Marek,
>
>     I am afraid I don't have a good answer for your question. The
>     problem indeed is that the JDBC source can work only as a bounded
>     source. As you correctly pointed out, as of now mixing bounded
>     with unbounded sources does not work with checkpointing, which we
>     want to address in the FLIP-147 (that you linked as well).
>
>     I agree one solution would be to change the implementation of
>     JDBCDynamicTableSource so that it produces an UNBOUNDED source.
>     Unfortunately it is not the most straightforward task.
>
>     Another solution would be to actually use a CDC. I think you could
>     use one of the connectors from here[1], which use the embedded
>     Debezium engine, therefore you would not need to setup any
>     external tools, but just embed the CDC in FLINK. Ofc, if I am not
>     mistaken here, as I haven't tried those connectors myself.
>
>     Unfortunately I don't have any other ideas right now. Maybe
>     someone else can chime in @Timo @Jark
>
>     Lastly, I think once you solve the problem of a finishing source
>     you could also consider using the temporal join[2] instead of an
>     interval join.
>
>     Best,
>
>     Dawid
>
>     [1] https://github.com/ververica/flink-cdc-connectors
>
>     [2]
>     
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins
>
>     On 12/01/2021 16:40, Marek Maj wrote:
>>     Hello,
>>     I am trying to use Flink SQL api to join two tables. My stream
>>     data source is kafka (defined through catalog and schema
>>     registry) and my enrichment data is located in relational
>>     database (JDBC connector). I think this setup reflects quite
>>     common use case
>>
>>     Enrichment table definition looks like this:
>>     CREATE TABLE dim (
>>       ID BIGINT,
>>       ENRICH STRING,
>>       FROM_DATE TIMESTAMP(6),
>>       TO_DATE TIMESTAMP(6),
>>       WATERMARK FOR TO_DATE AS TO_DATE
>>     ) WITH (
>>        'connector' = 'jdbc',
>>        'url' = ‘…’,
>>        'table-name' = ‘…’,
>>        'username' = ‘…’,
>>        'password' = ‘…’
>>     )
>>
>>     And this is join I use against stream coming from kafka (table
>>     with watermark spec), trunc is udf:
>>     SELECT TRUNC(T1.START_TIME,'HH24') as `START_TIME`,
>>     D1.ENRICH as `ENRICH`,
>>     T1.FIELD as `FIELD`,
>>     FROM `kafka.topic` T1, dim D1
>>     WHERE T1.ENRICH_ID = D1.ID <http://D1.ID>
>>     AND T1.START_TIME between D1.TO_DATE - INTERVAL ‘1’ DAY AND
>>     D1.TO_DATE
>>     AND T1.START_TIME >= D1.FROM_DATE
>>
>>     Result job graph contains two table source scan operators
>>     together with interval join operator.
>>      
>>     The problem I am trying to solve is how to change the character
>>     of enrichment table. Currently, related operator task reads whole
>>     data from table when the job start and finishes afterwards.
>>     Ideally, I would like to have continuously updated enrichment
>>     table. Is it possible to achieve without CDC for example by
>>     querying whole database periodically or use some kind of cache
>>     for keys? We can assume that enrichment table is append only,
>>     there are no deletes or updates, only inserts for new time intervals
>>
>>     If updates are not possible, how can I deal with finished task?
>>     Due to a known issue [1], all checkpoints are aborted . Maybe I
>>     could live with restarting job to get new enrichment data as it
>>     is not refreshed so frequently, but checkpointing is a must.
>>
>>     flink version 1.12
>>
>>     regards
>>     Marek
>>
>>     [1]
>>     
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to