Hi Marek,

I am afraid I don't have a good answer to your question. The problem is indeed that the JDBC source can work only as a bounded source. As you correctly pointed out, mixing bounded and unbounded sources currently does not work with checkpointing, which we want to address in FLIP-147 (which you linked as well).

I agree one solution would be to change the implementation of JdbcDynamicTableSource so that it produces an UNBOUNDED source. Unfortunately it is not the most straightforward task.

Another solution would be to actually use CDC. I think you could use one of the connectors from here [1], which use the embedded Debezium engine, so you would not need to set up any external tools and could just embed the CDC in Flink. That is, if I am not mistaken, as I haven't tried those connectors myself.

Unfortunately I don't have any other ideas right now. Maybe someone else can chime in @Timo @Jark

Lastly, I think once you solve the problem of a finishing source you could also consider using the temporal join [2] instead of an interval join.

Best,

Dawid

[1] https://github.com/ververica/flink-cdc-connectors
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins

On 12/01/2021 16:40, Marek Maj wrote:
> Hello,
> I am trying to use Flink SQL api to join two tables. My stream data
> source is kafka (defined through catalog and schema registry) and my
> enrichment data is located in relational database (JDBC connector). I
> think this setup reflects quite common use case
>
> Enrichment table definition looks like this:
> CREATE TABLE dim (
>   ID BIGINT,
>   ENRICH STRING,
>   FROM_DATE TIMESTAMP(6),
>   TO_DATE TIMESTAMP(6),
>   WATERMARK FOR TO_DATE AS TO_DATE
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = '…',
>   'table-name' = '…',
>   'username' = '…',
>   'password' = '…'
> )
>
> And this is join I use against stream coming from kafka (table with
> watermark spec), trunc is udf:
> SELECT TRUNC(T1.START_TIME,'HH24') as `START_TIME`,
>   D1.ENRICH as `ENRICH`,
>   T1.FIELD as `FIELD`
> FROM `kafka.topic` T1, dim D1
> WHERE T1.ENRICH_ID = D1.ID
>   AND T1.START_TIME between D1.TO_DATE - INTERVAL '1' DAY AND D1.TO_DATE
>   AND T1.START_TIME >= D1.FROM_DATE
>
> Result job graph contains two table source scan operators together
> with interval join operator.
>
> The problem I am trying to solve is how to change the character of
> enrichment table. Currently, related operator task reads whole data
> from table when the job start and finishes afterwards. Ideally, I
> would like to have continuously updated enrichment table. Is it
> possible to achieve without CDC for example by querying whole database
> periodically or use some kind of cache for keys? We can assume that
> enrichment table is append only, there are no deletes or updates, only
> inserts for new time intervals
>
> If updates are not possible, how can I deal with finished task? Due to
> a known issue [1], all checkpoints are aborted. Maybe I could live
> with restarting job to get new enrichment data as it is not refreshed
> so frequently, but checkpointing is a must.
>
> flink version 1.12
>
> regards
> Marek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
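
For reference, the temporal join suggested in the reply [2] could look roughly like the sketch below. It is untested and rests on assumptions that are not part of the thread: that `dim` has been redefined as a versioned table (a declared primary key on ID plus an event-time attribute, backed by a changelog source such as CDC), and that START_TIME is the event-time attribute of the Kafka table. Note that it does not reproduce the FROM_DATE/TO_DATE range conditions of the original query; it simply picks the version of `dim` that is valid at T1.START_TIME.

```sql
-- Sketch only. Assumes `dim` is a versioned table, i.e. it declares
-- PRIMARY KEY (ID) NOT ENFORCED and a watermark on its version column,
-- and that it is fed by a changelog (e.g. CDC) source rather than the
-- bounded JDBC scan.
SELECT
  TRUNC(T1.START_TIME, 'HH24') AS `START_TIME`,  -- TRUNC is the UDF from the original query
  D1.ENRICH AS `ENRICH`,
  T1.FIELD AS `FIELD`
FROM `kafka.topic` AS T1
JOIN dim FOR SYSTEM_TIME AS OF T1.START_TIME AS D1  -- version of dim valid at the event time
  ON T1.ENRICH_ID = D1.ID;
```

Compared with the interval join, this form keeps only the relevant versions of the enrichment rows in state, but it only works once the enrichment source no longer finishes.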