lvyanquan commented on code in PR #4027:
URL: https://github.com/apache/flink-cdc/pull/4027#discussion_r2108253977


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java:
##########
@@ -293,4 +293,15 @@ public class MySqlSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to assign the unbounded chunks first 
during snapshot reading phase. This might help reduce the risk of the 
TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of 
the largest unbounded chunk. Defaults to false.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE =
+            ConfigOptions.key("ignore-no-primary-key-table")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to ignore tables without primary key. 
When enabled, the connector will skip tables "
+                                    + "that don't have a primary key. By 
default these tables will be processed, but for some "
+                                    + "scenarios it may be desirable to ignore 
them since tables without primary keys "
+                                    + "might cause performance issues during 
incremental snapshot.");

Review Comment:
   What would happen if we met the record of these tables in incremental 
reading phase?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -267,7 +267,8 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
                         ChunkUtils.getChunkKeyColumnType(
                                 
statefulTaskContext.getDatabaseSchema().tableFor(tableId),
                                 
statefulTaskContext.getSourceConfig().getChunkKeyColumns(),
-                                
statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean());
+                                
statefulTaskContext.getSourceConfig().isTreatTinyInt1AsBoolean(),
+                                statefulTaskContext.getSourceConfig());

Review Comment:
   Passing statefulTaskContext.getSourceConfig().isIgnoreNoPrimaryKeyTable() is 
more accurate.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java:
##########
@@ -330,4 +330,11 @@ public class MySqlDataSourceOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to skip backfill in snapshot reading 
phase. If backfill is skipped, changes on captured tables during snapshot phase 
will be consumed later in change log reading phase instead of being merged into 
the snapshot.WARNING: Skipping backfill might lead to data inconsistency 
because some change log events happened within the snapshot phase might be 
replayed (only at-least-once semantic is promised). For example updating an 
already updated value in snapshot, or deleting an already deleted entry in 
snapshot. These replayed change log events should be handled specially.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> IGNORE_NO_PRIMARY_KEY_TABLE =
+            ConfigOptions.key("ignore-no-primary-key-table")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to ignore tables without primary 
key in MySQL.");

Review Comment:
   Please keep in consistence with MySqlSourceOptions.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java:
##########
@@ -85,4 +93,53 @@ void testSplitEvenlySizedChunksNormal() {
                         ChunkRange.of(2147483637, 2147483647),
                         ChunkRange.of(2147483647, null));
     }
+
+    @Test
+    void testIgnoreNoPrimaryKeyTable() throws Exception {
+        // 创建配置,设置ignoreNoPrimaryKeyTable为true
+        MySqlSourceConfig sourceConfig =
+                new MySqlSourceConfigFactory()
+                        .startupOptions(StartupOptions.initial())
+                        .databaseList("test_db")
+                        .tableList("test_db.test_table")
+                        .hostname("localhost")
+                        .username("test")
+                        .password("test")
+                        .serverTimeZone(ZoneId.of("UTC").toString())
+                        .ignoreNoPrimaryKeyTable(true)
+                        .createConfig(0);
+
+        // 创建一个简单的MySqlSchema实现
+        MySqlSchema schema =
+                new MySqlSchema(sourceConfig, true) {
+                    @Override
+                    public TableChanges.TableChange getTableSchema(
+                            MySqlPartition partition, JdbcConnection jdbc, 
TableId tableId) {
+                        // 创建一个没有主键的表
+                        Table noPkTable =
+                                Table.editor()
+                                        .tableId(tableId)
+                                        .addColumn(
+                                                Column.editor()
+                                                        .name("id")
+                                                        .type("BIGINT")
+                                                        .jdbcType(-5)
+                                                        .optional(false)
+                                                        .create())
+                                        .create();
+                        return new TableChanges.TableChange(
+                                TableChanges.TableChangeType.CREATE, 
noPkTable);
+                    }
+                };
+
+        MySqlChunkSplitter splitter = new MySqlChunkSplitter(schema, 
sourceConfig);
+        MySqlPartition partition = new MySqlPartition("mysql_binlog_source");
+
+        // 测试无主键表

Review Comment:
   Avoid using language other than English.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to