Hi Dawid,

thanks for your answers! I guess CDC and Debezium will be the right choice for this case. Its integration with Oracle, however, is in the incubating phase, as the documentation states [1], so we will need to investigate further.
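Just to make it concrete for myself, below is roughly what the table definition for one of those embedded-Debezium connectors would look like. I used the MySQL variant purely as an illustration, since the Oracle one is not available yet; the connector and option names are taken from the flink-cdc-connectors project, so please treat this as an untested sketch:

-- embedded Debezium engine, no external Kafka Connect / Debezium server needed
CREATE TABLE dim_cdc (
  ID BIGINT,
  ENRICH STRING,
  FROM_DATE TIMESTAMP(6),
  TO_DATE TIMESTAMP(6),
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '…',
  'port' = '3306',
  'username' = '…',
  'password' = '…',
  'database-name' = '…',
  'table-name' = '…'
)

As far as I understand, such a table produces an unbounded changelog stream, so the source task would not finish, checkpointing should keep working, and the temporal join you suggested becomes an option.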
I was hoping there would be some way to incorporate the lookup cache [2] in that scenario and bypass the problem of finished tasks. JDBCDynamicTableSource, which you mentioned, implements both LookupTableSource and ScanTableSource, but I could not force the planner to invoke getLookupRuntimeProvider [3] (the lookup-join form I expected to trigger it is sketched in the PS at the bottom of this mail). Anyway, I will take a closer look at the JDBCDynamicTableSource implementation and try to find a workaround.

best regards,
Marek

[1] https://debezium.io/documentation/reference/connectors/oracle.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/connector/source/LookupTableSource.html#getLookupRuntimeProvider-org.apache.flink.table.connector.source.LookupTableSource.LookupContext-

Thu, 14 Jan 2021 at 20:07 Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Marek,
>
> I am afraid I don't have a good answer for your question. The problem indeed is that the JDBC source can work only as a bounded source. As you correctly pointed out, as of now mixing bounded with unbounded sources does not work with checkpointing, which we want to address in FLIP-147 (that you linked as well).
>
> I agree one solution would be to change the implementation of JDBCDynamicTableSource so that it produces an UNBOUNDED source. Unfortunately it is not the most straightforward task.
>
> Another solution would be to actually use a CDC. I think you could use one of the connectors from here [1], which use the embedded Debezium engine, therefore you would not need to set up any external tools, but just embed the CDC in Flink. Of course, if I am not mistaken here, as I haven't tried those connectors myself.
>
> Unfortunately I don't have any other ideas right now. Maybe someone else can chime in @Timo @Jark
>
> Lastly, I think once you solve the problem of a finishing source you could also consider using the temporal join [2] instead of an interval join.
>
> Best,
>
> Dawid
>
> [1] https://github.com/ververica/flink-cdc-connectors
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins
>
> On 12/01/2021 16:40, Marek Maj wrote:
>
> Hello,
> I am trying to use the Flink SQL API to join two tables. My stream data source is Kafka (defined through a catalog and schema registry) and my enrichment data is located in a relational database (JDBC connector). I think this setup reflects a quite common use case.
>
> The enrichment table definition looks like this:
>
> CREATE TABLE dim (
>   ID BIGINT,
>   ENRICH STRING,
>   FROM_DATE TIMESTAMP(6),
>   TO_DATE TIMESTAMP(6),
>   WATERMARK FOR TO_DATE AS TO_DATE
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = '…',
>   'table-name' = '…',
>   'username' = '…',
>   'password' = '…'
> )
>
> And this is the join I use against the stream coming from Kafka (a table with a watermark spec), TRUNC is a UDF:
>
> SELECT TRUNC(T1.START_TIME, 'HH24') as `START_TIME`,
>   D1.ENRICH as `ENRICH`,
>   T1.FIELD as `FIELD`
> FROM `kafka.topic` T1, dim D1
> WHERE T1.ENRICH_ID = D1.ID
>   AND T1.START_TIME BETWEEN D1.TO_DATE - INTERVAL '1' DAY AND D1.TO_DATE
>   AND T1.START_TIME >= D1.FROM_DATE
>
> The resulting job graph contains two table source scan operators together with an interval join operator.
>
> The problem I am trying to solve is how to change the character of the enrichment table. Currently, the related operator task reads the whole data from the table when the job starts and finishes afterwards.
> Ideally, I would like to have a continuously updated enrichment table. Is it possible to achieve this without CDC, for example by querying the whole database periodically or using some kind of cache for keys? We can assume that the enrichment table is append only: there are no deletes or updates, only inserts for new time intervals.
>
> If updates are not possible, how can I deal with the finished task? Due to a known issue [1], all checkpoints are aborted. Maybe I could live with restarting the job to get new enrichment data, as it is not refreshed so frequently, but checkpointing is a must.
>
> Flink version 1.12
>
> regards
> Marek
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
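PS: for completeness, this is the lookup-join form I expected to make the planner call getLookupRuntimeProvider. As far as I understand, the planner only takes the lookup path when the table is referenced with FOR SYSTEM_TIME AS OF a processing-time attribute and joined on an equality condition; with the plain interval join above it goes through ScanTableSource instead. A minimal, untested sketch follows: the 'lookup.cache.*' options come from the JDBC connector page [2], and proc_time is an extra processing-time column (proc_time AS PROCTIME()) that I would have to add to the kafka table first:

CREATE TABLE dim (
  ID BIGINT,
  ENRICH STRING,
  FROM_DATE TIMESTAMP(6),
  TO_DATE TIMESTAMP(6)
) WITH (
  'connector' = 'jdbc',
  'url' = '…',
  'table-name' = '…',
  'username' = '…',
  'password' = '…',
  -- cache lookups so the database is not hit for every incoming record
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
)

SELECT T1.START_TIME, D1.ENRICH, T1.FIELD
FROM `kafka.topic` AS T1
JOIN dim FOR SYSTEM_TIME AS OF T1.proc_time AS D1
  ON T1.ENRICH_ID = D1.ID

The limitation, if I read the docs correctly, is that a lookup join always joins against the row currently in the database (plus whatever sits in the cache), so the FROM_DATE/TO_DATE interval condition would have to move into an additional WHERE filter, which is not exactly the same semantics as the interval join above.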