This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 1436aa4ba [INLONG-6335][Sort] Support the blob and binary data for MySql all migrate (#6336) 1436aa4ba is described below commit 1436aa4badf80a4ac98d492d368e11d28159fa19 Author: Schnapps <zpen...@connect.ust.hk> AuthorDate: Mon Oct 31 17:10:29 2022 +0800 [INLONG-6335][Sort] Support the blob and binary data for MySql all migrate (#6336) --- .../sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java index 204199598..29c39288b 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java @@ -51,9 +51,9 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.math.BigDecimal; -import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -598,6 +598,11 @@ public final class RowDataDebeziumDeserializeSchema if (schemaName != null) { fieldValue = getValueWithSchema(fieldValue, schemaName); } + if (fieldValue instanceof ByteBuffer) { + // binary data (blob or varbinary in mysql) are stored in bytebuffer + // use utf-8 to decode as a string by default + fieldValue = new String(((ByteBuffer) fieldValue).array()); + } data.put(fieldName, fieldValue); }