This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new bad24da  [feature]add arrow type for streamload (#265)
bad24da is described below

commit bad24da688fce7001825f0e2d8b0385176c5423a
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Wed Dec 27 11:03:34 2023 +0800

    [feature]add arrow type for streamload (#265)
---
 .github/workflows/build-extension.yml              |  23 +++-
 flink-doris-connector/build.sh                     |   4 +-
 flink-doris-connector/pom.xml                      |  21 ++-
 .../doris/flink/sink/batch/BatchRecordBuffer.java  |   4 +-
 .../flink/sink/batch/DorisBatchStreamLoad.java     |  16 ++-
 .../doris/flink/sink/batch/DorisBatchWriter.java   |  29 ++--
 .../doris/flink/sink/writer/DorisStreamLoad.java   |  23 ++--
 .../doris/flink/sink/writer/DorisWriter.java       |  24 ++--
 .../doris/flink/sink/writer/LoadConstants.java     |   1 +
 .../flink/sink/writer/serializer/DorisRecord.java  |   3 +
 .../writer/serializer/DorisRecordSerializer.java   |   8 ++
 .../sink/writer/serializer/RowDataSerializer.java  |  80 ++++++++++-
 .../runtime/arrow/serializers/ArrowSerializer.java | 153 +++++++++++++++++++++
 .../flink/sink/writer/TestRowDataSerializer.java   |  37 +++++
 14 files changed, 376 insertions(+), 50 deletions(-)

diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml
index 2038d21..7259bb4 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -42,5 +42,26 @@ jobs:
       run: |
         cd flink-doris-connector && mvn clean package \
         -Dflink.version=1.15.0 \
-        -Dflink.minor.version=1.15
+        -Dflink.minor.version=1.15 \
+        -Dflink.python.id=flink-python_2.12
+    - name: Build flink connector 1.16
+      run: |
+        cd flink-doris-connector && mvn clean package \
+        -Dflink.version=1.16.0 \
+        -Dflink.minor.version=1.16 \
+        -Dflink.python.id=flink-python
+
+    - name: Build flink connector 1.17
+      run: |
+        cd flink-doris-connector && mvn clean package \
+        -Dflink.version=1.17.0 \
+        -Dflink.minor.version=1.17 \
+        -Dflink.python.id=flink-python
+
+    - name: Build flink connector 1.18
+      run: |
+        cd flink-doris-connector && mvn clean package \
+        -Dflink.version=1.18.0 \
+        -Dflink.minor.version=1.18 \
+        -Dflink.python.id=flink-python
diff --git a/flink-doris-connector/build.sh b/flink-doris-connector/build.sh
index a646f58..1c67f2b 100755
--- a/flink-doris-connector/build.sh
+++ b/flink-doris-connector/build.sh
@@ -142,8 +142,10 @@ selectFlink() {
 FLINK_VERSION=0
 selectFlink
 flinkVer=$?
+FLINK_PYTHON_ID="flink-python"
 if [ ${flinkVer} -eq 1 ]; then
     FLINK_VERSION="1.15.0"
+    FLINK_PYTHON_ID="flink-python_2.12"
 elif [ ${flinkVer} -eq 2 ]; then
     FLINK_VERSION="1.16.0"
 elif [ ${flinkVer} -eq 3 ]; then
@@ -160,7 +162,7 @@ FLINK_MAJOR_VERSION=0
 echo_g " flink version: ${FLINK_VERSION}, major version: ${FLINK_MAJOR_VERSION}"
 echo_g " build starting..."
-${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} "$@"
+${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} -Dflink.python.id=${FLINK_PYTHON_ID} "$@"
 EXIT_CODE=$?
 if [ $EXIT_CODE -eq 0 ]; then
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index a54562e..aedb352 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -71,8 +71,9 @@ under the License.
         <flink.version>1.18.0</flink.version>
         <flink.major.version>1.18</flink.major.version>
         <flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
+        <flink.python.id>flink-python</flink.python.id>
         <libthrift.version>0.16.0</libthrift.version>
-        <arrow.version>5.0.0</arrow.version>
+        <arrow.version>13.0.0</arrow.version>
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
         <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
         <maven-source-plugin.version>3.2.1</maven-source-plugin.version>
@@ -84,7 +85,6 @@ under the License.
         <spotless.version>2.4.2</spotless.version>
         <httpcomponents.version>4.5.13</httpcomponents.version>
         <commons-codec.version>1.15</commons-codec.version>
-        <netty.version>4.1.77.Final</netty.version>
         <fasterxml.version>2.13.3</fasterxml.version>
         <guava.version>31.1-jre</guava.version>
         <slf4j.version>1.7.25</slf4j.version>
@@ -137,6 +137,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.python.id}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
@@ -195,19 +202,9 @@ under the License.
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-databind</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty-common</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-common</artifactId>
-            <version>${netty.version}</version>
-        </dependency>
-
         <!-- jackson -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 297cb18..df40e7a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -57,7 +57,7 @@ public class BatchRecordBuffer {
         ensureCapacity(record.length);
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
-        } else {
+        } else if (lineDelimiter != null) {
             this.buffer.put(this.lineDelimiter);
         }
         this.buffer.put(record);
@@ -67,7 +67,7 @@ public class BatchRecordBuffer {
 
     @VisibleForTesting
     public void ensureCapacity(int length) {
-        int lineDelimiterSize = this.lineDelimiter.length;
+        int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
         if (buffer.remaining() - lineDelimiterSize >= length) {
             return;
         }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 7ca0cda..f32ce2c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -61,6 +61,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -105,10 +108,15 @@ public class DorisBatchStreamLoad implements Serializable {
         this.password = dorisOptions.getPassword();
         this.loadProps = executionOptions.getStreamLoadProp();
         this.labelGenerator = labelGenerator;
-        this.lineDelimiter =
-                EscapeHandler.escapeString(
-                                loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
-                        .getBytes();
+        if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
+            this.lineDelimiter = null;
+        } else {
+            this.lineDelimiter =
+                    EscapeHandler.escapeString(
+                                    loadProps.getProperty(
+                                            LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
+                            .getBytes();
+        }
         this.executionOptions = executionOptions;
         this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
         if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 43aff7f..6a6576c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -77,14 +77,15 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
         this.dorisReadOptions = dorisReadOptions;
         this.executionOptions = executionOptions;
         this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
+        serializer.initial();
     }
 
     public void initializeLoad() throws IOException {
         this.batchStreamLoad =
                 new DorisBatchStreamLoad(
                         dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
-        // when uploading data in streaming mode,
-        // we need to regularly detect whether there areexceptions.
+        // when uploading data in streaming mode, we need to regularly detect whether there are
+        // exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(
                 this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
     }
@@ -101,13 +102,24 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
     @Override
     public void write(IN in, Context context) throws IOException, InterruptedException {
         checkFlushException();
-        String db = this.database;
-        String tbl = this.table;
-        DorisRecord record = serializer.serialize(in);
+        writeOneDorisRecord(serializer.serialize(in));
+    }
+
+    @Override
+    public void flush(boolean flush) throws IOException, InterruptedException {
+        checkFlushException();
+        writeOneDorisRecord(serializer.flush());
+        LOG.info("checkpoint flush triggered.");
+        batchStreamLoad.flush(null, true);
+    }
+
+    public void writeOneDorisRecord(DorisRecord record) throws InterruptedException {
         if (record == null || record.getRow() == null) {
             // ddl or value is null
             return;
         }
+        String db = this.database;
+        String tbl = this.table;
         // multi table load
         if (record.getTableIdentifier() != null) {
             db = record.getDatabase();
@@ -116,13 +128,6 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
         batchStreamLoad.writeRecord(db, tbl, record.getRow());
     }
 
-    @Override
-    public void flush(boolean flush) throws IOException, InterruptedException {
-        checkFlushException();
-        LOG.info("checkpoint flush triggered.");
-        batchStreamLoad.flush(null, true);
-    }
-
     @Override
     public void close() throws Exception {
         LOG.info("DorisBatchWriter Close");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 30ff365..b095eb9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -54,6 +54,9 @@ import java.util.regex.Matcher;
 
 import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -115,11 +118,15 @@ public class DorisStreamLoad implements Serializable {
                         executionOptions.getBufferSize(),
                         executionOptions.getBufferCount(),
                         executionOptions.isUseCache());
-        lineDelimiter =
-                EscapeHandler.escapeString(
-                                streamLoadProp.getProperty(
-                                        LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
-                        .getBytes();
+        if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
+            lineDelimiter = null;
+        } else {
+            lineDelimiter =
+                    EscapeHandler.escapeString(
+                                    streamLoadProp.getProperty(
+                                            LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
+                            .getBytes();
+        }
         loadBatchFirstRecord = true;
     }
 
@@ -157,8 +164,8 @@ public class DorisStreamLoad implements Serializable {
         LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
         while (true) {
             try {
-                // TODO: According to label abort txn. Currently,
-                // it can only be aborted based on txnid,
+                // TODO: According to label abort txn. Currently, it can only be aborted based on
+                // txnid,
                 // so we must first request a streamload based on the label to get the txnid.
                 String label = labelGenerator.generateTableLabel(startChkID);
                 HttpPutBuilder builder = new HttpPutBuilder();
@@ -218,7 +225,7 @@ public class DorisStreamLoad implements Serializable {
     public void writeRecord(byte[] record) throws IOException {
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
-        } else {
+        } else if (lineDelimiter != null) {
             recordStream.write(lineDelimiter);
         }
         recordStream.write(record);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 3fec941..e8cc1ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -109,6 +109,7 @@ public class DorisWriter<IN>
         this.globalLoading = false;
 
         initializeLoad(state);
+        serializer.initial();
     }
 
     public void initializeLoad(Collection<DorisWriterState> state) {
@@ -123,8 +124,8 @@ public class DorisWriter<IN>
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        // when uploading data in streaming mode,
-        // we need to regularly detect whether there are exceptions.
+        // when uploading data in streaming mode, we need to regularly detect whether there are
+        // exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(
                 this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
     }
@@ -167,14 +168,23 @@ public class DorisWriter<IN>
     @Override
     public void write(IN in, Context context) throws IOException {
         checkLoadException();
-        String tableKey = dorisOptions.getTableIdentifier();
+        writeOneDorisRecord(serializer.serialize(in));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+        writeOneDorisRecord(serializer.flush());
+    }
+
+    public void writeOneDorisRecord(DorisRecord record) throws IOException {
-        DorisRecord record = serializer.serialize(in);
         if (record == null || record.getRow() == null) {
             // ddl or value is null
             return;
         }
+        // multi table load
+        String tableKey = dorisOptions.getTableIdentifier();
         if (record.getTableIdentifier() != null) {
             tableKey = record.getTableIdentifier();
         }
@@ -191,11 +201,6 @@ public class DorisWriter<IN>
         streamLoader.writeRecord(record.getRow());
     }
 
-    @Override
-    public void flush(boolean flush) throws IOException, InterruptedException {
-        // No action is triggered, everything is in the precommit method
-    }
-
     @Override
     public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
         // Verify whether data is written during a checkpoint
@@ -369,5 +374,6 @@ public class DorisWriter<IN>
                 dorisStreamLoad.close();
             }
         }
+        serializer.close();
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 7b0d1d0..2e5d29a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -27,6 +27,7 @@ public class LoadConstants {
     public static final String FORMAT_KEY = "format";
     public static final String JSON = "json";
     public static final String CSV = "csv";
+    public static final String ARROW = "arrow";
     public static final String NULL_VALUE = "\\N";
     public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
     public static final String READ_JSON_BY_LINE = "read_json_by_line";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
index d15f07c..6a5bdde 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
@@ -20,6 +20,9 @@ package org.apache.doris.flink.sink.writer.serializer;
 import java.io.Serializable;
 
 public class DorisRecord implements Serializable {
+
+    public static DorisRecord empty = new DorisRecord();
+
     private String database;
     private String table;
     private byte[] row;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
index c1135fa..5582ea5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
@@ -35,4 +35,12 @@ public interface DorisRecordSerializer<T> extends Serializable {
      * @throws IOException
      */
     DorisRecord serialize(T record) throws IOException;
+
+    default void initial() {}
+
+    default DorisRecord flush() {
+        return DorisRecord.empty;
+    }
+
+    default void close() throws Exception {}
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
index 2615fb9..f7d9874 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
@@ -18,20 +18,29 @@
 package org.apache.doris.flink.sink.writer.serializer;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
 import org.apache.doris.flink.sink.EscapeHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.StringJoiner;
 
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
@@ -39,12 +48,18 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;
 
 /** Serializer for RowData. */
 public class RowDataSerializer implements DorisRecordSerializer<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(RowDataSerializer.class);
     String[] fieldNames;
     String type;
    private ObjectMapper objectMapper;
     private final String fieldDelimiter;
     private final boolean enableDelete;
     private final DorisRowConverter rowConverter;
+    private ArrowSerializer arrowSerializer;
+    ByteArrayOutputStream outputStream;
+    private final int arrowBatchCnt = 1000;
+    private int arrowWriteCnt = 0;
+    private final DataType[] dataTypes;
 
     private RowDataSerializer(
             String[] fieldNames,
@@ -59,9 +74,25 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
         if (JSON.equals(type)) {
             objectMapper = new ObjectMapper();
         }
+        this.dataTypes = dataTypes;
         this.rowConverter = new DorisRowConverter().setExternalConverter(dataTypes);
     }
 
+    @Override
+    public void initial() {
+        if (ARROW.equals(type)) {
+            LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
+            RowType rowType = RowType.of(logicalTypes, fieldNames);
+            arrowSerializer = new ArrowSerializer(rowType, rowType);
+            outputStream = new ByteArrayOutputStream();
+            try {
+                arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream);
+            } catch (Exception e) {
+                throw new RuntimeException("failed to open arrow serializer:", e);
+            }
+        }
+    }
+
     @Override
     public DorisRecord serialize(RowData record) throws IOException {
         int maxIndex = Math.min(record.getArity(), fieldNames.length);
@@ -70,12 +101,54 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
             valString = buildJsonString(record, maxIndex);
         } else if (CSV.equals(type)) {
             valString = buildCSVString(record, maxIndex);
+        } else if (ARROW.equals(type)) {
+            arrowWriteCnt += 1;
+            arrowSerializer.write(record);
+            if (arrowWriteCnt < arrowBatchCnt) {
+                return DorisRecord.empty;
+            }
+            return arrowToDorisRecord();
         } else {
             throw new IllegalArgumentException("The type " + type + " is not supported!");
         }
         return DorisRecord.of(valString.getBytes(StandardCharsets.UTF_8));
     }
 
+    @Override
+    public DorisRecord flush() {
+        if (JSON.equals(type) || CSV.equals(type)) {
+            return DorisRecord.empty;
+        } else if (ARROW.equals(type)) {
+            return arrowToDorisRecord();
+        } else {
+            throw new IllegalArgumentException("The type " + type + " is not supported!");
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (ARROW.equals(type)) {
+            arrowSerializer.close();
+        }
+    }
+
+    public DorisRecord arrowToDorisRecord() {
+        if (arrowWriteCnt == 0) {
+            return DorisRecord.empty;
+        }
+        arrowWriteCnt = 0;
+        try {
+            arrowSerializer.finishCurrentBatch();
+            byte[] bytes = outputStream.toByteArray();
+            outputStream.reset();
+            arrowSerializer.resetWriter();
+            return DorisRecord.of(bytes);
+        } catch (Exception e) {
+            LOG.error("Failed to convert arrow batch:", e);
+        }
+        return DorisRecord.empty;
+    }
+
     public String buildJsonString(RowData record, int maxIndex) throws IOException {
         int fieldIndex = 0;
         Map<String, String> valueMap = new HashMap<>();
@@ -155,9 +228,14 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
 
         public RowDataSerializer build() {
             Preconditions.checkState(
-                    CSV.equals(type) && fieldDelimiter != null || JSON.equals(type));
+                    CSV.equals(type) && fieldDelimiter != null
+                            || JSON.equals(type)
+                            || ARROW.equals(type));
             Preconditions.checkNotNull(dataTypes);
             Preconditions.checkNotNull(fieldNames);
+            if (ARROW.equals(type)) {
+                Preconditions.checkArgument(!deletable);
+            }
             return new RowDataSerializer(fieldNames, dataTypes, type, fieldDelimiter, deletable);
         }
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
new file mode 100644
index 0000000..29f809f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.serializers;
+
+import org.apache.flink.api.python.shaded.org.apache.arrow.memory.BufferAllocator;
+import org.apache.flink.api.python.shaded.org.apache.arrow.memory.RootAllocator;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * this code is copied from flink-python, and modified finishCurrentBatch to add end operation.
+ *
+ * <p>The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow
+ * bytes.
+ */
+public final class ArrowSerializer {
+
+    static {
+        ArrowUtils.checkArrowUsable();
+    }
+
+    /** The input RowType. */
+    protected final RowType inputType;
+
+    /** The output RowType. */
+    protected final RowType outputType;
+
+    /** Allocator which is used for byte buffer allocation. */
+    private transient BufferAllocator allocator;
+
+    /** Reader which is responsible for deserialize the Arrow format data to the Flink rows. */
+    private transient ArrowReader arrowReader;
+
+    /**
+     * Reader which is responsible for convert the execution result from byte array to arrow format.
+     */
+    private transient ArrowStreamReader arrowStreamReader;
+
+    /**
+     * Container that holds a set of vectors for the input elements to be sent to the Python worker.
+     */
+    transient VectorSchemaRoot rootWriter;
+
+    /** Writer which is responsible for serialize the input elements to arrow format. */
+    private transient ArrowWriter<RowData> arrowWriter;
+
+    /** Writer which is responsible for convert the arrow format data into byte array. */
+    private transient ArrowStreamWriter arrowStreamWriter;
+
+    /** Reusable InputStream used to holding the execution results to be deserialized. */
+    private transient InputStream bais;
+
+    /** Reusable OutputStream used to holding the serialized input elements. */
+    private transient OutputStream baos;
+
+    public ArrowSerializer(RowType inputType, RowType outputType) {
+        this.inputType = inputType;
+        this.outputType = outputType;
+    }
+
+    public void open(InputStream bais, OutputStream baos) throws Exception {
+        this.bais = bais;
+        this.baos = baos;
+        allocator = new RootAllocator(Long.MAX_VALUE);
+        arrowStreamReader = new ArrowStreamReader(bais, allocator);
+
+        rootWriter = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(inputType), allocator);
+        arrowWriter = createArrowWriter();
+        arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
+        arrowStreamWriter.start();
+    }
+
+    public int load() throws IOException {
+        arrowStreamReader.loadNextBatch();
+        VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
+        if (arrowReader == null) {
+            arrowReader = createArrowReader(root);
+        }
+        return root.getRowCount();
+    }
+
+    public RowData read(int i) {
+        return arrowReader.read(i);
+    }
+
+    public void write(RowData element) {
+        arrowWriter.write(element);
+    }
+
+    public void close() throws Exception {
+        arrowStreamWriter.end();
+        arrowStreamReader.close();
+        rootWriter.close();
+        allocator.close();
+    }
+
+    /** Creates an {@link ArrowWriter}. */
+    public ArrowWriter<RowData> createArrowWriter() {
+        return ArrowUtils.createRowDataArrowWriter(rootWriter, inputType);
+    }
+
+    public ArrowReader createArrowReader(VectorSchemaRoot root) {
+        return ArrowUtils.createArrowReader(root, outputType);
+    }
+
+    /**
+     * Forces to finish the processing of the current batch of elements. It will serialize the batch
+     * of elements into one arrow batch.
+     */
+    public void finishCurrentBatch() throws Exception {
+        arrowWriter.finish();
+        arrowStreamWriter.writeBatch();
+        arrowStreamWriter.end();
+        arrowWriter.reset();
+    }
+
+    public void resetReader() throws IOException {
+        arrowReader = null;
+        arrowStreamReader.close();
+        arrowStreamReader = new ArrowStreamReader(bais, allocator);
+    }
+
+    public void resetWriter() throws IOException {
+        arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
+        arrowStreamWriter.start();
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
index d1028fe..84e2971 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
@@ -19,8 +19,13 @@ package org.apache.doris.flink.sink.writer;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.RowKind;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -30,6 +35,8 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -132,4 +139,34 @@ public class TestRowDataSerializer {
         Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.DELETE));
         Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.UPDATE_BEFORE));
     }
+
+    @Test
+    public void testArrowType() throws Exception {
+        RowDataSerializer serializer =
+                RowDataSerializer.builder()
+                        .setFieldNames(fieldNames)
+                        .setFieldType(dataTypes)
+                        .setType("arrow")
+                        .enableDelete(false)
+                        .build();
+
+        // write data to binary
+        serializer.initial();
+        serializer.serialize(rowData);
+        byte[] serializedValue = serializer.flush().getRow();
+
+        // read data from binary
+        LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
+        RowType rowType = RowType.of(logicalTypes, fieldNames);
+        ArrowSerializer arrowSerializer = new ArrowSerializer(rowType, rowType);
+        ByteArrayInputStream input = new ByteArrayInputStream(serializedValue);
+        arrowSerializer.open(input, new ByteArrayOutputStream(0));
+        int cnt = arrowSerializer.load();
+        RowData data = arrowSerializer.read(0);
+
+        Assert.assertEquals(1, cnt);
+        Assert.assertEquals(3, data.getInt(0));
+        Assert.assertEquals("test", data.getString(1).toString());
+        Assert.assertEquals(60.2, data.getDouble(2), 0.001);
+    }
 }
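
For anyone trying out the new format, a minimal, self-contained sketch of the arrow
serialization path added by this commit, modeled on the testArrowType case above. The
two-column schema, the field values, and the class name are hypothetical, and the Doris
sink/stream-load wiring is omitted:

import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.DataType;

public class ArrowFormatSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical schema; any type handled by DorisRowConverter works the same way.
        String[] fieldNames = {"id", "name"};
        DataType[] dataTypes = {DataTypes.INT(), DataTypes.VARCHAR(32)};

        // setType("arrow") selects the new LoadConstants.ARROW format; the builder now
        // rejects the delete sign for arrow, so enableDelete must stay false.
        RowDataSerializer serializer =
                RowDataSerializer.builder()
                        .setFieldNames(fieldNames)
                        .setFieldType(dataTypes)
                        .setType("arrow")
                        .enableDelete(false)
                        .build();

        // initial() opens the underlying ArrowSerializer. serialize() buffers rows and
        // returns DorisRecord.empty until the 1000-row batch threshold (arrowBatchCnt)
        // is reached; flush(), which the writers call at checkpoint time, drains
        // whatever is pending as one Arrow IPC batch.
        serializer.initial();
        serializer.serialize(GenericRowData.of(1, StringData.fromString("doris")));
        DorisRecord record = serializer.flush();
        System.out.println("arrow batch size in bytes: " + record.getRow().length);
        serializer.close();
    }
}

In a running job the same path is selected by setting the stream load property "format"
to "arrow" (LoadConstants.FORMAT_KEY/ARROW); as the DorisStreamLoad and
DorisBatchStreamLoad changes above show, that value also sets the line delimiter to null,
since an Arrow stream carries its own batch framing and needs no per-record separator.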