lvyanquan commented on code in PR #3791:
URL: https://github.com/apache/flink-cdc/pull/3791#discussion_r1975258209
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml:
##########
@@ -74,6 +74,25 @@ limitations under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>${debezium.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>3.2.0</version>

Review Comment:
   It's better to make this version a Maven property.
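For illustration, a minimal sketch of what that suggestion could look like, assuming the version is lifted into the module's (or the parent's) `<properties>` section; the property name `kafka.connect.version` is hypothetical:

```xml
<properties>
    <!-- hypothetical property name; align with the project's conventions -->
    <kafka.connect.version>3.2.0</kafka.connect.version>
</properties>

<!-- ...and in the dependency declaration: -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${kafka.connect.version}</version>
</dependency>
```

Keeping the version in one property makes it easy to stay consistent if more `org.apache.kafka` artifacts are added later.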
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml:
##########
@@ -101,6 +120,9 @@ limitations under the License.
                         <relocation>
                             <pattern>org.apache.kafka</pattern>
                             <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
+                            <excludes>
+                                <exclude>org/apache/kafka/connect/data/**</exclude>
+                            </excludes>

Review Comment:
   Could you explain the reason for this exclusion?

##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java:
##########
@@ -138,81 +185,202 @@ public byte[] serialize(Event event) {
         }
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-        reuseGenericRowData.setField(
-                3,
-                GenericRowData.of(
-                        StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
-                        StringData.fromString(dataChangeEvent.tableId().getTableName())));
+        BiConsumer<DataChangeEvent, GenericRowData> converter;
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
-                    reuseGenericRowData.setField(0, null);
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_INSERT);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertInsertEventToRowData;
+                    break;
                 case DELETE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(1, null);
-                    reuseGenericRowData.setField(2, OP_DELETE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertDeleteEventToRowData;
+                    break;
                 case UPDATE:
                 case REPLACE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_UPDATE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertUpdateEventToRowData;
+                    break;
                 default:
                     throw new UnsupportedOperationException(
                             format(
                                     "Unsupported operation '%s' for OperationType.",
                                     dataChangeEvent.op()));
             }
+
+            if (isIncludedDebeziumSchema) {
+                converter.accept(dataChangeEvent, payloadGenericRowData);
+                reuseGenericRowData.setField(
+                        SCHEMA.getPosition(),
+                        StringData.fromString(schemaMap.get(dataChangeEvent.tableId())));
+            } else {
+                converter.accept(dataChangeEvent, reuseGenericRowData);
+            }
+            return jsonSerializers
+                    .get(dataChangeEvent.tableId())
+                    .getSerializationSchema()
+                    .serialize(reuseGenericRowData);
         } catch (Throwable t) {
             throw new RuntimeException(format("Could not serialize event '%s'.", event), t);
         }
     }

+    public String convertSchemaToDebeziumSchema(Schema schema) {
+        List<Column> columns = schema.getColumns();
+        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+        SchemaBuilder beforeBuilder = SchemaBuilder.struct();
+        SchemaBuilder afterBuilder = SchemaBuilder.struct();
+        for (Column column : columns) {
+            String columnName = column.getName();
+            org.apache.flink.cdc.common.types.DataType columnType = column.getType();
+            final SchemaBuilder field;
+            switch (columnType.getTypeRoot()) {

Review Comment:
   Great! We could extract this switch into a separate method to make it clearer.
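As an illustration of this suggestion, here is a minimal sketch of such an extracted helper, assuming the hypothetical name `convertColumnTypeToConnectSchema` and covering only a few representative type roots (the actual mapping in this PR handles more types):

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.kafka.connect.data.SchemaBuilder;

// Hypothetical extraction of the per-column switch above; the method name and
// the subset of type roots shown are illustrative, not the PR's final shape.
private static SchemaBuilder convertColumnTypeToConnectSchema(DataType columnType) {
    switch (columnType.getTypeRoot()) {
        case BOOLEAN:
            return SchemaBuilder.bool();
        case TINYINT:
            return SchemaBuilder.int8();
        case SMALLINT:
            return SchemaBuilder.int16();
        case INTEGER:
            return SchemaBuilder.int32();
        case BIGINT:
            return SchemaBuilder.int64();
        case FLOAT:
            return SchemaBuilder.float32();
        case DOUBLE:
            return SchemaBuilder.float64();
        case CHAR:
        case VARCHAR:
            return SchemaBuilder.string();
        default:
            throw new UnsupportedOperationException(
                    "Unsupported column type: " + columnType);
    }
}
```

The loop body in `convertSchemaToDebeziumSchema` would then shrink to something like `final SchemaBuilder field = convertColumnTypeToConnectSchema(columnType);`, keeping the type-mapping logic readable and testable on its own.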