ruanhang1993 commented on code in PR #3608:
URL: https://github.com/apache/flink-cdc/pull/3608#discussion_r1914211831

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -366,7 +369,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
                 () -> {
                     Column column = columnDefinitionListener.getColumn();
                     Map<String, DataType> typeMapping = new HashMap<>();
-                    typeMapping.put(column.name(), fromDbzColumn(column));
+                    typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));

Review Comment:
   We should use `fromDbzColumn(column, false)` here.


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -315,7 +318,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
                     String newColumnName = parser.parseName(ctx.newColumn);
                     Map<String, DataType> typeMapping = new HashMap<>();
-                    typeMapping.put(column.name(), fromDbzColumn(column));
+                    typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));

Review Comment:
   We should use `fromDbzColumn(column, false)` here.
   `Alter xxx add column newcol BOOL;` will not be parsed as a tinyint type.
   We should add some tests for bool and tinyint(1) types for alter sql.


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -413,7 +416,7 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) {
     private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
         return org.apache.flink.cdc.common.schema.Column.physicalColumn(
                 dbzColumn.name(),
-                fromDbzColumn(dbzColumn),
+                fromDbzColumn(dbzColumn, tinyInt1isBit),

Review Comment:
   We should use `fromDbzColumn(column, false)` here.

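For readers following the thread, here is a minimal sketch of the mapping under discussion. It is not the connector's `fromDbzColumn`: the class `TinyIntMappingSketch` and the method `mapType` are hypothetical, and type names are plain strings instead of Debezium `Column` and Flink CDC `DataType` objects. It assumes that the flag only matters for `TINYINT(1)` and that a column declared as `BOOL` or `BOOLEAN` arrives as a boolean type regardless of the flag, which is one reading of the remark that `BOOL` "will not be parsed as a tinyint type".

```java
import java.util.Locale;

// Hypothetical, self-contained stand-in for the connector's type mapping.
// It does not use the real Debezium Column or Flink CDC DataTypes classes.
class TinyIntMappingSketch {

    /** Maps a MySQL column declaration to the name of a CDC type (assumed rules). */
    static String mapType(String mysqlTypeName, int length, boolean tinyInt1isBit) {
        String name = mysqlTypeName.toUpperCase(Locale.ROOT);
        switch (name) {
            case "BOOL":
            case "BOOLEAN":
                // Declared as a boolean: the flag plays no role (assumption).
                return "BOOLEAN";
            case "TINYINT":
                // Only TINYINT(1) is affected by the flag (assumption).
                return (length == 1 && tinyInt1isBit) ? "BOOLEAN" : "TINYINT";
            default:
                throw new IllegalArgumentException(
                        "Type not covered by this sketch: " + mysqlTypeName);
        }
    }

    public static void main(String[] args) {
        System.out.println(mapType("TINYINT", 1, true));
        System.out.println(mapType("TINYINT", 1, false));
        System.out.println(mapType("TINYINT", 4, true));
        System.out.println(mapType("BOOL", -1, false));
    }
}
```

Under these assumed rules, passing `false` keeps `TINYINT(1)` as `TINYINT`, while an explicit `BOOL` column still maps to a boolean type. A test for the ALTER path would need both cases to tell the two apart.
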
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java:
##########
@@ -607,6 +652,22 @@ public void testSchemaChangeEvents() throws Exception {
                                 new AddColumnEvent.ColumnWithPosition(
                                         Column.physicalColumn("newcol1", DataTypes.INT())))));
+        // Add a TINYINT(1) column
+        statement.execute(
+                String.format(
+                        "ALTER TABLE `%s`.`customers` ADD COLUMN `new_tinyint1_col1` TINYINT(1) NULL;",
+                        inventoryDatabase.getDatabaseName()));
+        expected.add(
+                new AddColumnEvent(
+                        TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        Column.physicalColumn(
+                                                "new_tinyint1_col1",
+                                                tinyInt1isBit
+                                                        ? DataTypes.BOOLEAN()
+                                                        : DataTypes.TINYINT())))));
+

Review Comment:
   Add a BOOL column in the related tests.


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java:
##########
@@ -129,14 +130,19 @@ public static Schema getTableSchema(
                 new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
             TableChanges.TableChange tableSchema =
                     mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
-            return toSchema(tableSchema.getTable());
+            boolean tinyInt1isBit =
+                    Boolean.parseBoolean(
+                            sourceConfig

Review Comment:
   Add a method `getTinyInt1isBit` to `MySqlSourceConfig` that returns the `tinyInt1isBit` value.
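A minimal sketch of what such a getter could look like, under stated assumptions. The quoted diff is cut off after `sourceConfig`, so where the PR actually reads the value from is not visible here. This sketch assumes the setting lives in a JDBC `Properties` object under the key `tinyInt1isBit` and falls back to `true` when unset, matching the MySQL Connector/J default. `SourceConfigSketch` and its field are hypothetical stand-ins, not the real `MySqlSourceConfig`.

```java
import java.util.Properties;

// Hypothetical stand-in for MySqlSourceConfig, reduced to the part discussed here.
class SourceConfigSketch {

    // Assumed location of the setting: the JDBC connection properties.
    private final Properties jdbcProperties;

    SourceConfigSketch(Properties jdbcProperties) {
        this.jdbcProperties = jdbcProperties;
    }

    /** Returns the tinyInt1isBit setting; treated as true when the key is absent. */
    boolean getTinyInt1isBit() {
        return Boolean.parseBoolean(jdbcProperties.getProperty("tinyInt1isBit", "true"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(new SourceConfigSketch(props).getTinyInt1isBit());
        props.setProperty("tinyInt1isBit", "false");
        System.out.println(new SourceConfigSketch(props).getTinyInt1isBit());
    }
}
```

Keeping the parsing in one getter means callers such as `getTableSchema` would not each repeat the `Boolean.parseBoolean` logic and the default.
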