This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new e43e59f  [feature]Implement DorisAvroConverter to support parsing schema from avro avsc file path (#32)
e43e59f is described below

commit e43e59f5d42eda6a3844a9b8825a9971bcefe4ad
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Jun 26 16:49:26 2024 +0800

    [feature]Implement DorisAvroConverter to support parsing schema from avro avsc file path (#32)
---
 .github/workflows/license-eyes.yml                 |   4 +
 .../workflows/license-eyes.yml => .licenserc.yaml  |  31 ++--
 .../kafka/connector/decode/DorisConverter.java     |  40 +++++
 .../kafka/connector/decode/DorisJsonSchema.java    |  90 ++++++++++
 .../connector/decode/avro/DorisAvroConverter.java  | 194 +++++++++++++++++++++
 .../connector/exception/DataDecodeException.java   |  35 ++++
 .../decode/avro/DorisAvroConverterTest.java        |  96 ++++++++++
 src/test/resources/decode/avro/product.avsc        |  18 ++
 src/test/resources/decode/avro/user.avsc           |  18 ++
 9 files changed, 507 insertions(+), 19 deletions(-)
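
For context, a minimal end-to-end usage sketch of the new converter, modeled on the
DorisAvroConverterTest added in this patch (the sketch itself is not part of the patch). The
schema path, topic name, and record fields below are placeholders that assume an .avsc like
src/test/resources/decode/avro/user.avsc. In a connector config the converter would typically be
enabled through the standard Kafka Connect converter properties, e.g.
value.converter=org.apache.doris.kafka.connector.decode.avro.DorisAvroConverter together with
value.converter.avro.topic2schema.filepath=... (the prefixed form is an assumption based on how
Connect forwards converter properties, not something this patch documents).

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.doris.kafka.connector.decode.avro.DorisAvroConverter;
    import org.apache.kafka.connect.data.SchemaAndValue;

    public class DorisAvroConverterUsageSketch {
        public static void main(String[] args) throws IOException {
            // Placeholder path: point this at a real .avsc file, e.g. a copy of user.avsc from this patch.
            String schemaPath = "/tmp/user.avsc";
            Schema schema = new Schema.Parser().parse(new File(schemaPath));

            // Serialize one record with that schema, as a producer would before writing to Kafka.
            GenericData.Record record = new GenericData.Record(schema);
            record.put("id", 1);
            record.put("name", "test");
            record.put("age", 18);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();

            // Configure the converter with the topic -> avsc mapping and decode the bytes back.
            Map<String, String> configs = new HashMap<>();
            configs.put(
                    DorisAvroConverter.AVRO_TOPIC_SCHEMA_FILEPATH,
                    "user-topic:file://" + schemaPath);
            DorisAvroConverter converter = new DorisAvroConverter();
            converter.configure(configs, false);

            SchemaAndValue result = converter.toConnectData("user-topic", out.toByteArray());
            System.out.println(result.value()); // e.g. {"id": 1, "name": "test", "age": 18}
        }
    }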

diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml
index 02d108a..ba0c303 100644
--- a/.github/workflows/license-eyes.yml
+++ b/.github/workflows/license-eyes.yml
@@ -23,6 +23,7 @@ on:
   push:
     branches:
       - master
+
 jobs:
   license-check:
     name: "License Check"
@@ -30,7 +31,10 @@ jobs:
     steps:
       - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
         uses: actions/checkout@v2
+
       - name: Check License
         uses: apache/skywalking-eyes@v0.2.0
+        with:
+          config-path: ./.licenserc.yaml
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
diff --git a/.github/workflows/license-eyes.yml b/.licenserc.yaml
similarity index 66%
copy from .github/workflows/license-eyes.yml
copy to .licenserc.yaml
index 02d108a..ca50638 100644
--- a/.github/workflows/license-eyes.yml
+++ b/.licenserc.yaml
@@ -15,22 +15,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
----
-name: License Check
-on:
-  pull_request:
-  push:
-    branches:
-      - master
-jobs:
-  license-check:
-    name: "License Check"
-    runs-on: ubuntu-latest
-    steps:
-      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
-        uses: actions/checkout@v2
-      - name: Check License
-        uses: apache/skywalking-eyes@v0.2.0
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+header:
+  license:
+    spdx-id: Apache-2.0
+    copyright-owner: Apache Software Foundation
+
+  paths-ignore:
+    - 'LICENSE'
+    - '.gitignore'
+    - 'src/test/resources/decode/avro/**'
+
+  comment: on-failure
\ No newline at end of file
diff --git a/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java b/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java
new file mode 100644
index 0000000..6b0e960
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.decode;
+
+import java.util.Map;
+import org.apache.doris.kafka.connector.exception.DataDecodeException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.storage.Converter;
+
+public abstract class DorisConverter implements Converter {
+
+    /** unused */
+    @Override
+    public void configure(final Map<String, ?> map, final boolean b) {
+        // not necessary
+    }
+
+    /** Converting Connect data back to bytes (source connectors) is not supported. */
+    @Override
+    public byte[] fromConnectData(String topic, Schema schema, Object value) {
+        throw new DataDecodeException("DorisConverter doesn't support data source connector yet.");
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java b/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java
new file mode 100644
index 0000000..55f7fad
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.decode;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+public class DorisJsonSchema implements Schema {
+    static String NAME = "DORIS_JSON_SCHEMA";
+    static int VERSION = 1;
+
+    @Override
+    public Schema.Type type() {
+        return Type.STRUCT;
+    }
+
+    @Override
+    public boolean isOptional() {
+        return false;
+    }
+
+    @Override
+    public Object defaultValue() {
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    @Override
+    public Integer version() {
+        return VERSION;
+    }
+
+    @Override
+    public String doc() {
+        return null;
+    }
+
+    @Override
+    public Map<String, String> parameters() {
+        return null;
+    }
+
+    @Override
+    public Schema keySchema() {
+        return null;
+    }
+
+    @Override
+    public Schema valueSchema() {
+        return null;
+    }
+
+    @Override
+    public List<Field> fields() {
+        return null;
+    }
+
+    @Override
+    public Field field(final String s) {
+        return null;
+    }
+
+    @Override
+    public Schema schema() {
+        return null;
+    }
+}
diff --git a/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java b/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java
new file mode 100644
index 0000000..56d8090
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.decode.avro;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.doris.kafka.connector.decode.DorisConverter;
+import org.apache.doris.kafka.connector.decode.DorisJsonSchema;
+import org.apache.doris.kafka.connector.exception.DataDecodeException;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Supports specifying an Avro .avsc schema file per topic; the Avro schema is obtained directly
+ * by parsing that file.
+ */
+public class DorisAvroConverter extends DorisConverter {
+    public static final String AVRO_TOPIC_SCHEMA_FILEPATH = "avro.topic2schema.filepath";
+    private static final Logger LOG = LoggerFactory.getLogger(DorisAvroConverter.class);
+    private final Map<String, Schema> topic2SchemaMap = new HashMap<>();
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        parseTopic2Schema(configs);
+    }
+
+    @VisibleForTesting
+    public void parseTopic2Schema(final Map<String, ?> configs) {
+        Object avroSchemaPath = configs.get(AVRO_TOPIC_SCHEMA_FILEPATH);
+
+        if (avroSchemaPath == null) {
+            LOG.error(AVRO_TOPIC_SCHEMA_FILEPATH + " cannot be empty in DorisAvroConverter.class");
+            throw new DataDecodeException(
+                    AVRO_TOPIC_SCHEMA_FILEPATH + " cannot be empty in DorisAvroConverter.class");
+        }
+
+        if (avroSchemaPath instanceof String) {
+            Map<String, String> topic2SchemaFileMap = parseTopicSchemaPath((String) avroSchemaPath);
+            for (Map.Entry<String, String> entry : topic2SchemaFileMap.entrySet()) {
+                String topic = entry.getKey();
+                String schemaPath = entry.getValue();
+                Schema schema;
+                try {
+                    schema = new Schema.Parser().parse(new File(schemaPath));
+                } catch (SchemaParseException | IOException e) {
+                    LOG.error(
+                            "the provided for "
+                                    + AVRO_TOPIC_SCHEMA_FILEPATH
+                                    + " is no valid, failed to parse {} {}",
+                            topic,
+                            schemaPath,
+                            e);
+                    throw new DataDecodeException(
+                            "the provided for "
+                                    + AVRO_TOPIC_SCHEMA_FILEPATH
+                                    + " is no valid, failed to parse "
+                                    + topic
+                                    + " "
+                                    + schemaPath
+                                    + ".\n",
+                            e);
+                }
+                topic2SchemaMap.put(topic, schema);
+            }
+        } else {
+            LOG.error(AVRO_TOPIC_SCHEMA_FILEPATH + " must be a string.");
+            throw new DataDecodeException(
+                    "The " + AVRO_TOPIC_SCHEMA_FILEPATH + " is provided, but must be a string.");
+        }
+    }
+
+    /**
+     * Parse the mapping between topic and schema file paths.
+     *
+     * @param topicSchemaPath topic1:file:///schema_test.avsc,topic2:file:///schema_test2.avsc
+     */
+    private Map<String, String> parseTopicSchemaPath(String topicSchemaPath) {
+        Map<String, String> topic2SchemaPathMap = new HashMap<>();
+        boolean isInvalid = false;
+        for (String s : topicSchemaPath.split(",")) {
+            String[] split = s.split(":file://");
+            if (split.length != 2 || split[0].trim().isEmpty() || split[1].trim().isEmpty()) {
+                LOG.error(
+                        "Invalid {} config format: {}",
+                        AVRO_TOPIC_SCHEMA_FILEPATH,
+                        topicSchemaPath);
+                isInvalid = true;
+                // Skip this entry: there is no usable topic/path pair to read from it.
+                continue;
+            }
+
+            String topic = split[0].trim();
+            String schemaPath = split[1].trim();
+
+            if (topic2SchemaPathMap.containsKey(topic)) {
+                LOG.error("topic name {} is duplicated.", topic);
+                isInvalid = true;
+            }
+            topic2SchemaPathMap.put(topic, schemaPath);
+        }
+        if (isInvalid) {
+            throw new DataDecodeException("Failed to parse " + AVRO_TOPIC_SCHEMA_FILEPATH + " map");
+        }
+        return topic2SchemaPathMap;
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(String topic, byte[] value) {
+        if (value == null) {
+            LOG.warn("cast bytes is null");
+            return new SchemaAndValue(new DorisJsonSchema(), null);
+        }
+
+        if (topic2SchemaMap.containsKey(topic)) {
+            Schema schema = topic2SchemaMap.get(topic);
+            ByteBuffer buffer = ByteBuffer.wrap(value);
+            int length = buffer.limit();
+            byte[] data = new byte[length];
+            buffer.get(data, 0, length);
+            try {
+                return new SchemaAndValue(
+                        new DorisJsonSchema(), parseAvroWithSchema(data, schema, schema));
+            } catch (IOException e) {
+                LOG.error("failed to parse AVRO record", e);
+                throw new DataDecodeException("failed to parse AVRO record", e);
+            }
+        } else {
+            LOG.error("The avro schema file of {} is not provided.", topic);
+            throw new DataDecodeException("The avro schema file of " + topic + " is not provided.");
+        }
+    }
+
+    /**
+     * Parse an Avro record with a writer schema and a reader schema. The writer and the reader
+     * schema have to be compatible, as described in
+     * https://avro.apache.org/docs/1.9.2/spec.html#Schema+Resolution
+     *
+     * @param data avro data
+     * @param writerSchema avro schema with which the data was serialized
+     * @param readerSchema avro schema with which the data will be read and returned
+     */
+    private String parseAvroWithSchema(final byte[] data, Schema writerSchema, Schema readerSchema)
+            throws IOException {
+        final GenericData genericData = new GenericData();
+        // Conversion for logical type Decimal.
+        // There are conversions for other logical types as well.
+        genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
+        InputStream is = new ByteArrayInputStream(data);
+        Decoder decoder = DecoderFactory.get().binaryDecoder(is, null);
+        DatumReader<GenericRecord> reader =
+                new GenericDatumReader<>(writerSchema, readerSchema, genericData);
+        GenericRecord datum = reader.read(null, decoder);
+        return datum.toString();
+    }
+
+    @VisibleForTesting
+    public Map<String, Schema> getTopic2SchemaMap() {
+        return topic2SchemaMap;
+    }
+}
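
A note on the writer/reader schema parameters in parseAvroWithSchema above: toConnectData
currently passes the topic's schema as both writer and reader, but Avro schema resolution also
allows a compatible, evolved reader schema. A sketch of that behavior, using hypothetical inline
schemas that are not part of this patch:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class AvroSchemaResolutionSketch {
        public static void main(String[] args) throws IOException {
            // Hypothetical writer schema (what the producer used) and an evolved reader schema
            // that adds a defaulted "age" field.
            Schema writerSchema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                            + "{\"name\":\"id\",\"type\":\"int\"},"
                            + "{\"name\":\"name\",\"type\":\"string\"}]}");
            Schema readerSchema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                            + "{\"name\":\"id\",\"type\":\"int\"},"
                            + "{\"name\":\"name\",\"type\":\"string\"},"
                            + "{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}");

            // Serialize one record with the writer schema.
            GenericData.Record record = new GenericData.Record(writerSchema);
            record.put("id", 1);
            record.put("name", "test");
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
            encoder.flush();

            // Deserialize with both schemas; the missing "age" field is filled from its default.
            DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
            GenericRecord datum = reader.read(null, decoder);
            System.out.println(datum); // {"id": 1, "name": "test", "age": 0}
        }
    }
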
diff --git a/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java b/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java
new file mode 100644
index 0000000..1ae1f11
--- /dev/null
+++ b/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.exception;
+
+public class DataDecodeException extends DorisException {
+
+    public DataDecodeException(String message) {
+        super(message);
+    }
+
+    public DataDecodeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DataDecodeException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java b/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java
new file mode 100644
index 0000000..421490c
--- /dev/null
+++ b/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.doris.kafka.connector.decode.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DorisAvroConverterTest {
+    private static final String USER_TOPIC = "user-topic";
+    private static final String PRODUCT_TOPIC = "product-topic";
+    private static final String USER_AVRO_PATH = "src/test/resources/decode/avro/user.avsc";
+    private static final String PRODUCT_AVRO_PATH = "src/test/resources/decode/avro/product.avsc";
+    private final DorisAvroConverter avroConverter = new DorisAvroConverter();
+    private final Map<String, String> configs = new HashMap<>();
+
+    @Before
+    public void init() {
+        String topic2SchemaPath =
+                USER_TOPIC
+                        + ":file://"
+                        + USER_AVRO_PATH
+                        + ", "
+                        + PRODUCT_TOPIC
+                        + ":file://"
+                        + PRODUCT_AVRO_PATH;
+        configs.put(DorisAvroConverter.AVRO_TOPIC_SCHEMA_FILEPATH, topic2SchemaPath);
+    }
+
+    @Test
+    public void testParseTopicSchema() throws IOException {
+        avroConverter.parseTopic2Schema(configs);
+        Map<String, Schema> topic2SchemaMap = avroConverter.getTopic2SchemaMap();
+
+        Assert.assertTrue(topic2SchemaMap.containsKey(USER_TOPIC));
+        Assert.assertTrue(topic2SchemaMap.containsKey(PRODUCT_TOPIC));
+
+        Schema productSchema = new Schema.Parser().parse(new File(PRODUCT_AVRO_PATH));
+        Schema userSchema = new Schema.Parser().parse(new File(USER_AVRO_PATH));
+        Assert.assertEquals(topic2SchemaMap.get(USER_TOPIC), userSchema);
+        Assert.assertEquals(topic2SchemaMap.get(PRODUCT_TOPIC), productSchema);
+    }
+
+    @Test
+    public void testConvert() throws IOException {
+        Schema userSchema = new Schema.Parser().parse(new File(USER_AVRO_PATH));
+        byte[] userAvroData = generateUserRecord(userSchema);
+        avroConverter.parseTopic2Schema(configs);
+        SchemaAndValue schemaAndValue = avroConverter.toConnectData("user-topic", userAvroData);
+        Object o = schemaAndValue.value();
+
+        Assert.assertEquals(o, "{\"id\": 1, \"name\": \"test\", \"age\": 18}");
+    }
+
+    private byte[] generateUserRecord(Schema userSchema) throws IOException {
+        GenericData.Record userRecord = new GenericData.Record(userSchema);
+        userRecord.put("id", 1);
+        userRecord.put("name", "test");
+        userRecord.put("age", 18);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(userSchema);
+        writer.write(userRecord, encoder);
+        encoder.flush();
+        return outputStream.toByteArray();
+    }
+}
diff --git a/src/test/resources/decode/avro/product.avsc b/src/test/resources/decode/avro/product.avsc
new file mode 100644
index 0000000..677bc78
--- /dev/null
+++ b/src/test/resources/decode/avro/product.avsc
@@ -0,0 +1,18 @@
+{
+    "type": "record",
+    "name": "Product",
+    "fields": [
+        {
+            "name": "id",
+            "type": "int"
+        },
+        {
+            "name": "name",
+            "type": "string"
+        },
+        {
+            "name": "price",
+            "type": "double"
+        }
+    ]
+}
\ No newline at end of file
diff --git a/src/test/resources/decode/avro/user.avsc b/src/test/resources/decode/avro/user.avsc
new file mode 100644
index 0000000..d493c84
--- /dev/null
+++ b/src/test/resources/decode/avro/user.avsc
@@ -0,0 +1,18 @@
+{
+    "type": "record",
+    "name": "User",
+    "fields": [
+        {
+            "name": "id",
+            "type": "int"
+        },
+        {
+            "name": "name",
+            "type": "string"
+        },
+        {
+            "name": "age",
+            "type": "int"
+        }
+    ]
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
