> On Oct 10, 2024, at 4:34 AM, Yaroslav Tkachenko <yaros...@goldsky.com> wrote:
>
> Hi Ken,
>
> Snapshotting is implemented differently in Flink CDC; it doesn't re-use
> Debezium's implementation. So you can override some Debezium properties using
> "debezium.", but not "debezium.snapshot.".

Yeah, Yaroslav is right that Flink CDC has its own snapshot mode, i.e. scan.startup.mode [1], and we also plan to disable debezium.snapshot.* to avoid user confusion.

Best,
Leonard

[1] https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mysql-cdc/

>
> On Wed, Oct 9, 2024 at 12:46 PM Ken CHUAN YU <ken.hung...@gmail.com> wrote:
> Hi there,
> I have an issue using the Flink SQL connector to capture change data from
> MariaDB (MySQL) when configuring "debezium.*" settings. Here are more details:
> I have the following table in the source database (MariaDB):
>
> '''CREATE TABLE `client_test` (
>   `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
>   `name` varchar(500) NOT NULL DEFAULT '',
>   `age` int(11) NOT NULL,
>   PRIMARY KEY (`id`)
> );
> '''
>
> For some reason I need only partial data from this table for the snapshot,
> so I define the Flink stream table as follows:
>
> '''CREATE TABLE client_cdc (
>   id DOUBLE,
>   name VARCHAR(500),
>   age DOUBLE,
>   PRIMARY KEY(id) NOT ENFORCED
> )
> WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'mariadb',
>   'port' = '3306',
>   'username' = 'ooo',
>   'password' = 'ooo',
>   'database-name' = 'xxx',
>   'scan.startup.mode' = 'initial',
>   'table-name' = 'client_test',
>   'debezium.snapshot.query.mode' = 'custom',
>   'debezium.snapshot.select.statement.overrides' = 'xxx.client_test',
>   'debezium.snapshot.select.statement.overrides.xxx.client_test' = 'SELECT * FROM xxx.client_test WHERE id > 3'
> );
> '''
> Above, I tried to filter out the rows whose id is less than 3 during the snapshot.
> But after executing select * from client_cdc; in the Flink client I can still see
> the full snapshot.
>
> I also tried to run this:
>
> '''CREATE TABLE client_cdc (
>   id DOUBLE,
>   name VARCHAR(500),
>   age DOUBLE,
>   PRIMARY KEY(id) NOT ENFORCED
> )
> WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'mariadb',
>   'port' = '3306',
>   'username' = 'ooo',
>   'password' = 'ooo',
>   'database-name' = 'xxx',
>   'scan.startup.mode' = 'initial',
>   'table-name' = 'client_test',
>   'debezium.snapshot.query.mode' = 'custom',
>   'debezium.snapshot.select.statement.overrides' = 'xxx.client_test',
>   'debezium.snapshot.select.statement.overrides.xxx.client_test' = 'this should failed SELECT * FROM xxx.client_test WHERE id > 3'
> );
> '''
> This time I gave an invalid query to
> 'debezium.snapshot.select.statement.overrides.xxx.client_test', but I can
> still execute select * from client_cdc; and it still takes a full snapshot. In
> other words, it seems to me that the Flink CDC connector is ignoring the
> settings prefixed with debezium.*. Am I missing anything here?
> According to the documentation I should be able to configure Debezium, but that
> doesn't seem to be the case.
>
> The expectation is to see only the rows selected during the snapshot by the
> configuration
> "debezium.snapshot.select.statement.overrides.[database].[table]", e.g. SELECT
> * FROM xxx.client_test WHERE id > 3. I should only see rows with id greater
> than 3 in the stream table after the snapshot, even though I have rows with id
> less than 3 in the table in the MySQL database.
>
> Am I missing anything here?
>
> Flink version I'm using: 1.18
> Flink CDC connector I'm using: flink-sql-connector-mysql-cdc-3.1.1
> JDBC version: mysql-connector-j-9.0.0
> Here is the documentation for debezium.snapshot.select.statement.overrides:
> https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-select-statement-overrides
>
> Thank you for your help in advance.
> Br,
> Ken Hung
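
Since the replies in this thread say the debezium.snapshot.* options are not honored by Flink CDC, one possible workaround is to drop them and filter in Flink SQL instead. The following is only a minimal sketch under assumptions: the view name client_cdc_filtered is hypothetical, the connection values are the placeholders from the original message, and the column types are copied from it unchanged. Unlike a snapshot-only override, this filter also applies to change events read after the snapshot, and the connector may still read the whole source table while snapshotting.

```sql
-- Source table as in the original message, without the debezium.snapshot.* options,
-- which the thread says Flink CDC does not apply.
CREATE TABLE client_cdc (
  id DOUBLE,
  name VARCHAR(500),
  age DOUBLE,
  PRIMARY KEY(id) NOT ENFORCED
)
WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mariadb',
  'port' = '3306',
  'username' = 'ooo',
  'password' = 'ooo',
  'database-name' = 'xxx',
  'table-name' = 'client_test',
  -- Flink CDC's own startup option: 'initial' takes a snapshot and then reads the binlog.
  'scan.startup.mode' = 'initial'
);

-- Hypothetical view name; the filter runs in Flink, not in the snapshot query,
-- so it applies to snapshot rows and to later change events alike.
CREATE TEMPORARY VIEW client_cdc_filtered AS
SELECT id, name, age
FROM client_cdc
WHERE id > 3;

SELECT * FROM client_cdc_filtered;
```

If only the changes made after the job starts are needed, setting 'scan.startup.mode' to 'latest-offset' skips the snapshot entirely; whether that fits depends on the use case.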