This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new e43e59f [feature]Implement DorisAvroConverter to support parsing
schema from avro avsc file path (#32)
e43e59f is described below
commit e43e59f5d42eda6a3844a9b8825a9971bcefe4ad
Author: wudongliang <[email protected]>
AuthorDate: Wed Jun 26 16:49:26 2024 +0800
[feature]Implement DorisAvroConverter to support parsing schema from avro
avsc file path (#32)
---
.github/workflows/license-eyes.yml | 4 +
.../workflows/license-eyes.yml => .licenserc.yaml | 31 ++--
.../kafka/connector/decode/DorisConverter.java | 40 +++++
.../kafka/connector/decode/DorisJsonSchema.java | 90 ++++++++++
.../connector/decode/avro/DorisAvroConverter.java | 194 +++++++++++++++++++++
.../connector/exception/DataDecodeException.java | 35 ++++
.../decode/avro/DorisAvroConverterTest.java | 96 ++++++++++
src/test/resources/decode/avro/product.avsc | 18 ++
src/test/resources/decode/avro/user.avsc | 18 ++
9 files changed, 507 insertions(+), 19 deletions(-)
diff --git a/.github/workflows/license-eyes.yml
b/.github/workflows/license-eyes.yml
index 02d108a..ba0c303 100644
--- a/.github/workflows/license-eyes.yml
+++ b/.github/workflows/license-eyes.yml
@@ -23,6 +23,7 @@ on:
push:
branches:
- master
+
jobs:
license-check:
name: "License Check"
@@ -30,7 +31,10 @@ jobs:
steps:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v2
+
- name: Check License
uses: apache/[email protected]
+ with:
+ config-path: ./.licenserc.yaml
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/.github/workflows/license-eyes.yml b/.licenserc.yaml
similarity index 66%
copy from .github/workflows/license-eyes.yml
copy to .licenserc.yaml
index 02d108a..ca50638 100644
--- a/.github/workflows/license-eyes.yml
+++ b/.licenserc.yaml
@@ -15,22 +15,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-#
----
-name: License Check
-on:
- pull_request:
- push:
- branches:
- - master
-jobs:
- license-check:
- name: "License Check"
- runs-on: ubuntu-latest
- steps:
- - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
- uses: actions/checkout@v2
- - name: Check License
- uses: apache/[email protected]
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+header:
+ license:
+ spdx-id: Apache-2.0
+ copyright-owner: Apache Software Foundation
+
+ paths-ignore:
+ - 'LICENSE'
+ - '.gitignore'
+ - 'src/test/resources/decode/avro/**'
+
+ comment: on-failure
\ No newline at end of file
diff --git
a/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java
b/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java
new file mode 100644
index 0000000..6b0e960
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.decode;
+
+import java.util.Map;
+import org.apache.doris.kafka.connector.exception.DataDecodeException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.storage.Converter;
+
+public abstract class DorisConverter implements Converter {
+
+ /** unused */
+ @Override
+ public void configure(final Map<String, ?> map, final boolean b) {
+ // not necessary
+ }
+
+ /** doesn't support data source connector */
+ @Override
+ public byte[] fromConnectData(String topic, Schema schema, Object value) {
+ throw new DataDecodeException("DorisConverter doesn't support data
source connector yet.");
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java
b/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java
new file mode 100644
index 0000000..55f7fad
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.decode;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+public class DorisJsonSchema implements Schema {
+ static String NAME = "DORIS_JSON_SCHEMA";
+ static int VERSION = 1;
+
+ @Override
+ public Schema.Type type() {
+ return Type.STRUCT;
+ }
+
+ @Override
+ public boolean isOptional() {
+ return false;
+ }
+
+ @Override
+ public Object defaultValue() {
+ return null;
+ }
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public Integer version() {
+ return VERSION;
+ }
+
+ @Override
+ public String doc() {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> parameters() {
+ return null;
+ }
+
+ @Override
+ public Schema keySchema() {
+ return null;
+ }
+
+ @Override
+ public Schema valueSchema() {
+ return null;
+ }
+
+ @Override
+ public List<Field> fields() {
+ return null;
+ }
+
+ @Override
+ public Field field(final String s) {
+ return null;
+ }
+
+ @Override
+ public Schema schema() {
+ return null;
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java
b/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java
new file mode 100644
index 0000000..56d8090
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.decode.avro;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.doris.kafka.connector.decode.DorisConverter;
+import org.apache.doris.kafka.connector.decode.DorisJsonSchema;
+import org.apache.doris.kafka.connector.exception.DataDecodeException;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Supports specifying the avsc file of avro, can directly obtain the avro
schema by parsing the
+ * file.
+ */
+public class DorisAvroConverter extends DorisConverter {
+ public static final String AVRO_TOPIC_SCHEMA_FILEPATH =
"avro.topic2schema.filepath";
+ private static final Logger LOG =
LoggerFactory.getLogger(DorisAvroConverter.class);
+ private final Map<String, Schema> topic2SchemaMap = new HashMap<>();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean isKey) {
+ parseTopic2Schema(configs);
+ }
+
+ @VisibleForTesting
+ public void parseTopic2Schema(final Map<String, ?> configs) {
+ Object avroSchemaPath = configs.get(AVRO_TOPIC_SCHEMA_FILEPATH);
+
+ if (avroSchemaPath == null) {
+ LOG.error(AVRO_TOPIC_SCHEMA_FILEPATH + " can not be empty in
DorisAvroConverter.class");
+ throw new DataDecodeException(
+ AVRO_TOPIC_SCHEMA_FILEPATH + " can not be empty in
DorisAvroConverter.class");
+ }
+
+ if (avroSchemaPath instanceof String) {
+ Map<String, String> topic2SchemaFileMap =
parseTopicSchemaPath((String) avroSchemaPath);
+ for (Map.Entry<String, String> entry :
topic2SchemaFileMap.entrySet()) {
+ String topic = entry.getKey();
+ String schemaPath = entry.getValue();
+ Schema schema;
+ try {
+ schema = new Schema.Parser().parse(new File(schemaPath));
+ } catch (SchemaParseException | IOException e) {
+ LOG.error(
+ "the provided for "
+ + AVRO_TOPIC_SCHEMA_FILEPATH
+ + " is no valid, failed to parse {} {}",
+ topic,
+ schemaPath,
+ e);
+ throw new DataDecodeException(
+ "the provided for "
+ + AVRO_TOPIC_SCHEMA_FILEPATH
+ + " is no valid, failed to parse "
+ + topic
+ + " "
+ + schemaPath
+ + ".\n",
+ e);
+ }
+ topic2SchemaMap.put(topic, schema);
+ }
+ } else {
+ LOG.error(AVRO_TOPIC_SCHEMA_FILEPATH + " must be a string.");
+ throw new DataDecodeException(
+ "The "
+ + AVRO_TOPIC_SCHEMA_FILEPATH
+ + " is provided, but can not be parsed as an Avro
schema.");
+ }
+ }
+
+ /**
+ * Parse the mapping between topic and schema file paths.
+ *
+ * @param topicSchemaPath
topic1:file:///schema_test.avsc,topic2:file:///schema_test2.avsc
+ */
+ private Map<String, String> parseTopicSchemaPath(String topicSchemaPath) {
+ Map<String, String> topic2SchemaPathMap = new HashMap<>();
+ boolean isInvalid = false;
+ for (String s : topicSchemaPath.split(",")) {
+ String[] split = s.split(":file://");
+ if (split.length != 2 || split[0].trim().isEmpty() ||
split[1].trim().isEmpty()) {
+ LOG.error(
+ "Invalid {} config format: {}",
+ AVRO_TOPIC_SCHEMA_FILEPATH,
+ topicSchemaPath);
+ isInvalid = true;
+ }
+
+ String topic = split[0].trim();
+ String schemaPath = split[1].trim();
+
+ if (topic2SchemaPathMap.containsKey(topic)) {
+ LOG.error("topic name {} is duplicated.", topic);
+ isInvalid = true;
+ }
+ topic2SchemaPathMap.put(topic, schemaPath);
+ }
+ if (isInvalid) {
+ throw new DataDecodeException("Failed to parse " +
AVRO_TOPIC_SCHEMA_FILEPATH + " map");
+ }
+ return topic2SchemaPathMap;
+ }
+
+ @Override
+ public SchemaAndValue toConnectData(String topic, byte[] value) {
+ if (value == null) {
+ LOG.warn("cast bytes is null");
+ return new SchemaAndValue(new DorisJsonSchema(), null);
+ }
+
+ if (topic2SchemaMap.containsKey(topic)) {
+ Schema schema = topic2SchemaMap.get(topic);
+ ByteBuffer buffer = ByteBuffer.wrap(value);
+ int length = buffer.limit();
+ byte[] data = new byte[length];
+ buffer.get(data, 0, length);
+ try {
+ return new SchemaAndValue(
+ new DorisJsonSchema(), parseAvroWithSchema(data,
schema, schema));
+ } catch (IOException e) {
+ LOG.error("failed to parse AVRO record\n" + e);
+ throw new DataDecodeException("failed to parse AVRO record\n",
e);
+ }
+ } else {
+ LOG.error("The avro schema file of {} is not provided.", topic);
+ throw new DataDecodeException("The avro schema file of " + topic +
" is not provided.");
+ }
+ }
+
+ /**
+ * Parse Avro record with a writer schema and a reader schema. The writer
and the reader schema
+ * have to be compatible as described in
+ * https://avro.apache.org/docs/1.9.2/spec.html#Schema+Resolution
+ *
+ * @param data avro data
+ * @param writerSchema avro schema with which data got serialized
+ * @param readerSchema avro schema with which will to be read and returned
+ */
+ private String parseAvroWithSchema(final byte[] data, Schema writerSchema,
Schema readerSchema)
+ throws IOException {
+ final GenericData genericData = new GenericData();
+ // Conversion for logical type Decimal.
+ // There are conversions for other logical types as well.
+ genericData.addLogicalTypeConversion(new
Conversions.DecimalConversion());
+
+ InputStream is = new ByteArrayInputStream(data);
+ Decoder decoder = DecoderFactory.get().binaryDecoder(is, null);
+ DatumReader<GenericRecord> reader =
+ new GenericDatumReader<>(writerSchema, readerSchema,
genericData);
+ GenericRecord datum = reader.read(null, decoder);
+ return datum.toString();
+ }
+
+ @VisibleForTesting
+ public Map<String, Schema> getTopic2SchemaMap() {
+ return topic2SchemaMap;
+ }
+}
diff --git
a/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java
b/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java
new file mode 100644
index 0000000..1ae1f11
--- /dev/null
+++
b/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.exception;
+
+public class DataDecodeException extends DorisException {
+
+ public DataDecodeException(String message) {
+ super(message);
+ }
+
+ public DataDecodeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DataDecodeException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java
b/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java
new file mode 100644
index 0000000..421490c
--- /dev/null
+++
b/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.decode.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DorisAvroConverterTest {
+ private static final String USER_TOPIC = "user-topic";
+ private static final String PRODUCT_TOPIC = "product-topic";
+ private static final String USER_AVRO_PATH =
"src/test/resources/decode/avro/user.avsc";
+ private static final String PRODUCT_AVRO_PATH =
"src/test/resources/decode/avro/product.avsc";
+ private final DorisAvroConverter avroConverter = new DorisAvroConverter();
+ private final Map<String, String> configs = new HashMap<>();
+
+ @Before
+ public void init() {
+ String topic2SchemaPath =
+ USER_TOPIC
+ + ":file://"
+ + USER_AVRO_PATH
+ + ", "
+ + PRODUCT_TOPIC
+ + ":file://"
+ + PRODUCT_AVRO_PATH;
+ configs.put(DorisAvroConverter.AVRO_TOPIC_SCHEMA_FILEPATH,
topic2SchemaPath);
+ }
+
+ @Test
+ public void testParseTopicSchema() throws IOException {
+ avroConverter.parseTopic2Schema(configs);
+ Map<String, Schema> topic2SchemaMap =
avroConverter.getTopic2SchemaMap();
+
+ Assert.assertTrue(topic2SchemaMap.containsKey(USER_TOPIC));
+ Assert.assertTrue(topic2SchemaMap.containsKey(PRODUCT_TOPIC));
+
+ Schema productSchema = new Schema.Parser().parse(new
File(PRODUCT_AVRO_PATH));
+ Schema userSchema = new Schema.Parser().parse(new
File(USER_AVRO_PATH));
+ Assert.assertEquals(topic2SchemaMap.get(USER_TOPIC), userSchema);
+ Assert.assertEquals(topic2SchemaMap.get(PRODUCT_TOPIC), productSchema);
+ }
+
+ @Test
+ public void testConvert() throws IOException {
+ Schema userSchema = new Schema.Parser().parse(new
File(USER_AVRO_PATH));
+ byte[] userAvroData = generateUserRecord(userSchema);
+ avroConverter.parseTopic2Schema(configs);
+ SchemaAndValue schemaAndValue =
avroConverter.toConnectData("user-topic", userAvroData);
+ Object o = schemaAndValue.value();
+
+ Assert.assertEquals(o, "{\"id\": 1, \"name\": \"test\", \"age\": 18}");
+ }
+
+ private byte[] generateUserRecord(Schema userSchema) throws IOException {
+ GenericData.Record userRecord = new GenericData.Record(userSchema);
+ userRecord.put("id", 1);
+ userRecord.put("name", "test");
+ userRecord.put("age", 18);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ BinaryEncoder encoder =
EncoderFactory.get().binaryEncoder(outputStream, null);
+ DatumWriter<GenericRecord> writer = new
SpecificDatumWriter<GenericRecord>(userSchema);
+ writer.write(userRecord, encoder);
+ encoder.flush();
+ return outputStream.toByteArray();
+ }
+}
diff --git a/src/test/resources/decode/avro/product.avsc
b/src/test/resources/decode/avro/product.avsc
new file mode 100644
index 0000000..677bc78
--- /dev/null
+++ b/src/test/resources/decode/avro/product.avsc
@@ -0,0 +1,18 @@
+{
+ "type": "record",
+ "name": "Product",
+ "fields": [
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "price",
+ "type": "double"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/src/test/resources/decode/avro/user.avsc
b/src/test/resources/decode/avro/user.avsc
new file mode 100644
index 0000000..d493c84
--- /dev/null
+++ b/src/test/resources/decode/avro/user.avsc
@@ -0,0 +1,18 @@
+{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {
+ "name": "id",
+ "type": "int"
+ },
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "age",
+ "type": "int"
+ }
+ ]
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]