This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new e43e59f [feature]Implement DorisAvroConverter to support parsing schema from avro avsc file path (#32) e43e59f is described below commit e43e59f5d42eda6a3844a9b8825a9971bcefe4ad Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed Jun 26 16:49:26 2024 +0800 [feature]Implement DorisAvroConverter to support parsing schema from avro avsc file path (#32) --- .github/workflows/license-eyes.yml | 4 + .../workflows/license-eyes.yml => .licenserc.yaml | 31 ++-- .../kafka/connector/decode/DorisConverter.java | 40 +++++ .../kafka/connector/decode/DorisJsonSchema.java | 90 ++++++++++ .../connector/decode/avro/DorisAvroConverter.java | 194 +++++++++++++++++++++ .../connector/exception/DataDecodeException.java | 35 ++++ .../decode/avro/DorisAvroConverterTest.java | 96 ++++++++++ src/test/resources/decode/avro/product.avsc | 18 ++ src/test/resources/decode/avro/user.avsc | 18 ++ 9 files changed, 507 insertions(+), 19 deletions(-) diff --git a/.github/workflows/license-eyes.yml b/.github/workflows/license-eyes.yml index 02d108a..ba0c303 100644 --- a/.github/workflows/license-eyes.yml +++ b/.github/workflows/license-eyes.yml @@ -23,6 +23,7 @@ on: push: branches: - master + jobs: license-check: name: "License Check" @@ -30,7 +31,10 @@ jobs: steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" uses: actions/checkout@v2 + - name: Check License uses: apache/skywalking-eyes@v0.2.0 + with: + config-path: ./.licenserc.yaml env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/license-eyes.yml b/.licenserc.yaml similarity index 66% copy from .github/workflows/license-eyes.yml copy to .licenserc.yaml index 02d108a..ca50638 100644 --- a/.github/workflows/license-eyes.yml +++ b/.licenserc.yaml @@ -15,22 +15,15 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# ---- -name: License Check -on: - pull_request: - push: - branches: - - master -jobs: - license-check: - name: "License Check" - runs-on: ubuntu-latest - steps: - - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@v2 - - name: Check License - uses: apache/skywalking-eyes@v0.2.0 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + +header: + license: + spdx-id: Apache-2.0 + copyright-owner: Apache Software Foundation + + paths-ignore: + - 'LICENSE' + - '.gitignore' + - 'src/test/resources/decode/avro/**' + + comment: on-failure \ No newline at end of file diff --git a/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java b/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java new file mode 100644 index 0000000..6b0e960 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/decode/DorisConverter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.decode; + +import java.util.Map; +import org.apache.doris.kafka.connector.exception.DataDecodeException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.storage.Converter; + +public abstract class DorisConverter implements Converter { + + /** unused */ + @Override + public void configure(final Map<String, ?> map, final boolean b) { + // not necessary + } + + /** doesn't support data source connector */ + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + throw new DataDecodeException("DorisConverter doesn't support data source connector yet."); + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java b/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java new file mode 100644 index 0000000..55f7fad --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/decode/DorisJsonSchema.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.doris.kafka.connector.decode; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; + +public class DorisJsonSchema implements Schema { + static String NAME = "DORIS_JSON_SCHEMA"; + static int VERSION = 1; + + @Override + public Schema.Type type() { + return Type.STRUCT; + } + + @Override + public boolean isOptional() { + return false; + } + + @Override + public Object defaultValue() { + return null; + } + + @Override + public String name() { + return NAME; + } + + @Override + public Integer version() { + return VERSION; + } + + @Override + public String doc() { + return null; + } + + @Override + public Map<String, String> parameters() { + return null; + } + + @Override + public Schema keySchema() { + return null; + } + + @Override + public Schema valueSchema() { + return null; + } + + @Override + public List<Field> fields() { + return null; + } + + @Override + public Field field(final String s) { + return null; + } + + @Override + public Schema schema() { + return null; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java b/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java new file mode 100644 index 0000000..56d8090 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverter.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.decode.avro; + +import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.avro.Conversions; +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.doris.kafka.connector.decode.DorisConverter; +import org.apache.doris.kafka.connector.decode.DorisJsonSchema; +import org.apache.doris.kafka.connector.exception.DataDecodeException; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Supports specifying the avsc file of avro, can directly obtain the avro schema by parsing the + * file. 
+ */ +public class DorisAvroConverter extends DorisConverter { + public static final String AVRO_TOPIC_SCHEMA_FILEPATH = "avro.topic2schema.filepath"; + private static final Logger LOG = LoggerFactory.getLogger(DorisAvroConverter.class); + private final Map<String, Schema> topic2SchemaMap = new HashMap<>(); + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + parseTopic2Schema(configs); + } + + @VisibleForTesting + public void parseTopic2Schema(final Map<String, ?> configs) { + Object avroSchemaPath = configs.get(AVRO_TOPIC_SCHEMA_FILEPATH); + + if (avroSchemaPath == null) { + LOG.error(AVRO_TOPIC_SCHEMA_FILEPATH + " can not be empty in DorisAvroConverter.class"); + throw new DataDecodeException( + AVRO_TOPIC_SCHEMA_FILEPATH + " can not be empty in DorisAvroConverter.class"); + } + + if (avroSchemaPath instanceof String) { + Map<String, String> topic2SchemaFileMap = parseTopicSchemaPath((String) avroSchemaPath); + for (Map.Entry<String, String> entry : topic2SchemaFileMap.entrySet()) { + String topic = entry.getKey(); + String schemaPath = entry.getValue(); + Schema schema; + try { + schema = new Schema.Parser().parse(new File(schemaPath)); + } catch (SchemaParseException | IOException e) { + LOG.error( + "the value provided for " + + AVRO_TOPIC_SCHEMA_FILEPATH + + " is not valid, failed to parse {} {}", + topic, + schemaPath, + e); + throw new DataDecodeException( + "the value provided for " + + AVRO_TOPIC_SCHEMA_FILEPATH + + " is not valid, failed to parse " + + topic + + " " + + schemaPath + + ".\n", + e); + } + topic2SchemaMap.put(topic, schema); + } + } else { + LOG.error(AVRO_TOPIC_SCHEMA_FILEPATH + " must be a string."); + throw new DataDecodeException( + "The " + + AVRO_TOPIC_SCHEMA_FILEPATH + + " config must be a string."); + } + } + + /** + * Parse the mapping between topic and schema file paths.
+ * + * @param topicSchemaPath topic1:file:///schema_test.avsc,topic2:file:///schema_test2.avsc + */ + private Map<String, String> parseTopicSchemaPath(String topicSchemaPath) { + Map<String, String> topic2SchemaPathMap = new HashMap<>(); + boolean isInvalid = false; + for (String s : topicSchemaPath.split(",")) { + String[] split = s.split(":file://"); + if (split.length != 2 || split[0].trim().isEmpty() || split[1].trim().isEmpty()) { + LOG.error( + "Invalid {} config format: {}", + AVRO_TOPIC_SCHEMA_FILEPATH, + topicSchemaPath); + throw new DataDecodeException("Invalid " + AVRO_TOPIC_SCHEMA_FILEPATH + " config format."); + } + + String topic = split[0].trim(); + String schemaPath = split[1].trim(); + + if (topic2SchemaPathMap.containsKey(topic)) { + LOG.error("topic name {} is duplicated.", topic); + isInvalid = true; + } + topic2SchemaPathMap.put(topic, schemaPath); + } + if (isInvalid) { + throw new DataDecodeException("Failed to parse " + AVRO_TOPIC_SCHEMA_FILEPATH + " map"); + } + return topic2SchemaPathMap; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + if (value == null) { + LOG.warn("the record value to convert is null"); + return new SchemaAndValue(new DorisJsonSchema(), null); + } + + if (topic2SchemaMap.containsKey(topic)) { + Schema schema = topic2SchemaMap.get(topic); + ByteBuffer buffer = ByteBuffer.wrap(value); + int length = buffer.limit(); + byte[] data = new byte[length]; + buffer.get(data, 0, length); + try { + return new SchemaAndValue( + new DorisJsonSchema(), parseAvroWithSchema(data, schema, schema)); + } catch (IOException e) { + LOG.error("failed to parse AVRO record", e); + throw new DataDecodeException("failed to parse AVRO record", e); + } + } else { + LOG.error("The avro schema file for {} is not provided.", topic); + throw new DataDecodeException("The avro schema file for " + topic + " is not provided."); + } + } + + /** + * Parse Avro record with a writer schema and a reader schema. The writer and the reader schema + * have to be compatible as described in + * https://avro.apache.org/docs/1.9.2/spec.html#Schema+Resolution + * + * @param data avro data + * @param writerSchema avro schema with which the data was serialized + * @param readerSchema avro schema with which the data will be read and returned + */ + private String parseAvroWithSchema(final byte[] data, Schema writerSchema, Schema readerSchema) + throws IOException { + final GenericData genericData = new GenericData(); + // Conversion for logical type Decimal. + // There are conversions for other logical types as well. + genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + + InputStream is = new ByteArrayInputStream(data); + Decoder decoder = DecoderFactory.get().binaryDecoder(is, null); + DatumReader<GenericRecord> reader = + new GenericDatumReader<>(writerSchema, readerSchema, genericData); + GenericRecord datum = reader.read(null, decoder); + return datum.toString(); + } + + @VisibleForTesting + public Map<String, Schema> getTopic2SchemaMap() { + return topic2SchemaMap; + } +} diff --git a/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java b/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java new file mode 100644 index 0000000..1ae1f11 --- /dev/null +++ b/src/main/java/org/apache/doris/kafka/connector/exception/DataDecodeException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.doris.kafka.connector.exception; + +public class DataDecodeException extends DorisException { + + public DataDecodeException(String message) { + super(message); + } + + public DataDecodeException(String message, Throwable cause) { + super(message, cause); + } + + public DataDecodeException(Throwable cause) { + super(cause); + } +} diff --git a/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java b/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java new file mode 100644 index 0000000..421490c --- /dev/null +++ b/src/test/java/org/apache/doris/kafka/connector/decode/avro/DorisAvroConverterTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.doris.kafka.connector.decode.avro; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.*; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DorisAvroConverterTest { + private static final String USER_TOPIC = "user-topic"; + private static final String PRODUCT_TOPIC = "product-topic"; + private static final String USER_AVRO_PATH = "src/test/resources/decode/avro/user.avsc"; + private static final String PRODUCT_AVRO_PATH = "src/test/resources/decode/avro/product.avsc"; + private final DorisAvroConverter avroConverter = new DorisAvroConverter(); + private final Map<String, String> configs = new HashMap<>(); + + @Before + public void init() { + String topic2SchemaPath = + USER_TOPIC + + ":file://" + + USER_AVRO_PATH + + ", " + + PRODUCT_TOPIC + + ":file://" + + PRODUCT_AVRO_PATH; + configs.put(DorisAvroConverter.AVRO_TOPIC_SCHEMA_FILEPATH, topic2SchemaPath); + } + + @Test + public void testParseTopicSchema() throws IOException { + avroConverter.parseTopic2Schema(configs); + Map<String, Schema> topic2SchemaMap = avroConverter.getTopic2SchemaMap(); + + Assert.assertTrue(topic2SchemaMap.containsKey(USER_TOPIC)); + Assert.assertTrue(topic2SchemaMap.containsKey(PRODUCT_TOPIC)); + + Schema productSchema = new Schema.Parser().parse(new File(PRODUCT_AVRO_PATH)); + Schema userSchema = new Schema.Parser().parse(new File(USER_AVRO_PATH)); + Assert.assertEquals(topic2SchemaMap.get(USER_TOPIC), userSchema); + Assert.assertEquals(topic2SchemaMap.get(PRODUCT_TOPIC), productSchema); + } + + @Test + public void testConvert() throws IOException { + Schema userSchema = new Schema.Parser().parse(new File(USER_AVRO_PATH)); + byte[] userAvroData = generateUserRecord(userSchema); + avroConverter.parseTopic2Schema(configs); + SchemaAndValue schemaAndValue = avroConverter.toConnectData("user-topic", userAvroData); + Object o = schemaAndValue.value(); + + Assert.assertEquals(o, "{\"id\": 1, \"name\": \"test\", \"age\": 18}"); + } + + private byte[] generateUserRecord(Schema userSchema) throws IOException { + GenericData.Record userRecord = new GenericData.Record(userSchema); + userRecord.put("id", 1); + userRecord.put("name", "test"); + userRecord.put("age", 18); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null); + DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(userSchema); + writer.write(userRecord, encoder); + encoder.flush(); + return outputStream.toByteArray(); + } +} diff --git a/src/test/resources/decode/avro/product.avsc b/src/test/resources/decode/avro/product.avsc new file mode 100644 index 0000000..677bc78 --- /dev/null +++ b/src/test/resources/decode/avro/product.avsc @@ -0,0 +1,18 @@ +{ + "type": "record", + "name": "Product", + "fields": [ + { + "name": "id", + "type": "int" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "price", + "type": "double" + } + ] +} \ No newline at end of file diff --git a/src/test/resources/decode/avro/user.avsc b/src/test/resources/decode/avro/user.avsc new 
file mode 100644 index 0000000..d493c84 --- /dev/null +++ b/src/test/resources/decode/avro/user.avsc @@ -0,0 +1,18 @@ +{ + "type": "record", + "name": "User", + "fields": [ + { + "name": "id", + "type": "int" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "age", + "type": "int" + } + ] +} \ No newline at end of file
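As a usage illustration (mirroring DorisAvroConverterTest above), here is a minimal standalone sketch of wiring up the new converter. The wrapper class name is hypothetical, and the topic name and schema path are taken from the test resources; nothing in this sketch is part of the commit itself. When the converter is configured through Kafka Connect, the same key would normally be supplied as value.converter.avro.topic2schema.filepath (or the key.converter equivalent), assuming the standard converter property prefixing.

    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.doris.kafka.connector.decode.avro.DorisAvroConverter;
    import org.apache.kafka.connect.data.SchemaAndValue;

    public class DorisAvroConverterUsageSketch {
        public static void main(String[] args) throws IOException {
            // Schema file from the test resources added in this commit.
            String schemaPath = "src/test/resources/decode/avro/user.avsc";

            // Map the topic to its .avsc file using the "topic:file:///path" format
            // documented in the converter's javadoc (comma-separated for more topics).
            Map<String, String> config = new HashMap<>();
            config.put(DorisAvroConverter.AVRO_TOPIC_SCHEMA_FILEPATH,
                    "user-topic:file://" + schemaPath);
            DorisAvroConverter converter = new DorisAvroConverter();
            converter.configure(config, false); // isKey = false, i.e. a value converter

            // Build one Avro-encoded record, the same way the unit test does.
            Schema schema = new Schema.Parser().parse(new File(schemaPath));
            GenericData.Record record = new GenericData.Record(schema);
            record.put("id", 1);
            record.put("name", "test");
            record.put("age", 18);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();

            // The converter looks up the schema by topic name and returns the decoded
            // record as a JSON-style string wrapped in DorisJsonSchema.
            SchemaAndValue converted = converter.toConnectData("user-topic", out.toByteArray());
            System.out.println(converted.value()); // {"id": 1, "name": "test", "age": 18}
        }
    }

Note that toConnectData returns the decoded record as a string under DorisJsonSchema rather than as a Connect Struct, so whatever consumes the SchemaAndValue downstream receives JSON text rather than structured fields.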