lvyanquan commented on code in PR #3791:
URL: https://github.com/apache/flink-cdc/pull/3791#discussion_r1976809930


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java:
##########
@@ -138,81 +185,202 @@ public byte[] serialize(Event event) {
         }
 
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-        reuseGenericRowData.setField(
-                3,
-                GenericRowData.of(
-                        StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
-                        StringData.fromString(dataChangeEvent.tableId().getTableName())));
+        BiConsumer<DataChangeEvent, GenericRowData> converter;
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
-                    reuseGenericRowData.setField(0, null);
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_INSERT);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertInsertEventToRowData;
+                    break;
                 case DELETE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(1, null);
-                    reuseGenericRowData.setField(2, OP_DELETE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertDeleteEventToRowData;
+                    break;
                 case UPDATE:
                 case REPLACE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_UPDATE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertUpdateEventToRowData;
+                    break;
                 default:
                     throw new UnsupportedOperationException(
                             format(
                                     "Unsupported operation '%s' for 
OperationType.",
                                     dataChangeEvent.op()));
             }
+
+            if (isIncludedDebeziumSchema) {
+                converter.accept(dataChangeEvent, payloadGenericRowData);
+                reuseGenericRowData.setField(
+                        SCHEMA.getPosition(),
+                        StringData.fromString(schemaMap.get(dataChangeEvent.tableId())));
+            } else {
+                converter.accept(dataChangeEvent, reuseGenericRowData);
+            }
+            return jsonSerializers
+                    .get(dataChangeEvent.tableId())
+                    .getSerializationSchema()
+                    .serialize(reuseGenericRowData);
         } catch (Throwable t) {
            throw new RuntimeException(format("Could not serialize event '%s'.", event), t);
         }
     }
 
+    public String convertSchemaToDebeziumSchema(Schema schema) {

Review Comment:
   It's better to add an example of the output to help developers understand.
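   For instance, a hypothetical Javadoc sketch (the column names and types are invented for illustration, and the exact envelope layout depends on how this method assembles the `before`/`after` structs):

   ```java
   /**
    * Converts a CDC {@link Schema} into a Debezium-style JSON schema string.
    *
    * <p>For example, for a table with columns {@code id BIGINT} and {@code name VARCHAR(255)},
    * the result resembles the Kafka Connect JSON schema envelope:
    *
    * <pre>{@code
    * {"type":"struct","fields":[
    *   {"type":"struct","fields":[
    *       {"type":"int64","optional":true,"field":"id"},
    *       {"type":"string","optional":true,"field":"name"}],
    *    "optional":true,"field":"before"},
    *   {"type":"struct","fields":[ ...same fields as "before"... ],
    *    "optional":true,"field":"after"}],
    *  "optional":false}
    * }</pre>
    */
   ```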



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java:
##########
@@ -138,81 +185,202 @@ public byte[] serialize(Event event) {
         }
 
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-        reuseGenericRowData.setField(
-                3,
-                GenericRowData.of(
-                        StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
-                        StringData.fromString(dataChangeEvent.tableId().getTableName())));
+        BiConsumer<DataChangeEvent, GenericRowData> converter;
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
-                    reuseGenericRowData.setField(0, null);
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_INSERT);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertInsertEventToRowData;
+                    break;
                 case DELETE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(1, null);
-                    reuseGenericRowData.setField(2, OP_DELETE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertDeleteEventToRowData;
+                    break;
                 case UPDATE:
                 case REPLACE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_UPDATE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertUpdateEventToRowData;
+                    break;
                 default:
                     throw new UnsupportedOperationException(
                             format(
                                     "Unsupported operation '%s' for 
OperationType.",
                                     dataChangeEvent.op()));
             }
+
+            if (isIncludedDebeziumSchema) {
+                converter.accept(dataChangeEvent, payloadGenericRowData);
+                reuseGenericRowData.setField(
+                        SCHEMA.getPosition(),
+                        StringData.fromString(schemaMap.get(dataChangeEvent.tableId())));
+            } else {
+                converter.accept(dataChangeEvent, reuseGenericRowData);
+            }
+            return jsonSerializers
+                    .get(dataChangeEvent.tableId())
+                    .getSerializationSchema()
+                    .serialize(reuseGenericRowData);
         } catch (Throwable t) {
            throw new RuntimeException(format("Could not serialize event '%s'.", event), t);
         }
     }
 
+    public String convertSchemaToDebeziumSchema(Schema schema) {
+        List<Column> columns = schema.getColumns();
+        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+        SchemaBuilder beforeBuilder = SchemaBuilder.struct();
+        SchemaBuilder afterBuilder = SchemaBuilder.struct();
+        for (Column column : columns) {
+            String columnName = column.getName();
+            org.apache.flink.cdc.common.types.DataType columnType = column.getType();
+            final SchemaBuilder field;
+            switch (columnType.getTypeRoot()) {
+                case TINYINT:
+                case SMALLINT:
+                    field = SchemaBuilder.int16();
+                    break;
+                case INTEGER:
+                    field = SchemaBuilder.int32();
+                    break;
+                case BIGINT:
+                    field = SchemaBuilder.int64();
+                    break;
+                case DECIMAL:
+                    final int decimalPrecision = ((DecimalType) columnType).getPrecision();
+                    final int decimalScale = ((DecimalType) columnType).getScale();
+                    field =
+                            Decimal.builder(decimalScale)
+                                    .parameter(
+                                            "connect.decimal.precision",
+                                            String.valueOf(decimalPrecision));
+                    break;
+                case BOOLEAN:
+                    field = SchemaBuilder.bool();
+                    break;
+                case FLOAT:
+                case DOUBLE:
+                    field = SchemaBuilder.float64();
+                    break;
+                case DATE:
+                    field = SchemaBuilder.int32().name(Date.SCHEMA_NAME).version(1);
+                    break;
+                case TIME_WITHOUT_TIME_ZONE:
+                    field = SchemaBuilder.int64().name(MicroTime.SCHEMA_NAME).version(1);
+                    break;
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_TIME_ZONE:
+                    field = SchemaBuilder.string().name(Timestamp.SCHEMA_NAME).version(1);
+                    break;
+                case BINARY:
+                    field =
+                            SchemaBuilder.bytes()
+                                    .name(Bits.LOGICAL_NAME)
+                                    .parameter(
+                                            Bits.LENGTH_FIELD,
+                                            Integer.toString(
+                                                    org.apache.flink.cdc.common.types.DataTypes
+                                                            .getLength(columnType)
+                                                            .orElse(0)))
+                                    .version(1);
+                    break;
+                case CHAR:
+                case VARCHAR:
+                case VARBINARY:
+                default:
+                    field = SchemaBuilder.string();

Review Comment:
   This includes ARRAY/MAP/ROW; there may be some issues if they are all converted to `SchemaBuilder.string`.
   You can check whether there is a better type; if not, this is also acceptable.
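   If a native mapping is preferred, one hedged sketch (the `convertType` helper is invented here; it stands for the per-type switch above extracted into a method, and it assumes the Kafka Connect `SchemaBuilder` array/map/struct builders):

   ```java
   import org.apache.flink.cdc.common.types.ArrayType;
   import org.apache.flink.cdc.common.types.DataField;
   import org.apache.flink.cdc.common.types.DataType;
   import org.apache.flink.cdc.common.types.MapType;
   import org.apache.flink.cdc.common.types.RowType;
   import org.apache.kafka.connect.data.SchemaBuilder;

   private SchemaBuilder convertType(DataType type) {
       switch (type.getTypeRoot()) {
           case ARRAY:
               // Connect has a native array schema; recurse into the element type.
               return SchemaBuilder.array(
                       convertType(((ArrayType) type).getElementType()).build());
           case MAP:
               MapType mapType = (MapType) type;
               return SchemaBuilder.map(
                       convertType(mapType.getKeyType()).build(),
                       convertType(mapType.getValueType()).build());
           case ROW:
               // Nested rows become Connect structs, one field per nested column.
               SchemaBuilder struct = SchemaBuilder.struct();
               for (DataField dataField : ((RowType) type).getFields()) {
                   struct.field(dataField.getName(), convertType(dataField.getType()).build());
               }
               return struct;
           default:
               // Fall back to string for the remaining roots, as the current code does.
               return SchemaBuilder.string();
       }
   }
   ```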



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
