Good point, thanks for the clarification.

best regards,
Marek
Fri, 15 Jan 2021 at 09:32 Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> The LookupTableSource is used when you join based on processing time, as
> described here [1]. Moreover, it supports only equi-lookups, therefore it
> won't work with range queries as in your case.
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#processing-time-temporal-join
>
> On 15/01/2021 09:25, Marek Maj wrote:
>
> Hi Dawid,
> thanks for your answers!
>
> I guess CDC and Debezium will be the right choice for this case. Its
> integration with Oracle, however, is in the incubating phase, as the
> documentation states [1], so we will need to investigate further.
>
> I was hoping there would be some way to incorporate the lookup cache [2]
> in that scenario and bypass the problem of finished tasks.
> JDBCDynamicTableSource, which you mentioned, implements both
> LookupTableSource and ScanTableSource, but I could not force the planner
> to invoke getLookupRuntimeProvider [3].
>
> Anyway, I will take a closer look at the JDBCDynamicTableSource
> implementation and try to find a workaround.
>
> best regards,
> Marek
>
> [1] https://debezium.io/documentation/reference/connectors/oracle.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html#lookup-cache
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/table/connector/source/LookupTableSource.html#getLookupRuntimeProvider-org.apache.flink.table.connector.source.LookupTableSource.LookupContext-
>
> Thu, 14 Jan 2021 at 20:07 Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> Hi Marek,
>>
>> I am afraid I don't have a good answer for your question. The problem
>> indeed is that the JDBC source can work only as a bounded source. As you
>> correctly pointed out, as of now mixing bounded with unbounded sources
>> does not work with checkpointing, which we want to address in FLIP-147
>> (which you linked as well).
>>
>> I agree one solution would be to change the implementation of
>> JDBCDynamicTableSource so that it produces an UNBOUNDED source.
>> Unfortunately, it is not the most straightforward task.
>>
>> Another solution would be to actually use CDC. I think you could use one
>> of the connectors from [1], which use the embedded Debezium engine, so
>> you would not need to set up any external tools, but just embed the CDC
>> in Flink. Of course, I may be mistaken here, as I haven't tried those
>> connectors myself.
>>
>> Unfortunately, I don't have any other ideas right now. Maybe someone else
>> can chime in @Timo @Jark
>>
>> Lastly, I think once you solve the problem of the finishing source, you
>> could also consider using a temporal join [2] instead of an interval
>> join.
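>>
>> To make both ideas a bit more concrete, two untested sketches. For the
>> CDC option, a table backed by one of the connectors from [1] (the repo
>> ships e.g. a 'mysql-cdc' connector; all connection values below are
>> placeholders for your setup) could be declared roughly like:
>>
>> CREATE TABLE dim (
>>   ID BIGINT,
>>   ENRICH STRING,
>>   FROM_DATE TIMESTAMP(6),
>>   TO_DATE TIMESTAMP(6),
>>   PRIMARY KEY (ID) NOT ENFORCED  -- assumes ID identifies a row; needed
>>                                  -- if dim is used as a versioned table
>> ) WITH (
>>   'connector' = 'mysql-cdc',     -- pick the connector matching your DB
>>   'hostname' = 'localhost',      -- placeholder
>>   'port' = '3306',               -- placeholder
>>   'username' = '...',
>>   'password' = '...',
>>   'database-name' = '...',
>>   'table-name' = '...'
>> )
>>
>> And, assuming dim can be modelled as a versioned table (primary key plus
>> watermark) and START_TIME is the rowtime attribute of your Kafka table,
>> an event-time temporal join [2] would look roughly like:
>>
>> SELECT T1.START_TIME, D1.ENRICH, T1.FIELD
>> FROM `kafka.topic` T1
>> JOIN dim FOR SYSTEM_TIME AS OF T1.START_TIME AS D1
>>   ON T1.ENRICH_ID = D1.ID
>>
>> But again, I haven't run either of these, so treat them as a starting
>> point only.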
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins
>>
>> On 12/01/2021 16:40, Marek Maj wrote:
>>
>> Hello,
>> I am trying to use the Flink SQL API to join two tables. My stream data
>> source is Kafka (defined through a catalog and schema registry) and my
>> enrichment data is located in a relational database (JDBC connector). I
>> think this setup reflects a quite common use case.
>>
>> The enrichment table definition looks like this:
>>
>> CREATE TABLE dim (
>>   ID BIGINT,
>>   ENRICH STRING,
>>   FROM_DATE TIMESTAMP(6),
>>   TO_DATE TIMESTAMP(6),
>>   WATERMARK FOR TO_DATE AS TO_DATE
>> ) WITH (
>>   'connector' = 'jdbc',
>>   'url' = '…',
>>   'table-name' = '…',
>>   'username' = '…',
>>   'password' = '…'
>> )
>>
>> And this is the join I use against the stream coming from Kafka (a table
>> with a watermark spec); TRUNC is a UDF:
>>
>> SELECT TRUNC(T1.START_TIME, 'HH24') as `START_TIME`,
>>        D1.ENRICH as `ENRICH`,
>>        T1.FIELD as `FIELD`
>> FROM `kafka.topic` T1, dim D1
>> WHERE T1.ENRICH_ID = D1.ID
>>   AND T1.START_TIME BETWEEN D1.TO_DATE - INTERVAL '1' DAY AND D1.TO_DATE
>>   AND T1.START_TIME >= D1.FROM_DATE
>>
>> The resulting job graph contains two table source scan operators together
>> with an interval join operator.
>>
>> The problem I am trying to solve is how to change the character of the
>> enrichment table. Currently, the related operator task reads all data
>> from the table when the job starts and finishes afterwards. Ideally, I
>> would like to have a continuously updated enrichment table. Is this
>> possible to achieve without CDC, for example by querying the whole
>> database periodically, or by using some kind of cache for keys? We can
>> assume that the enrichment table is append-only: there are no deletes or
>> updates, only inserts for new time intervals.
>>
>> If updates are not possible, how can I deal with the finished task? Due
>> to a known issue [1], all checkpoints are aborted. Maybe I could live
>> with restarting the job to get new enrichment data, as it is not
>> refreshed that frequently, but checkpointing is a must.
>>
>> flink version 1.12
>>
>> regards
>> Marek
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
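>>
>> P.S. To make the "cache for keys" idea concrete, this is roughly the
>> direction I had in mind: keep the JDBC connector, but add its lookup
>> cache options ('lookup.cache.max-rows' and 'lookup.cache.ttl', from the
>> JDBC connector docs) and join on processing time. An untested sketch;
>> PROC_TIME would have to be declared as a processing-time attribute
>> (PROC_TIME AS PROCTIME()) on the Kafka table:
>>
>> CREATE TABLE dim (
>>   ID BIGINT,
>>   ENRICH STRING,
>>   FROM_DATE TIMESTAMP(6),
>>   TO_DATE TIMESTAMP(6)
>> ) WITH (
>>   'connector' = 'jdbc',
>>   'url' = '…',
>>   'table-name' = '…',
>>   'username' = '…',
>>   'password' = '…',
>>   'lookup.cache.max-rows' = '10000', -- bound the cache size
>>   'lookup.cache.ttl' = '10min'       -- rows are re-read after the TTL
>> )
>>
>> SELECT T1.START_TIME, D1.ENRICH, T1.FIELD
>> FROM `kafka.topic` T1
>> JOIN dim FOR SYSTEM_TIME AS OF T1.PROC_TIME AS D1
>>   ON T1.ENRICH_ID = D1.ID
>> WHERE T1.START_TIME BETWEEN D1.TO_DATE - INTERVAL '1' DAY AND D1.TO_DATE
>>   AND T1.START_TIME >= D1.FROM_DATE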