The LookupTableSource is used when you join based on processing time, as described in [1]. Moreover, it supports only equi lookups, so it won't work with range conditions such as the ones in your query.
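For reference, in 1.12 the planner only plans a lookup into a LookupTableSource for a query of roughly this shape (the table and column names here are illustrative, in the spirit of the docs in [1]):

-- Illustrative lookup join: `proc_time` is assumed to be declared as
-- `proc_time AS PROCTIME()` on the orders table, and the ON clause
-- must be a plain equality on the lookup key.
SELECT o.order_id, o.total, c.country
FROM orders AS o
  JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.id;

A BETWEEN predicate like yours cannot appear as the lookup condition, which would also explain why you could not get the planner to call getLookupRuntimeProvider.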
Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#processing-time-temporal-join

On 15/01/2021 09:25, Marek Maj wrote:
> Hi Dawid,
> thanks for your answers!
>
> I guess CDC and Debezium will be the right choice for that case. Its
> integration with Oracle, however, is in the incubating phase, as the
> documentation states [1], so we will need to investigate further.
>
> I was hoping there would be some way to incorporate the lookup cache [2]
> in that scenario and bypass the problem of finished tasks.
> JdbcDynamicTableSource, which you mentioned, implements both
> LookupTableSource and ScanTableSource, but I could not force the
> planner to invoke getLookupRuntimeProvider [3].
>
> Anyway, I will take a closer look at the JdbcDynamicTableSource
> implementation and try to find a workaround.
>
> best regards,
> Marek
>
> [1] https://debezium.io/documentation/reference/connectors/oracle.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/connector/source/LookupTableSource.html#getLookupRuntimeProvider-org.apache.flink.table.connector.source.LookupTableSource.LookupContext-
>
> On Thu, 14 Jan 2021 at 20:07, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi Marek,
>
> I am afraid I don't have a good answer to your question. The problem
> indeed is that the JDBC source can work only as a bounded source. As
> you correctly pointed out, as of now mixing bounded and unbounded
> sources does not work with checkpointing, which we want to address in
> FLIP-147 (which you linked as well).
>
> I agree one solution would be to change the implementation of
> JdbcDynamicTableSource so that it produces an UNBOUNDED source.
> Unfortunately, that is not the most straightforward task.
>
> Another solution would be to actually use CDC. I think you could use
> one of the connectors from [1], which use the embedded Debezium
> engine, so you would not need to set up any external tools, but just
> embed the CDC in Flink. That is, of course, if I am not mistaken here,
> as I haven't tried those connectors myself.
>
> Unfortunately, I don't have any other ideas right now. Maybe someone
> else can chime in. @Timo @Jark
>
> Lastly, I think once you solve the problem of a finishing source, you
> could also consider using a temporal join [2] instead of an interval
> join; a sketch follows below.
>
> Best,
>
> Dawid
>
> [1] https://github.com/ververica/flink-cdc-connectors
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins
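>
> To make [2] a bit more concrete, here is a rough, untested sketch of
> how your interval join could become an event-time temporal join. It
> assumes `dim` additionally declares PRIMARY KEY (ID) NOT ENFORCED and
> that START_TIME is the event-time attribute of the Kafka table; the
> validity period of each dim row would then come from its version
> (rowtime) rather than from the FROM_DATE/TO_DATE range:
>
> SELECT TRUNC(T1.START_TIME, 'HH24') AS `START_TIME`,
>        D1.ENRICH AS `ENRICH`,
>        T1.FIELD AS `FIELD`
> FROM `kafka.topic` T1
>   JOIN dim FOR SYSTEM_TIME AS OF T1.START_TIME AS D1
>   ON T1.ENRICH_ID = D1.ID;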
> On 12/01/2021 16:40, Marek Maj wrote:
>> Hello,
>> I am trying to use the Flink SQL API to join two tables. My stream
>> data source is Kafka (defined through a catalog and schema registry)
>> and my enrichment data is located in a relational database (JDBC
>> connector). I think this setup reflects a quite common use case.
>>
>> The enrichment table definition looks like this:
>>
>> CREATE TABLE dim (
>>   ID BIGINT,
>>   ENRICH STRING,
>>   FROM_DATE TIMESTAMP(6),
>>   TO_DATE TIMESTAMP(6),
>>   WATERMARK FOR TO_DATE AS TO_DATE
>> ) WITH (
>>   'connector' = 'jdbc',
>>   'url' = '…',
>>   'table-name' = '…',
>>   'username' = '…',
>>   'password' = '…'
>> )
>>
>> And this is the join I use against the stream coming from Kafka (a
>> table with a watermark spec); TRUNC is a UDF:
>>
>> SELECT TRUNC(T1.START_TIME, 'HH24') AS `START_TIME`,
>>        D1.ENRICH AS `ENRICH`,
>>        T1.FIELD AS `FIELD`
>> FROM `kafka.topic` T1, dim D1
>> WHERE T1.ENRICH_ID = D1.ID
>>   AND T1.START_TIME BETWEEN D1.TO_DATE - INTERVAL '1' DAY AND D1.TO_DATE
>>   AND T1.START_TIME >= D1.FROM_DATE
>>
>> The resulting job graph contains two table source scan operators
>> together with an interval join operator.
>>
>> The problem I am trying to solve is how to change the character of the
>> enrichment table. Currently, the related operator task reads the whole
>> table when the job starts and finishes afterwards. Ideally, I would
>> like to have a continuously updated enrichment table. Is it possible
>> to achieve this without CDC, for example by querying the whole
>> database periodically, or by using some kind of cache for keys (see
>> the sketch at the end of this mail)? We can assume that the enrichment
>> table is append-only; there are no deletes or updates, only inserts
>> for new time intervals.
>>
>> If updates are not possible, how can I deal with the finished task?
>> Due to a known issue [1], all checkpoints are aborted. Maybe I could
>> live with restarting the job to get new enrichment data, as it is not
>> refreshed so frequently, but checkpointing is a must.
>>
>> Flink version: 1.12
>>
>> regards
>> Marek
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
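>>
>> P.S. The kind of key cache I had in mind is the lookup cache that the
>> JDBC connector already exposes (option names taken from the Flink 1.12
>> JDBC connector docs [2 above]; the values here are only illustrative).
>> As far as I understand, it only takes effect when the planner actually
>> plans a lookup join:
>>
>> CREATE TABLE dim (
>>   ID BIGINT,
>>   ENRICH STRING,
>>   FROM_DATE TIMESTAMP(6),
>>   TO_DATE TIMESTAMP(6)
>> ) WITH (
>>   'connector' = 'jdbc',
>>   'url' = '…',
>>   'table-name' = '…',
>>   'username' = '…',
>>   'password' = '…',
>>   'lookup.cache.max-rows' = '10000',
>>   'lookup.cache.ttl' = '10min'
>> )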