Hi,

After some investigation, I found a simpler way to satisfy the demand: use the CDC connector [1], which supports reading a snapshot of the database and then continuously reading the database's binlog, without deploying Kafka and Debezium.
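For example, since your dimension table is in Postgres, a source table based on the postgres-cdc connector could look roughly like the sketch below; it could then replace the JDBC `test2` table as the right side of the regular join in your test. The connection values here are placeholders, and the exact option names should be double-checked against the flink-cdc-connectors documentation.

  CREATE TABLE test2 (
    source STRING,
    target STRING,
    PRIMARY KEY (source) NOT ENFORCED
  ) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',      -- placeholder
    'port' = '5432',
    'username' = '...',
    'password' = '...',
    'database-name' = 'mydb',      -- placeholder
    'schema-name' = 'public',
    'table-name' = 'test2'
  );

With a changelog source like this on the right side, an update to 'target' in the database should retract the previously emitted join result and emit the updated row. Note that the Postgres server typically needs logical decoding enabled (wal_level = logical) for the CDC connector to read changes.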
Best regards,
JING ZHANG

[1] https://github.com/ververica/flink-cdc-connectors

JING ZHANG <beyond1...@gmail.com> wrote on Sat, Jun 5, 2021 at 2:36 PM:

> Hi,
> Although the JDBC connector cannot read a changelog from the database, there are
> already connectors that can satisfy your demand. You could use the Maxwell
> <https://maxwells-daemon.io/> [1], Canal <https://github.com/alibaba/canal/wiki> [2],
> or Debezium <https://debezium.io/> [3] CDC tools to capture changes in databases;
> please have a try.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/maxwell/
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/debezium/
>
> Best regards,
> JING ZHANG
>
> 1095193...@qq.com <1095193...@qq.com> wrote on Sat, Jun 5, 2021 at 12:56 PM:
>
>> Thanks @JING ZHANG @Yun Gao. I will use the processing-time temporal join.
>> BTW, will reading a changelog be supported for the JDBC source when it works as
>> the right stream of a regular join in the future?
>>
>> ------------------------------
>> 1095193...@qq.com
>>
>> From: JING ZHANG <beyond1...@gmail.com>
>> Date: 2021-06-04 18:32
>> To: Yun Gao <yungao...@aliyun.com>
>> CC: 1095193...@qq.com; user <user@flink.apache.org>
>> Subject: Re: Flink sql regular join not working as expect.
>>
>> Hi,
>> The JDBC source only takes a snapshot and sends all data in that snapshot to the
>> downstream when it works as the right stream of a regular join; it cannot produce
>> a changelog stream.
>> After you update the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080',
>> the JDBC source will not send any new data to the downstream.
>>
>> You could try to use Upsert Kafka [1] as the right side of the regular join and
>> set `source` as the primary key.
>>
>> BTW, if you use a Processing Time Temporal Join [2] in your case, you would always
>> join against the latest version of the dimension table, but updates on the
>> dimension table would not trigger a join result, because the table is only looked
>> up by key.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>>
>> Best regards,
>> JING ZHANG
>>
>> Yun Gao <yungao...@aliyun.com> wrote on Fri, Jun 4, 2021 at 5:07 PM:
>>
>>> Hi,
>>>
>>> I'm not an expert on Table/SQL, but it seems to me that for regular joins, Flink
>>> does not re-read the dimension table after it has read it fully for the first
>>> time. If you want to always join the records with the latest version of the
>>> dimension table, you may need to use a temporal join [1].
>>>
>>> Best,
>>> Yun
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins
>>>
>>> ------------------------------------------------------------------
>>> From: 1095193...@qq.com <1095193...@qq.com>
>>> Send Time: 2021 Jun. 4 (Fri.) 16:45
>>> To: user <user@flink.apache.org>
>>> Subject: Flink sql regular join not working as expect.
>>>
>>> Hi,
>>> I am working on joining a Kafka stream with a Postgres dimension table.
>>> According to
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/:
>>> "Regular joins are the most generic type of join in which any new record, or
>>> changes to either side of the join, are visible and affect the entirety of the
>>> join result."
>>> However, in my test, a changed record in the dimension table does not affect the
>>> result of the join. My test steps:
>>>
>>> 1. Create the Kafka table:
>>>    CREATE TABLE test1 ( source String ) WITH ( 'connector' = 'kafka', 'topic' = 'test' ...)
>>> 2. Create the dimension table:
>>>    CREATE TABLE test2 ( source String, target String ) WITH ( 'connector' = 'jdbc' ... )
>>>    Prepared 1 record in the dimension table:
>>>    source            | target
>>>    172.16.1.109:8080 | 56.32.15.55:8080
>>> 3. Run the regular join:
>>>    select test1.source, test2.target from test1 join test2 on test1.source = test2.source
>>> 4. Feed data into Kafka:
>>>    {"source":"172.16.1.109:8080"}
>>>    Flink outputs the result as expected: +I[172.16.1.109:8080, 56.32.15.55:8080]
>>> 5. Change the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080' in the
>>>    dimension table:
>>>    source            | target
>>>    172.16.1.109:8080 | 56.32.15.54:8080
>>> 6. Feed data into Kafka again:
>>>    {"source":"172.16.1.109:8080"}
>>>    Flink still outputs a result unaffected by the change to the dimension table:
>>>    +I[172.16.1.109:8080, 56.32.15.55:8080]
>>>    Expected result: +I[172.16.1.109:8080, 56.32.15.54:8080]
>>>
>>> Could you give me some suggestions as to why the regular join result is not
>>> affected by changes to the dimension table in my test? I appreciate it.
>>>
>>> ------------------------------
>>> 1095193...@qq.com
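For reference, rough sketches of the two alternatives suggested earlier in this thread, reusing the table names from the test above; the topic, servers, and format values are placeholders, and the exact syntax should be verified against the documentation linked above.

An upsert-kafka table as the right side of the regular join (the dimension changes would need to be written into the topic, e.g. by a CDC pipeline):

  CREATE TABLE test2_upsert (
    source STRING,
    target STRING,
    PRIMARY KEY (source) NOT ENFORCED
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dim_changes',                        -- placeholder
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
  );

  SELECT test1.source, test2_upsert.target
  FROM test1
  JOIN test2_upsert ON test1.source = test2_upsert.source;

A processing-time temporal join against the JDBC table, which looks up the latest row per key at processing time but is not re-triggered by later dimension updates; `test1` would need a processing-time attribute, e.g. a computed column `proc_time AS PROCTIME()` in its DDL:

  SELECT t1.source, t2.target
  FROM test1 AS t1
  JOIN test2 FOR SYSTEM_TIME AS OF t1.proc_time AS t2
    ON t1.source = t2.source;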