ruanhang1993 commented on code in PR #3608:
URL: https://github.com/apache/flink-cdc/pull/3608#discussion_r1914211831


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -366,7 +369,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
                 () -> {
                     Column column = columnDefinitionListener.getColumn();
                     Map<String, DataType> typeMapping = new HashMap<>();
-                    typeMapping.put(column.name(), fromDbzColumn(column));
+                    typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));

Review Comment:
   We should use `fromDbzColumn(column, false)` here.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -315,7 +318,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
                     String newColumnName = parser.parseName(ctx.newColumn);
 
                     Map<String, DataType> typeMapping = new HashMap<>();
-                    typeMapping.put(column.name(), fromDbzColumn(column));
+                    typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));

Review Comment:
   We should use `fromDbzColumn(column, false)` here.
   `ALTER xxx ADD COLUMN newcol BOOL;` will not be parsed as a tinyint type. We should add some tests for the BOOL and TINYINT(1) types in ALTER statements.
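
   To illustrate the distinction, here is a minimal, self-contained sketch of how such a flag could affect the mapping. The class `TinyIntMappingSketch`, the method `mapType`, and the string type names are hypothetical stand-ins, not the connector's real `fromDbzColumn`; it also assumes the DDL parser already reports `BOOL` as a boolean type, so the flag only matters for `TINYINT(1)`.

```java
import java.util.Locale;

/**
 * Hypothetical, simplified stand-in for the connector's type mapping.
 * It is NOT the real fromDbzColumn; names and return values are illustrative.
 */
class TinyIntMappingSketch {

    /**
     * Maps a MySQL type name and declared length to a simplified target type name.
     * The tinyInt1isBit flag is only consulted for TINYINT(1).
     */
    static String mapType(String mysqlType, int length, boolean tinyInt1isBit) {
        switch (mysqlType.toUpperCase(Locale.ROOT)) {
            case "BOOL":
            case "BOOLEAN":
                // Assumption: the parser reports BOOL as a boolean type,
                // so the flag is never consulted on this path.
                return "BOOLEAN";
            case "TINYINT":
                return (length == 1 && tinyInt1isBit) ? "BOOLEAN" : "TINYINT";
            default:
                throw new IllegalArgumentException(
                        "Type not covered by this sketch: " + mysqlType);
        }
    }

    public static void main(String[] args) {
        System.out.println(mapType("BOOL", 1, false));    // BOOLEAN
        System.out.println(mapType("TINYINT", 1, false)); // TINYINT
        System.out.println(mapType("TINYINT", 1, true));  // BOOLEAN
    }
}
```

   Under these assumptions, passing `false` leaves `BOOL` columns as BOOLEAN and keeps `TINYINT(1)` as TINYINT, which is the pair of cases the suggested ALTER tests would need to cover.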



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -413,7 +416,7 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) {
     private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
         return org.apache.flink.cdc.common.schema.Column.physicalColumn(
                 dbzColumn.name(),
-                fromDbzColumn(dbzColumn),
+                fromDbzColumn(dbzColumn, tinyInt1isBit),

Review Comment:
   We should use `fromDbzColumn(dbzColumn, false)` here.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java:
##########
@@ -607,6 +652,22 @@ public void testSchemaChangeEvents() throws Exception {
                                     new AddColumnEvent.ColumnWithPosition(
                                             Column.physicalColumn("newcol1", DataTypes.INT())))));
 
+            // Add a TINYINT(1) column
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`customers` ADD COLUMN `new_tinyint1_col1` TINYINT(1) NULL;",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new AddColumnEvent(
+                            TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                            Collections.singletonList(
+                                    new AddColumnEvent.ColumnWithPosition(
+                                            Column.physicalColumn(
+                                                    "new_tinyint1_col1",
+                                                    tinyInt1isBit
+                                                            ? DataTypes.BOOLEAN()
+                                                            : DataTypes.TINYINT())))));
+

Review Comment:
   Add a BOOL column to the related tests.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java:
##########
@@ -129,14 +130,19 @@ public static Schema getTableSchema(
                 new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
             TableChanges.TableChange tableSchema =
                     mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
-            return toSchema(tableSchema.getTable());
+            boolean tinyInt1isBit =
+                    Boolean.parseBoolean(
+                            sourceConfig

Review Comment:
   Add a method `getTinyInt1isBit` to `MySqlSourceConfig` that returns the `tinyInt1isBit` setting.
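
   A rough sketch of what such an accessor might look like follows. The class `SourceConfigSketch` is a hypothetical stand-in for `MySqlSourceConfig`. Reading the flag from JDBC properties under the key `tinyInt1isBit`, with a default of `true` (the MySQL Connector/J default), is an assumption rather than something taken from this PR.

```java
import java.util.Properties;

/** Hypothetical stand-in for MySqlSourceConfig; only the accessor is sketched. */
class SourceConfigSketch {

    // Assumption: the flag lives in the JDBC properties under this key.
    private static final String TINY_INT1_IS_BIT_KEY = "tinyInt1isBit";

    private final Properties jdbcProperties;

    SourceConfigSketch(Properties jdbcProperties) {
        this.jdbcProperties = jdbcProperties == null ? new Properties() : jdbcProperties;
    }

    /** Returns the tinyInt1isBit setting, defaulting to true when it is not set. */
    boolean getTinyInt1isBit() {
        return Boolean.parseBoolean(
                jdbcProperties.getProperty(TINY_INT1_IS_BIT_KEY, "true"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("tinyInt1isBit", "false");
        System.out.println(new SourceConfigSketch(props).getTinyInt1isBit());            // false
        System.out.println(new SourceConfigSketch(new Properties()).getTinyInt1isBit()); // true
    }
}
```

   Keeping the parsing inside the config class means callers such as `getTableSchema` only ask for a boolean and do not need to know where the value is stored.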



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.