lvyanquan commented on code in PR #4027: URL: https://github.com/apache/flink-cdc/pull/4027#discussion_r2108253977
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java: ########## @@ -293,4 +293,15 @@ public class MySqlSourceOptions { .defaultValue(false) .withDescription( "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); + + @Experimental + public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE = + ConfigOptions.key("ignore-no-primary-key-table") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to ignore tables without primary key. When enabled, the connector will skip tables " + + "that don't have a primary key. By default these tables will be processed, but for some " + + "scenarios it may be desirable to ignore them since tables without primary keys " + + "might cause performance issues during incremental snapshot."); Review Comment: What would happen if we meet records of these tables in the incremental reading phase? ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java: ########## @@ -267,7 +267,8 @@ private boolean shouldEmit(SourceRecord sourceRecord) { ChunkUtils.getChunkKeyColumnType( statefulTaskContext.getDatabaseSchema().tableFor(tableId), statefulTaskContext.getSourceConfig().getChunkKeyColumns(), - statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean()); + statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean(), + statefulTaskContext.getSourceConfig()); Review Comment: Passing statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable() is more accurate. 
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java: ########## @@ -330,4 +330,11 @@ public class MySqlDataSourceOptions { .defaultValue(false) .withDescription( "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially."); + + @Experimental + public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE = + ConfigOptions.key("ignore-no-primary-key-table") + .booleanType() + .defaultValue(false) + .withDescription("Whether to ignore tables without primary key in MySQL."); Review Comment: Please keep this consistent with MySqlSourceOptions. 
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java: ########## @@ -85,4 +93,53 @@ void testSplitEvenlySizedChunksNormal() { ChunkRange.of(2147483637, 2147483647), ChunkRange.of(2147483647, null)); } + + @Test + void testIgnoreNoPrimaryKeyTable() throws Exception { + // 创建配置,设置ignoreNoPrimaryKeyTable为true + MySqlSourceConfig sourceConfig = + new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList("test_db") + .tableList("test_db.test_table") + .hostname("localhost") + .username("test") + .password("test") + .serverTimeZone(ZoneId.of("UTC").toString()) + .ignoreNoPrimaryKeyTable(true) + .createConfig(0); + + // 创建一个简单的MySqlSchema实现 + MySqlSchema schema = + new MySqlSchema(sourceConfig, true) { + @Override + public TableChanges.TableChange getTableSchema( + MySqlPartition partition, JdbcConnection jdbc, TableId tableId) { + // 创建一个没有主键的表 + Table noPkTable = + Table.editor() + .tableId(tableId) + .addColumn( + Column.editor() + .name("id") + .type("BIGINT") + .jdbcType(-5) + .optional(false) + .create()) + .create(); + return new TableChanges.TableChange( + TableChanges.TableChangeType.CREATE, noPkTable); + } + }; + + MySqlChunkSplitter splitter = new MySqlChunkSplitter(schema, sourceConfig); + MySqlPartition partition = new MySqlPartition("mysql_binlog_source"); + + // 测试无主键表 Review Comment: Avoid using language other than English. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org