This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new bad24da  [feature]add arrow type for streamload (#265)
bad24da is described below

commit bad24da688fce7001825f0e2d8b0385176c5423a
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Wed Dec 27 11:03:34 2023 +0800

    [feature]add arrow type for streamload (#265)
---
 .github/workflows/build-extension.yml              |  23 +++-
 flink-doris-connector/build.sh                     |   4 +-
 flink-doris-connector/pom.xml                      |  21 ++-
 .../doris/flink/sink/batch/BatchRecordBuffer.java  |   4 +-
 .../flink/sink/batch/DorisBatchStreamLoad.java     |  16 ++-
 .../doris/flink/sink/batch/DorisBatchWriter.java   |  29 ++--
 .../doris/flink/sink/writer/DorisStreamLoad.java   |  23 ++--
 .../doris/flink/sink/writer/DorisWriter.java       |  24 ++--
 .../doris/flink/sink/writer/LoadConstants.java     |   1 +
 .../flink/sink/writer/serializer/DorisRecord.java  |   3 +
 .../writer/serializer/DorisRecordSerializer.java   |   8 ++
 .../sink/writer/serializer/RowDataSerializer.java  |  80 ++++++++++-
 .../runtime/arrow/serializers/ArrowSerializer.java | 153 +++++++++++++++++++++
 .../flink/sink/writer/TestRowDataSerializer.java   |  37 +++++
 14 files changed, 376 insertions(+), 50 deletions(-)
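
For context on the feature itself: this commit adds "arrow" as a third stream load format next to "csv" and "json". Below is a minimal sketch of how a job could opt in, assuming the connector's usual stream-load-properties plumbing; the diff only shows the matching getter (executionOptions.getStreamLoadProp()), so the builder call here is an assumption, not part of this commit:

    import java.util.Properties;

    import org.apache.doris.flink.cfg.DorisExecutionOptions;

    // Assumed wiring: pass format=arrow through the stream load properties.
    // "format" is LoadConstants.FORMAT_KEY; "arrow" is the new LoadConstants.ARROW.
    Properties props = new Properties();
    props.setProperty("format", "arrow");
    DorisExecutionOptions executionOptions =
            DorisExecutionOptions.builder()
                    .setStreamLoadProp(props) // assumed setter matching getStreamLoadProp()
                    .build();

With format=arrow, the writers below buffer rows into Arrow batches and ship complete IPC streams instead of delimiter-joined CSV/JSON lines.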

diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml
index 2038d21..7259bb4 100644
--- a/.github/workflows/build-extension.yml
+++ b/.github/workflows/build-extension.yml
@@ -42,5 +42,26 @@ jobs:
       run: |
         cd flink-doris-connector && mvn clean package \
           -Dflink.version=1.15.0 \
-          -Dflink.minor.version=1.15
+          -Dflink.minor.version=1.15 \
+          -Dflink.python.id=flink-python_2.12
 
+    - name: Build flink connector 1.16
+      run: |
+        cd flink-doris-connector && mvn clean package \
+          -Dflink.version=1.16.0 \
+          -Dflink.minor.version=1.16 \
+          -Dflink.python.id=flink-python
+
+    - name: Build flink connector 1.17
+      run: |
+        cd flink-doris-connector && mvn clean package \
+          -Dflink.version=1.17.0 \
+          -Dflink.minor.version=1.17 \
+          -Dflink.python.id=flink-python
+
+    - name: Build flink connector 1.18
+      run: |
+        cd flink-doris-connector && mvn clean package \
+          -Dflink.version=1.18.0 \
+          -Dflink.minor.version=1.18 \
+          -Dflink.python.id=flink-python
diff --git a/flink-doris-connector/build.sh b/flink-doris-connector/build.sh
index a646f58..1c67f2b 100755
--- a/flink-doris-connector/build.sh
+++ b/flink-doris-connector/build.sh
@@ -142,8 +142,10 @@ selectFlink() {
 FLINK_VERSION=0
 selectFlink
 flinkVer=$?
+FLINK_PYTHON_ID="flink-python"
 if [ ${flinkVer} -eq 1 ]; then
     FLINK_VERSION="1.15.0"
+    FLINK_PYTHON_ID="flink-python_2.12"
 elif [ ${flinkVer} -eq 2 ]; then
     FLINK_VERSION="1.16.0"
 elif [ ${flinkVer} -eq 3 ]; then
@@ -160,7 +162,7 @@ FLINK_MAJOR_VERSION=0
 echo_g " flink version: ${FLINK_VERSION}, major version: ${FLINK_MAJOR_VERSION}"
 echo_g " build starting..."
 
-${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} "$@"
+${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} -Dflink.python.id=${FLINK_PYTHON_ID} "$@"
 
 EXIT_CODE=$?
 if [ $EXIT_CODE -eq 0 ]; then
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index a54562e..aedb352 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -71,8 +71,9 @@ under the License.
         <flink.version>1.18.0</flink.version>
         <flink.major.version>1.18</flink.major.version>
         <flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
+        <flink.python.id>flink-python</flink.python.id>
         <libthrift.version>0.16.0</libthrift.version>
-        <arrow.version>5.0.0</arrow.version>
+        <arrow.version>13.0.0</arrow.version>
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
         <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
         <maven-source-plugin.version>3.2.1</maven-source-plugin.version>
@@ -84,7 +85,6 @@ under the License.
         <spotless.version>2.4.2</spotless.version>
         <httpcomponents.version>4.5.13</httpcomponents.version>
         <commons-codec.version>1.15</commons-codec.version>
-        <netty.version>4.1.77.Final</netty.version>
         <fasterxml.version>2.13.3</fasterxml.version>
         <guava.version>31.1-jre</guava.version>
         <slf4j.version>1.7.25</slf4j.version>
@@ -137,6 +137,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.python.id}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
@@ -195,19 +202,9 @@ under the License.
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-databind</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty-common</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-common</artifactId>
-            <version>${netty.version}</version>
-        </dependency>
-
         <!--  jackson  -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 297cb18..df40e7a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -57,7 +57,7 @@ public class BatchRecordBuffer {
         ensureCapacity(record.length);
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
-        } else {
+        } else if (lineDelimiter != null) {
             this.buffer.put(this.lineDelimiter);
         }
         this.buffer.put(record);
@@ -67,7 +67,7 @@ public class BatchRecordBuffer {
 
     @VisibleForTesting
     public void ensureCapacity(int length) {
-        int lineDelimiterSize = this.lineDelimiter.length;
+        int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
         if (buffer.remaining() - lineDelimiterSize >= length) {
             return;
         }
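A note on the two null checks above: for the arrow format, DorisBatchStreamLoad (next file) leaves lineDelimiter null, because an Arrow IPC stream is self-framing and no separator byte may be injected between records. The guarded append in insert() therefore reduces to the following (restated from the patch for readability, not new code):

    if (loadBatchFirstRecord) {
        loadBatchFirstRecord = false;   // first record: never prefix a delimiter
    } else if (lineDelimiter != null) { // csv/json: join records with the delimiter
        this.buffer.put(this.lineDelimiter);
    }                                   // arrow: fall through, append nothing
    this.buffer.put(record);
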
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 7ca0cda..f32ce2c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -61,6 +61,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -105,10 +108,15 @@ public class DorisBatchStreamLoad implements Serializable {
         this.password = dorisOptions.getPassword();
         this.loadProps = executionOptions.getStreamLoadProp();
         this.labelGenerator = labelGenerator;
-        this.lineDelimiter =
-                EscapeHandler.escapeString(
-                                loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
-                        .getBytes();
+        if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
+            this.lineDelimiter = null;
+        } else {
+            this.lineDelimiter =
+                    EscapeHandler.escapeString(
+                                    loadProps.getProperty(
+                                            LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
+                            .getBytes();
+        }
         this.executionOptions = executionOptions;
         this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
         if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 43aff7f..6a6576c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -77,14 +77,15 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
         this.dorisReadOptions = dorisReadOptions;
         this.executionOptions = executionOptions;
         this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
+        serializer.initial();
     }
 
     public void initializeLoad() throws IOException {
         this.batchStreamLoad =
                 new DorisBatchStreamLoad(
                         dorisOptions, dorisReadOptions, executionOptions, 
labelGenerator);
-        // when uploading data in streaming mode,
-        // we need to regularly detect whether there areexceptions.
+        // when uploading data in streaming mode, we need to regularly detect whether there are
+        // exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(
                 this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
     }
@@ -101,13 +102,24 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
     @Override
     public void write(IN in, Context context) throws IOException, InterruptedException {
         checkFlushException();
-        String db = this.database;
-        String tbl = this.table;
-        DorisRecord record = serializer.serialize(in);
+        writeOneDorisRecord(serializer.serialize(in));
+    }
+
+    @Override
+    public void flush(boolean flush) throws IOException, InterruptedException {
+        checkFlushException();
+        writeOneDorisRecord(serializer.flush());
+        LOG.info("checkpoint flush triggered.");
+        batchStreamLoad.flush(null, true);
+    }
+
+    public void writeOneDorisRecord(DorisRecord record) throws InterruptedException {
         if (record == null || record.getRow() == null) {
             // ddl or value is null
             return;
         }
+        String db = this.database;
+        String tbl = this.table;
         // multi table load
         if (record.getTableIdentifier() != null) {
             db = record.getDatabase();
@@ -116,13 +128,6 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
         batchStreamLoad.writeRecord(db, tbl, record.getRow());
     }
 
-    @Override
-    public void flush(boolean flush) throws IOException, InterruptedException {
-        checkFlushException();
-        LOG.info("checkpoint flush triggered.");
-        batchStreamLoad.flush(null, true);
-    }
-
     @Override
     public void close() throws Exception {
         LOG.info("DorisBatchWriter Close");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 30ff365..b095eb9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -54,6 +54,9 @@ import java.util.regex.Matcher;
 import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
 import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
@@ -115,11 +118,15 @@ public class DorisStreamLoad implements Serializable {
                         executionOptions.getBufferSize(),
                         executionOptions.getBufferCount(),
                         executionOptions.isUseCache());
-        lineDelimiter =
-                EscapeHandler.escapeString(
-                                streamLoadProp.getProperty(
-                                        LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
-                        .getBytes();
+        if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
+            lineDelimiter = null;
+        } else {
+            lineDelimiter =
+                    EscapeHandler.escapeString(
+                                    streamLoadProp.getProperty(
+                                            LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
+                            .getBytes();
+        }
         loadBatchFirstRecord = true;
     }
 
@@ -157,8 +164,8 @@ public class DorisStreamLoad implements Serializable {
         LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
         while (true) {
             try {
-                // TODO: According to label abort txn. Currently,
-                //  it can only be aborted based on txnid,
+                // TODO: According to label abort txn. Currently, it can only be aborted based on
+                // txnid,
                 //  so we must first request a streamload based on the label to get the txnid.
                 String label = labelGenerator.generateTableLabel(startChkID);
                 HttpPutBuilder builder = new HttpPutBuilder();
@@ -218,7 +225,7 @@ public class DorisStreamLoad implements Serializable {
     public void writeRecord(byte[] record) throws IOException {
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
-        } else {
+        } else if (lineDelimiter != null) {
             recordStream.write(lineDelimiter);
         }
         recordStream.write(record);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 3fec941..e8cc1ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -109,6 +109,7 @@ public class DorisWriter<IN>
         this.globalLoading = false;
 
         initializeLoad(state);
+        serializer.initial();
     }
 
     public void initializeLoad(Collection<DorisWriterState> state) {
@@ -123,8 +124,8 @@ public class DorisWriter<IN>
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        // when uploading data in streaming mode,
-        // we need to regularly detect whether there are exceptions.
+        // when uploading data in streaming mode, we need to regularly detect whether there are
+        // exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(
                 this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
     }
@@ -167,14 +168,23 @@ public class DorisWriter<IN>
     @Override
     public void write(IN in, Context context) throws IOException {
         checkLoadException();
-        String tableKey = dorisOptions.getTableIdentifier();
+        writeOneDorisRecord(serializer.serialize(in));
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+        writeOneDorisRecord(serializer.flush());
+    }
+
+    public void writeOneDorisRecord(DorisRecord record) throws IOException {
 
-        DorisRecord record = serializer.serialize(in);
         if (record == null || record.getRow() == null) {
             // ddl or value is null
             return;
         }
+
         // multi table load
+        String tableKey = dorisOptions.getTableIdentifier();
         if (record.getTableIdentifier() != null) {
             tableKey = record.getTableIdentifier();
         }
@@ -191,11 +201,6 @@ public class DorisWriter<IN>
         streamLoader.writeRecord(record.getRow());
     }
 
-    @Override
-    public void flush(boolean flush) throws IOException, InterruptedException {
-        // No action is triggered, everything is in the precommit method
-    }
-
     @Override
     public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
         // Verify whether data is written during a checkpoint
@@ -369,5 +374,6 @@ public class DorisWriter<IN>
                 dorisStreamLoad.close();
             }
         }
+        serializer.close();
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index 7b0d1d0..2e5d29a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -27,6 +27,7 @@ public class LoadConstants {
     public static final String FORMAT_KEY = "format";
     public static final String JSON = "json";
     public static final String CSV = "csv";
+    public static final String ARROW = "arrow";
     public static final String NULL_VALUE = "\\N";
     public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
     public static final String READ_JSON_BY_LINE = "read_json_by_line";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
index d15f07c..6a5bdde 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java
@@ -20,6 +20,9 @@ package org.apache.doris.flink.sink.writer.serializer;
 import java.io.Serializable;
 
 public class DorisRecord implements Serializable {
+
+    public static DorisRecord empty = new DorisRecord();
+
     private String database;
     private String table;
     private byte[] row;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
index c1135fa..5582ea5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java
@@ -35,4 +35,12 @@ public interface DorisRecordSerializer<T> extends Serializable {
      * @throws IOException
      */
     DorisRecord serialize(T record) throws IOException;
+
+    default void initial() {}
+
+    default DorisRecord flush() {
+        return DorisRecord.empty;
+    }
+
+    default void close() throws Exception {}
 }
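The three default methods added here give every serializer an explicit lifecycle: initial() runs once when the writer opens (see the DorisWriter/DorisBatchWriter hunks above), flush() is drained at checkpoint so a batching serializer can emit its partial batch, and close() releases resources. A hedged sketch of a custom serializer built on these hooks; the class is illustrative, not part of the commit:

    import java.nio.charset.StandardCharsets;

    // Illustrative: buffers records and emits them only when flush() fires.
    public class BatchingSerializer implements DorisRecordSerializer<String> {
        private transient StringBuilder buffer;

        @Override
        public void initial() {
            buffer = new StringBuilder(); // writer open
        }

        @Override
        public DorisRecord serialize(String record) {
            buffer.append(record).append('\n');
            return DorisRecord.empty; // nothing to ship yet
        }

        @Override
        public DorisRecord flush() {
            if (buffer.length() == 0) {
                return DorisRecord.empty;
            }
            byte[] bytes = buffer.toString().getBytes(StandardCharsets.UTF_8);
            buffer.setLength(0);
            return DorisRecord.of(bytes); // shipped at checkpoint
        }

        @Override
        public void close() {
            buffer = null;
        }
    }
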
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
index 2615fb9..f7d9874 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java
@@ -18,20 +18,29 @@
 package org.apache.doris.flink.sink.writer.serializer;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
 import org.apache.doris.flink.sink.EscapeHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.StringJoiner;
 
+import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
 import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
@@ -39,12 +48,18 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE;
 
 /** Serializer for RowData. */
 public class RowDataSerializer implements DorisRecordSerializer<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(RowDataSerializer.class);
     String[] fieldNames;
     String type;
     private ObjectMapper objectMapper;
     private final String fieldDelimiter;
     private final boolean enableDelete;
     private final DorisRowConverter rowConverter;
+    private ArrowSerializer arrowSerializer;
+    ByteArrayOutputStream outputStream;
+    private final int arrowBatchCnt = 1000;
+    private int arrowWriteCnt = 0;
+    private final DataType[] dataTypes;
 
     private RowDataSerializer(
             String[] fieldNames,
@@ -59,9 +74,25 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
         if (JSON.equals(type)) {
             objectMapper = new ObjectMapper();
         }
+        this.dataTypes = dataTypes;
         this.rowConverter = new DorisRowConverter().setExternalConverter(dataTypes);
     }
 
+    @Override
+    public void initial() {
+        if (ARROW.equals(type)) {
+            LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
+            RowType rowType = RowType.of(logicalTypes, fieldNames);
+            arrowSerializer = new ArrowSerializer(rowType, rowType);
+            outputStream = new ByteArrayOutputStream();
+            try {
+                arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream);
+            } catch (Exception e) {
+                throw new RuntimeException("failed to open arrow serializer:", e);
+            }
+        }
+    }
+
     @Override
     public DorisRecord serialize(RowData record) throws IOException {
         int maxIndex = Math.min(record.getArity(), fieldNames.length);
@@ -70,12 +101,54 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
             valString = buildJsonString(record, maxIndex);
         } else if (CSV.equals(type)) {
             valString = buildCSVString(record, maxIndex);
+        } else if (ARROW.equals(type)) {
+            arrowWriteCnt += 1;
+            arrowSerializer.write(record);
+            if (arrowWriteCnt < arrowBatchCnt) {
+                return DorisRecord.empty;
+            }
+            return arrowToDorisRecord();
         } else {
             throw new IllegalArgumentException("The type " + type + " is not supported!");
         }
         return DorisRecord.of(valString.getBytes(StandardCharsets.UTF_8));
     }
 
+    @Override
+    public DorisRecord flush() {
+        if (JSON.equals(type) || CSV.equals(type)) {
+            return DorisRecord.empty;
+        } else if (ARROW.equals(type)) {
+            return arrowToDorisRecord();
+        } else {
+            throw new IllegalArgumentException("The type " + type + " is not supported!");
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (ARROW.equals(type)) {
+            arrowSerializer.close();
+        }
+    }
+
+    public DorisRecord arrowToDorisRecord() {
+        if (arrowWriteCnt == 0) {
+            return DorisRecord.empty;
+        }
+        arrowWriteCnt = 0;
+        try {
+            arrowSerializer.finishCurrentBatch();
+            byte[] bytes = outputStream.toByteArray();
+            outputStream.reset();
+            arrowSerializer.resetWriter();
+            return DorisRecord.of(bytes);
+        } catch (Exception e) {
+            LOG.error("Failed to convert arrow batch:", e);
+        }
+        return DorisRecord.empty;
+    }
+
     public String buildJsonString(RowData record, int maxIndex) throws IOException {
         int fieldIndex = 0;
         Map<String, String> valueMap = new HashMap<>();
@@ -155,9 +228,14 @@ public class RowDataSerializer implements DorisRecordSerializer<RowData> {
 
         public RowDataSerializer build() {
             Preconditions.checkState(
-                    CSV.equals(type) && fieldDelimiter != null || JSON.equals(type));
+                    CSV.equals(type) && fieldDelimiter != null
+                            || JSON.equals(type)
+                            || ARROW.equals(type));
             Preconditions.checkNotNull(dataTypes);
             Preconditions.checkNotNull(fieldNames);
+            if (ARROW.equals(type)) {
+                Preconditions.checkArgument(!deletable);
+            }
             return new RowDataSerializer(fieldNames, dataTypes, type, 
fieldDelimiter, deletable);
         }
     }
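Tying the serializer changes together: arrow needs no field delimiter, and the builder now rejects enableDelete(true) for arrow because the hidden __DORIS_DELETE_SIGN__ column is only appended on the csv/json paths. Construction mirrors the new test at the bottom of this commit (field names and types below are examples); rows accumulate until arrowBatchCnt (1000) is reached in serialize() or flush() fires at checkpoint:

    // Mirrors TestRowDataSerializer#testArrowType below.
    RowDataSerializer serializer =
            RowDataSerializer.builder()
                    .setFieldNames(new String[] {"id", "name"})
                    .setFieldType(new DataType[] {DataTypes.INT(), DataTypes.STRING()})
                    .setType("arrow")    // LoadConstants.ARROW
                    .enableDelete(false) // the arrow path forbids the delete sign
                    .build();
    serializer.initial(); // opens the embedded ArrowSerializer
    serializer.serialize(GenericRowData.of(1, StringData.fromString("doris")));
    byte[] arrowBytes = serializer.flush().getRow(); // emit the partial batch
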
diff --git a/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
new file mode 100644
index 0000000..29f809f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.serializers;
+
+import org.apache.flink.api.python.shaded.org.apache.arrow.memory.BufferAllocator;
+import org.apache.flink.api.python.shaded.org.apache.arrow.memory.RootAllocator;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This code is copied from flink-python; finishCurrentBatch was modified to add an end
+ * operation.
+ *
+ * <p>The base class ArrowSerializer serializes/deserializes RowType data to/from arrow bytes.
+ */
+public final class ArrowSerializer {
+
+    static {
+        ArrowUtils.checkArrowUsable();
+    }
+
+    /** The input RowType. */
+    protected final RowType inputType;
+
+    /** The output RowType. */
+    protected final RowType outputType;
+
+    /** Allocator which is used for byte buffer allocation. */
+    private transient BufferAllocator allocator;
+
+    /** Reader which is responsible for deserializing the Arrow format data to the Flink rows. */
+    private transient ArrowReader arrowReader;
+
+    /**
+     * Reader which is responsible for converting the execution result from byte array to arrow
+     * format.
+     */
+    private transient ArrowStreamReader arrowStreamReader;
+
+    /**
+     * Container that holds a set of vectors for the input elements to be sent to the Python
+     * worker.
+     */
+    transient VectorSchemaRoot rootWriter;
+
+    /** Writer which is responsible for serializing the input elements to arrow format. */
+    private transient ArrowWriter<RowData> arrowWriter;
+
+    /** Writer which is responsible for converting the arrow format data into a byte array. */
+    private transient ArrowStreamWriter arrowStreamWriter;
+
+    /** Reusable InputStream used to hold the execution results to be deserialized. */
+    private transient InputStream bais;
+
+    /** Reusable OutputStream used to hold the serialized input elements. */
+    private transient OutputStream baos;
+
+    public ArrowSerializer(RowType inputType, RowType outputType) {
+        this.inputType = inputType;
+        this.outputType = outputType;
+    }
+
+    public void open(InputStream bais, OutputStream baos) throws Exception {
+        this.bais = bais;
+        this.baos = baos;
+        allocator = new RootAllocator(Long.MAX_VALUE);
+        arrowStreamReader = new ArrowStreamReader(bais, allocator);
+
+        rootWriter = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(inputType), allocator);
+        arrowWriter = createArrowWriter();
+        arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
+        arrowStreamWriter.start();
+    }
+
+    public int load() throws IOException {
+        arrowStreamReader.loadNextBatch();
+        VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
+        if (arrowReader == null) {
+            arrowReader = createArrowReader(root);
+        }
+        return root.getRowCount();
+    }
+
+    public RowData read(int i) {
+        return arrowReader.read(i);
+    }
+
+    public void write(RowData element) {
+        arrowWriter.write(element);
+    }
+
+    public void close() throws Exception {
+        arrowStreamWriter.end();
+        arrowStreamReader.close();
+        rootWriter.close();
+        allocator.close();
+    }
+
+    /** Creates an {@link ArrowWriter}. */
+    public ArrowWriter<RowData> createArrowWriter() {
+        return ArrowUtils.createRowDataArrowWriter(rootWriter, inputType);
+    }
+
+    public ArrowReader createArrowReader(VectorSchemaRoot root) {
+        return ArrowUtils.createArrowReader(root, outputType);
+    }
+
+    /**
+     * Forcibly finishes the processing of the current batch of elements. It will serialize the
+     * batch of elements into one arrow batch.
+     */
+    public void finishCurrentBatch() throws Exception {
+        arrowWriter.finish();
+        arrowStreamWriter.writeBatch();
+        arrowStreamWriter.end();
+        arrowWriter.reset();
+    }
+
+    public void resetReader() throws IOException {
+        arrowReader = null;
+        arrowStreamReader.close();
+        arrowStreamReader = new ArrowStreamReader(bais, allocator);
+    }
+
+    public void resetWriter() throws IOException {
+        arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos);
+        arrowStreamWriter.start();
+    }
+}
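As the class javadoc notes, the functional deviation from the flink-python original is in finishCurrentBatch(): it additionally calls arrowStreamWriter.end(), so each flushed batch comes out as a complete, self-terminated Arrow IPC stream, and resetWriter() then opens a fresh stream over the same VectorSchemaRoot. A sketch of one write/flush cycle as RowDataSerializer drives it (single int column, placeholder value, assumed to run in a method that throws Exception):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;

    RowType rowType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"id"});
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ArrowSerializer arrow = new ArrowSerializer(rowType, rowType);
    arrow.open(new ByteArrayInputStream(new byte[0]), out); // nothing to read back

    arrow.write(GenericRowData.of(42)); // buffer a row into the VectorSchemaRoot
    arrow.finishCurrentBatch();         // writeBatch() + end(): one complete IPC stream
    byte[] payload = out.toByteArray(); // the bytes handed to stream load
    out.reset();
    arrow.resetWriter();                // start a new IPC stream for the next batch
    arrow.close();
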
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
index d1028fe..84e2971 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java
@@ -19,8 +19,13 @@ package org.apache.doris.flink.sink.writer;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.RowKind;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -30,6 +35,8 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
@@ -132,4 +139,34 @@ public class TestRowDataSerializer {
         Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.DELETE));
         Assert.assertEquals("1", 
serializer.parseDeleteSign(RowKind.UPDATE_BEFORE));
     }
+
+    @Test
+    public void testArrowType() throws Exception {
+        RowDataSerializer serializer =
+                RowDataSerializer.builder()
+                        .setFieldNames(fieldNames)
+                        .setFieldType(dataTypes)
+                        .setType("arrow")
+                        .enableDelete(false)
+                        .build();
+
+        // write data to binary
+        serializer.initial();
+        serializer.serialize(rowData);
+        byte[] serializedValue = serializer.flush().getRow();
+
+        // read data from binary
+        LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes);
+        RowType rowType = RowType.of(logicalTypes, fieldNames);
+        ArrowSerializer arrowSerializer = new ArrowSerializer(rowType, rowType);
+        ByteArrayInputStream input = new ByteArrayInputStream(serializedValue);
+        arrowSerializer.open(input, new ByteArrayOutputStream(0));
+        int cnt = arrowSerializer.load();
+        RowData data = arrowSerializer.read(0);
+
+        Assert.assertEquals(1, cnt);
+        Assert.assertEquals(3, data.getInt(0));
+        Assert.assertEquals("test", data.getString(1).toString());
+        Assert.assertEquals(60.2, data.getDouble(2), 0.001);
+    }
 }

