lvyanquan commented on code in PR #3791:
URL: https://github.com/apache/flink-cdc/pull/3791#discussion_r1975258209


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml:
##########
@@ -74,6 +74,25 @@ limitations under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>${debezium.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>3.2.0</version>

Review Comment:
   It's better to make the version as a property.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml:
##########
@@ -101,6 +120,9 @@ limitations under the License.
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
+                                    <excludes>
+                                        
<exclude>org/apache/kafka/connect/data/**</exclude>
+                                    </excludes>

Review Comment:
   Could you explain the reason about this?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java:
##########
@@ -138,81 +185,202 @@ public byte[] serialize(Event event) {
         }
 
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-        reuseGenericRowData.setField(
-                3,
-                GenericRowData.of(
-                        
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
-                        
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+        BiConsumer<DataChangeEvent, GenericRowData> converter;
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
-                    reuseGenericRowData.setField(0, null);
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_INSERT);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertInsertEventToRowData;
+                    break;
                 case DELETE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(1, null);
-                    reuseGenericRowData.setField(2, OP_DELETE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertDeleteEventToRowData;
+                    break;
                 case UPDATE:
                 case REPLACE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_UPDATE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertUpdateEventToRowData;
+                    break;
                 default:
                     throw new UnsupportedOperationException(
                             format(
                                     "Unsupported operation '%s' for 
OperationType.",
                                     dataChangeEvent.op()));
             }
+
+            if (isIncludedDebeziumSchema) {
+                converter.accept(dataChangeEvent, payloadGenericRowData);
+                reuseGenericRowData.setField(
+                        SCHEMA.getPosition(),
+                        
StringData.fromString(schemaMap.get(dataChangeEvent.tableId())));
+            } else {
+                converter.accept(dataChangeEvent, reuseGenericRowData);
+            }
+            return jsonSerializers
+                    .get(dataChangeEvent.tableId())
+                    .getSerializationSchema()
+                    .serialize(reuseGenericRowData);
         } catch (Throwable t) {
             throw new RuntimeException(format("Could not serialize event 
'%s'.", event), t);
         }
     }
 
+    public String convertSchemaToDebeziumSchema(Schema schema) {
+        List<Column> columns = schema.getColumns();
+        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+        SchemaBuilder beforeBuilder = SchemaBuilder.struct();
+        SchemaBuilder afterBuilder = SchemaBuilder.struct();
+        for (Column column : columns) {
+            String columnName = column.getName();
+            org.apache.flink.cdc.common.types.DataType columnType = 
column.getType();
+            final SchemaBuilder field;
+            switch (columnType.getTypeRoot()) {

Review Comment:
   Great!
   We can extract this to a separate method to make it clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to