lvyanquan commented on code in PR #3791:
URL: https://github.com/apache/flink-cdc/pull/3791#discussion_r1976808787
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java:
##########
@@ -138,81 +185,202 @@ public byte[] serialize(Event event) {
         }
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-        reuseGenericRowData.setField(
-                3,
-                GenericRowData.of(
-                        StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
-                        StringData.fromString(dataChangeEvent.tableId().getTableName())));
+        BiConsumer<DataChangeEvent, GenericRowData> converter;
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
-                    reuseGenericRowData.setField(0, null);
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_INSERT);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertInsertEventToRowData;
+                    break;
                 case DELETE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(1, null);
-                    reuseGenericRowData.setField(2, OP_DELETE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertDeleteEventToRowData;
+                    break;
                 case UPDATE:
                 case REPLACE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    .getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_UPDATE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertUpdateEventToRowData;
+                    break;
                 default:
                     throw new UnsupportedOperationException(
                             format(
                                     "Unsupported operation '%s' for OperationType.",
                                     dataChangeEvent.op()));
             }
+
+            if (isIncludedDebeziumSchema) {
+                converter.accept(dataChangeEvent, payloadGenericRowData);
+                reuseGenericRowData.setField(
+                        SCHEMA.getPosition(),
+                        StringData.fromString(schemaMap.get(dataChangeEvent.tableId())));
+            } else {
+                converter.accept(dataChangeEvent, reuseGenericRowData);
+            }
+            return jsonSerializers
+                    .get(dataChangeEvent.tableId())
+                    .getSerializationSchema()
+                    .serialize(reuseGenericRowData);
         } catch (Throwable t) {
             throw new RuntimeException(format("Could not serialize event '%s'.", event), t);
         }
     }
 
+    public String convertSchemaToDebeziumSchema(Schema schema) {
+        List<Column> columns = schema.getColumns();
+        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+        SchemaBuilder beforeBuilder = SchemaBuilder.struct();
+        SchemaBuilder afterBuilder = SchemaBuilder.struct();
+        for (Column column : columns) {
+            String columnName = column.getName();
+            org.apache.flink.cdc.common.types.DataType columnType = column.getType();
+            final SchemaBuilder field;
+            switch (columnType.getTypeRoot()) {
+                case TINYINT:
+                case SMALLINT:
+                    field = SchemaBuilder.int16();
+                    break;
+                case INTEGER:
+                    field = SchemaBuilder.int32();
+                    break;
+                case BIGINT:
+                    field = SchemaBuilder.int64();
+                    break;
+                case DECIMAL:
+                    final int decimalPrecision = ((DecimalType) columnType).getPrecision();
+                    final int decimalScale = ((DecimalType) columnType).getScale();
+                    field =
+                            Decimal.builder(decimalScale)
+                                    .parameter(
+                                            "connect.decimal.precision",
+                                            String.valueOf(decimalPrecision));
+                    break;
+                case BOOLEAN:
+                    field = SchemaBuilder.bool();
+                    break;
+                case FLOAT:
+                case DOUBLE:
+                    field = SchemaBuilder.float64();
+                    break;
+                case DATE:
+                    field = SchemaBuilder.int32().name(Date.SCHEMA_NAME).version(1);
+                    break;
+                case TIME_WITHOUT_TIME_ZONE:
+                    field = SchemaBuilder.int64().name(MicroTime.SCHEMA_NAME).version(1);
+                    break;
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_TIME_ZONE:
+                    field = SchemaBuilder.string().name(Timestamp.SCHEMA_NAME).version(1);
+                    break;
+                case BINARY:
+                    field =
+                            SchemaBuilder.bytes()
+                                    .name(Bits.LOGICAL_NAME)
+                                    .parameter(
+                                            Bits.LENGTH_FIELD,
+                                            Integer.toString(
+                                                    org.apache.flink.cdc.common.types.DataTypes
+                                                            .getLength(columnType)
+                                                            .orElse(0)))
+                                    .version(1);
+                    break;
+                case CHAR:
+                case VARCHAR:
+                case VARBINARY:
+                default:
+                    field = SchemaBuilder.string();
+            }
+            if (columnType.isNullable()) {
+                field.optional();
+            } else {
+                field.required();
+            }
+            if (column.getDefaultValueExpression() != null) {
+                field.defaultValue(column.getDefaultValueExpression());
+            }

Review Comment:
   We could also add the column comment to the field here.
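   For illustration, a minimal sketch of that suggestion, assuming `Column#getComment()` is available on the CDC `Column` type in this context and using Kafka Connect's `SchemaBuilder#doc(String)` to carry the comment; `applyColumnComment` is a hypothetical helper name, and `field` corresponds to the per-column builder in the diff above:

```java
import org.apache.flink.cdc.common.schema.Column;
import org.apache.kafka.connect.data.SchemaBuilder;

// Sketch only: after the nullability/default handling in the loop above,
// propagate the column comment into the Connect field schema as its doc string.
static SchemaBuilder applyColumnComment(Column column, SchemaBuilder field) {
    String comment = column.getComment(); // assumed accessor, mirroring getDefaultValueExpression()
    if (comment != null) {
        // SchemaBuilder#doc attaches a documentation string to the field schema.
        field.doc(comment);
    }
    return field;
}
```

   Whether the comment then shows up in the emitted `schema` section depends on how `convertSchemaToDebeziumSchema` serializes the builder, so it may need matching handling there as well.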