This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 17b13a0  [Feature](CacheWriter) doris sink support cache record buffer 
(#193)
17b13a0 is described below

commit 17b13a09e7efbb16a9415d68cdb0936b42a69d4d
Author: GoGoWen <82132356+gogo...@users.noreply.github.com>
AuthorDate: Tue Sep 26 15:32:10 2023 +0800

    [Feature](CacheWriter) doris sink support cache record buffer (#193)
---
 .../doris/flink/catalog/DorisCatalogFactory.java   |   2 +
 .../doris/flink/cfg/DorisExecutionOptions.java     |  16 ++-
 .../doris/flink/sink/writer/CacheRecordBuffer.java | 118 +++++++++++++++++++++
 .../doris/flink/sink/writer/DorisStreamLoad.java   |  11 +-
 .../doris/flink/sink/writer/DorisWriter.java       |  56 ++++++----
 .../doris/flink/sink/writer/RecordBuffer.java      |   4 +-
 .../doris/flink/sink/writer/RecordStream.java      |  14 ++-
 .../doris/flink/table/DorisConfigOptions.java      |   6 ++
 .../flink/table/DorisDynamicTableFactory.java      |   5 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |   2 +
 .../flink/sink/writer/TestCacheRecordBuffer.java   |  77 ++++++++++++++
 .../flink/sink/writer/TestDorisStreamLoad.java     |   6 +-
 .../doris/flink/sink/writer/TestDorisWriter.java   |   2 +-
 13 files changed, 285 insertions(+), 34 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index 958ce3d..c00e5f1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -50,6 +50,7 @@ import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -105,6 +106,7 @@ public class DorisCatalogFactory implements CatalogFactory {
         options.add(SINK_BUFFER_SIZE);
         options.add(SINK_BUFFER_COUNT);
         options.add(SINK_PARALLELISM);
+        options.add(SINK_USE_CACHE);
 
         options.add(SOURCE_USE_OLD_API);
         return options;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index ccd23f9..2422df8 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -42,6 +42,8 @@ public class DorisExecutionOptions implements Serializable {
     private final int bufferSize;
     private final int bufferCount;
     private final String labelPrefix;
+    private final boolean useCache;
+
     /**
      * Properties for the StreamLoad.
      */
@@ -62,6 +64,7 @@ public class DorisExecutionOptions implements Serializable {
                                  int bufferSize,
                                  int bufferCount,
                                  String labelPrefix,
+                                 boolean useCache,
                                  Properties streamLoadProp,
                                  Boolean enableDelete,
                                  Boolean enable2PC,
@@ -77,6 +80,7 @@ public class DorisExecutionOptions implements Serializable {
         this.bufferSize = bufferSize;
         this.bufferCount = bufferCount;
         this.labelPrefix = labelPrefix;
+        this.useCache = useCache;
         this.streamLoadProp = streamLoadProp;
         this.enableDelete = enableDelete;
         this.enable2PC = enable2PC;
@@ -132,6 +136,10 @@ public class DorisExecutionOptions implements Serializable 
{
         return labelPrefix;
     }
 
+    public boolean isUseCache () {
+        return useCache;
+    }
+
     public Properties getStreamLoadProp() {
         return streamLoadProp;
     }
@@ -177,6 +185,7 @@ public class DorisExecutionOptions implements Serializable {
         private int bufferSize = DEFAULT_BUFFER_SIZE;
         private int bufferCount = DEFAULT_BUFFER_COUNT;
         private String labelPrefix = "";
+        private boolean useCache = false;
         private Properties streamLoadProp = new Properties();
         private boolean enableDelete = true;
         private boolean enable2PC = true;
@@ -215,6 +224,11 @@ public class DorisExecutionOptions implements Serializable 
{
             return this;
         }
 
+        public Builder setUseCache(boolean useCache) {
+            this.useCache = useCache;
+            return this;
+        }
+
         public Builder setStreamLoadProp(Properties streamLoadProp) {
             this.streamLoadProp = streamLoadProp;
             return this;
@@ -262,7 +276,7 @@ public class DorisExecutionOptions implements Serializable {
         }
 
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(checkInterval, maxRetries, 
bufferSize, bufferCount, labelPrefix,
+            return new DorisExecutionOptions(checkInterval, maxRetries, 
bufferSize, bufferCount, labelPrefix, useCache,
                     streamLoadProp, enableDelete, enable2PC, enableBatchMode, 
flushQueueSize, bufferFlushMaxRows,
                     bufferFlushMaxBytes, bufferFlushIntervalMs, 
ignoreUpdateBefore);
         }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java
new file mode 100644
index 0000000..c25928e
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Channel of record stream and HTTP data stream.
+ */
+public class CacheRecordBuffer extends RecordBuffer{
+    private static final Logger LOG = 
LoggerFactory.getLogger(CacheRecordBuffer.class);
+    BlockingDeque<ByteBuffer> bufferCache;
+    LinkedBlockingQueue<ByteBuffer> bufferPool;
+
+    public CacheRecordBuffer(int capacity, int queueSize) {
+        super(capacity, queueSize);
+        bufferCache = new LinkedBlockingDeque<>();
+        bufferPool = new LinkedBlockingQueue<>();
+    }
+
+    @Override
+    public void startBufferData() throws IOException{
+        LOG.info("start buffer data, read queue size {}, write queue size {}, 
buffer cache size {}, buffer pool size {}",
+                readQueue.size(), writeQueue.size(), bufferCache.size(), 
bufferPool.size());
+        try {
+            // if the cache has data, we are restarting from a previous 
error
+            if (currentReadBuffer != null && currentReadBuffer.limit() != 0) {
+                currentReadBuffer.rewind();
+                readQueue.putFirst(currentReadBuffer);
+                currentReadBuffer = null;
+            }
+            // re-read the data in bufferCache
+            ByteBuffer buffer = bufferCache.pollFirst();
+            while (buffer != null) {
+                buffer.rewind();
+                readQueue.putFirst(buffer);
+                buffer = bufferCache.pollFirst();
+            }
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+    
+    @Override
+    public int read(byte[] buf) throws InterruptedException {
+        if (currentReadBuffer == null) {
+            currentReadBuffer = readQueue.take();
+        }
+        // an empty buffer marks the end of the stream
+        if (currentReadBuffer.limit() == 0) {
+            Preconditions.checkState(readQueue.size() == 0);
+            bufferCache.putFirst(currentReadBuffer);
+            writeQueue.offer(allocate());
+            currentReadBuffer = null;
+            return -1;
+        }
+
+        int available = currentReadBuffer.remaining();
+        int nRead = Math.min(available, buf.length);
+        currentReadBuffer.get(buf, 0, nRead);
+        if (currentReadBuffer.remaining() == 0) {
+            bufferCache.putFirst(currentReadBuffer);
+            writeQueue.offer(allocate());
+            currentReadBuffer = null;
+        }
+        return nRead;
+    }
+
+    public void recycleCache() {
+        // recycle cache buffer
+        Preconditions.checkState(readQueue.size() == 0);
+        ByteBuffer buff = bufferCache.poll();
+        while (buff != null) {
+            buff.clear();
+            bufferPool.add(buff);
+            buff = bufferCache.poll();
+        }
+    }
+
+    private ByteBuffer allocate(){
+        ByteBuffer buff = bufferPool.poll();
+        return buff != null ? buff : ByteBuffer.allocate(bufferCapacity);
+    }
+
+    @VisibleForTesting
+    public int getBufferCacheSize() {
+        return bufferCache.size();
+    }
+
+    @VisibleForTesting
+    public int getBufferPoolSize() {
+        return bufferPool.size();
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index e7ade2f..cda3c05 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -70,7 +70,7 @@ public class DorisStreamLoad implements Serializable {
 
     private String loadUrlStr;
     private String hostPort;
-    private final String abortUrlStr;
+    private String abortUrlStr;
     private final String user;
     private final String passwd;
     private final String db;
@@ -105,7 +105,7 @@ public class DorisStreamLoad implements Serializable {
         this.executorService = new ThreadPoolExecutor(1, 1,
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(), new 
ExecutorThreadFactory("stream-load-upload"));
-        this.recordStream = new RecordStream(executionOptions.getBufferSize(), 
executionOptions.getBufferCount());
+        this.recordStream = new RecordStream(executionOptions.getBufferSize(), 
executionOptions.getBufferCount(), executionOptions.isUseCache());
         lineDelimiter = 
EscapeHandler.escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT)).getBytes();
         loadBatchFirstRecord = true;
     }
@@ -121,6 +121,7 @@ public class DorisStreamLoad implements Serializable {
     public void setHostPort(String hostPort) {
         this.hostPort = hostPort;
         this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, 
this.table);
+        this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
     }
 
     public Future<CloseableHttpResponse> getPendingLoadFuture() {
@@ -226,10 +227,10 @@ public class DorisStreamLoad implements Serializable {
      * @param label
      * @throws IOException
      */
-    public void startLoad(String label) throws IOException{
-        loadBatchFirstRecord = true;
+    public void startLoad(String label, boolean isResume) throws IOException {
+        loadBatchFirstRecord = !isResume;
         HttpPutBuilder putBuilder = new HttpPutBuilder();
-        recordStream.startInput();
+        recordStream.startInput(isResume);
         LOG.info("stream load started for {} on host {}", label, hostPort);
         try {
             InputStreamEntity entity = new InputStreamEntity(recordStream);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 1f98206..230bad5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -60,6 +60,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisWriter.class);
     private static final List<String> DORIS_SUCCESS_STATUS = new 
ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
     private final long lastCheckpointId;
+    private long curCheckpointId;
     private DorisStreamLoad dorisStreamLoad;
     volatile boolean loading;
     private final DorisOptions dorisOptions;
@@ -86,6 +87,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
                 initContext
                         .getRestoredCheckpointId()
                         .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        this.curCheckpointId = lastCheckpointId + 1;
         LOG.info("restore checkpointId {}", lastCheckpointId);
         LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
         this.dorisWriterState = new 
DorisWriterState(executionOptions.getLabelPrefix());
@@ -113,14 +115,14 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
             // TODO: we need check and abort all pending transaction.
             //  Discard transactions that may cause the job to fail.
             if(executionOptions.enabled2PC()) {
-                dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 
1);
+                dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
             }
         } catch (Exception e) {
             throw new DorisRuntimeException(e);
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        this.currentLabel = labelGenerator.generateLabel(lastCheckpointId + 1);
+        this.currentLabel = labelGenerator.generateLabel(curCheckpointId);
         // when uploading data in streaming mode, we need to regularly detect 
whether there are exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, 
intervalTime, TimeUnit.MILLISECONDS);
     }
@@ -134,8 +136,8 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
             return;
         }
         if(!loading) {
-            //Start streamload only when there has data
-            dorisStreamLoad.startLoad(currentLabel);
+            // start stream load only when there is data
+            dorisStreamLoad.startLoad(currentLabel, false);
             loading = true;
         }
         dorisStreamLoad.writeRecord(serialize);
@@ -167,7 +169,8 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
         Preconditions.checkState(dorisStreamLoad != null);
         // dynamic refresh BE node
         this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
-        this.currentLabel = labelGenerator.generateLabel(checkpointId + 1);
+        this.curCheckpointId = checkpointId + 1;
+        this.currentLabel = labelGenerator.generateLabel(curCheckpointId);
         return Collections.singletonList(dorisWriterState);
     }
 
@@ -182,23 +185,38 @@ public class DorisWriter<IN> implements SinkWriter<IN, 
DorisCommittable, DorisWr
                 return;
             }
 
-            // double check to interrupt when loading is true and 
dorisStreamLoad.getPendingLoadFuture().isDone
-            // fix issue #139
+            // double-check the future, to avoid getting the old future
             if (dorisStreamLoad.getPendingLoadFuture() != null
                     && dorisStreamLoad.getPendingLoadFuture().isDone()) {
-                // TODO: introduce cache for reload instead of throwing 
exceptions.
-                String errorMsg;
-                try {
-                    RespContent content = 
dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
-                    errorMsg = content.getMessage();
-                } catch (Exception e) {
-                    errorMsg = e.getMessage();
+                // error happened when loading, now we should stop receive data
+                // and abort previous txn(stream load) and start a new 
txn(stream load)
+                // use send cached data to new txn, then notify to restart the 
stream
+                if (executionOptions.isUseCache()) {
+                    try {
+                        
this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+                        if (executionOptions.enabled2PC()) {
+                            dorisStreamLoad.abortPreCommit(labelPrefix, 
curCheckpointId);
+                        }
+                        // start a new txn(stream load)
+                        LOG.info("getting exception, breakpoint resume for 
checkpoint ID: {}", curCheckpointId);
+                        
dorisStreamLoad.startLoad(labelGenerator.generateLabel(curCheckpointId), true);
+                    } catch (Exception e) {
+                        throw new DorisRuntimeException(e);
+                    }
+                } else {
+                    String errorMsg;
+                    try {
+                        RespContent content = 
dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
+                        errorMsg = content.getMessage();
+                    } catch (Exception e) {
+                        errorMsg = e.getMessage();
+                    }
+
+                    loadException = new StreamLoadException(errorMsg);
+                    LOG.error("stream load finished unexpectedly, interrupt 
worker thread! {}", errorMsg);
+                    // set the executor thread interrupted in case blocking in 
write data.
+                    executorThread.interrupt();
                 }
-
-                loadException = new StreamLoadException(errorMsg);
-                LOG.error("stream load finished unexpectedly, interrupt worker 
thread! {}", errorMsg);
-                // set the executor thread interrupted in case blocking in 
write data.
-                executorThread.interrupt();
             }
         }
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
index abe93e9..e5259cb 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java
@@ -33,7 +33,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 public class RecordBuffer {
     private static final Logger LOG = 
LoggerFactory.getLogger(RecordBuffer.class);
     BlockingQueue<ByteBuffer> writeQueue;
-    BlockingQueue<ByteBuffer> readQueue;
+    LinkedBlockingDeque<ByteBuffer> readQueue;
     int bufferCapacity;
     int queueSize;
     ByteBuffer currentWriteBuffer;
@@ -52,7 +52,7 @@ public class RecordBuffer {
         this.queueSize = queueSize;
     }
 
-    public void startBufferData() {
+    public void startBufferData() throws IOException{
         LOG.info("start buffer data, read queue size {}, write queue size {}", 
readQueue.size(), writeQueue.size());
         Preconditions.checkState(readQueue.size() == 0);
         Preconditions.checkState(writeQueue.size() == queueSize);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
index fd1c6ca..baf68bf 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java
@@ -31,11 +31,19 @@ public class RecordStream extends InputStream {
         return 0;
     }
 
-    public RecordStream(int bufferSize, int bufferCount) {
-        this.recordBuffer = new RecordBuffer(bufferSize, bufferCount);
+    public RecordStream(int bufferSize, int bufferCount, boolean useCache) {
+        if (useCache) {
+            this.recordBuffer = new CacheRecordBuffer(bufferSize, bufferCount);
+        }else {
+            this.recordBuffer = new RecordBuffer(bufferSize, bufferCount);
+        }
     }
 
-    public void startInput() {
+    public void startInput(boolean isResume) throws IOException {
+        // if resuming from a breakpoint, do not recycle the cached buffers
+        if (!isResume && recordBuffer instanceof CacheRecordBuffer) {
+            ((CacheRecordBuffer)recordBuffer).recycleCache();
+        }
         recordBuffer.startBufferData();
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 98b9a78..50b205c 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -235,6 +235,12 @@ public class DorisConfigOptions {
             .withDescription("In the CDC scenario, when the primary key of the 
upstream is inconsistent with that of the downstream, the update-before data 
needs to be passed to the downstream as deleted data, otherwise the data cannot 
be deleted.\n" +
                     "The default is to ignore, that is, perform upsert 
semantics.");
 
+    public static final ConfigOption<Boolean> SINK_USE_CACHE = ConfigOptions
+            .key("sink.use-cache")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to use buffer cache for breakpoint 
resume");
+
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 57e4fc0..9583236 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -74,6 +74,7 @@ import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -146,6 +147,8 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(SINK_FLUSH_QUEUE_SIZE);
         options.add(SINK_BUFFER_FLUSH_INTERVAL);
 
+        options.add(SINK_USE_CACHE);
+
         options.add(SOURCE_USE_OLD_API);
         return options;
     }
@@ -223,6 +226,8 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
         
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
         
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+
+        builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
         return builder.build();
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e36590c..a28403e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -201,6 +201,8 @@ public abstract class DatabaseSync {
         
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);
         
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL).ifPresent(v->
 executionBuilder.setBufferFlushIntervalMs(v.toMillis()));
 
+        
sinkConfig.getOptional(DorisConfigOptions.SINK_USE_CACHE).ifPresent(executionBuilder::setUseCache);
+
         DorisExecutionOptions executionOptions = executionBuilder.build();
         builder.setDorisReadOptions(DorisReadOptions.builder().build())
                 .setDorisExecutionOptions(executionOptions)
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java
new file mode 100644
index 0000000..4af39ab
--- /dev/null
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * test for CacheRecordBuffer.
+ */
+public class TestCacheRecordBuffer {
+
+    @Test
+    public void testRead() throws Exception {
+        CacheRecordBuffer recordBuffer = new CacheRecordBuffer(16, 4);
+        recordBuffer.startBufferData();
+        recordBuffer.write("This is Test for 
CacheRecordBuffer!".getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals(2, recordBuffer.getReadQueueSize());
+        Assert.assertEquals(1, recordBuffer.getWriteQueueSize());
+        Assert.assertEquals(0,recordBuffer.getBufferCacheSize());
+        byte[] buffer = new byte[16];
+        int nRead = recordBuffer.read(buffer);
+        Assert.assertEquals(1, recordBuffer.getReadQueueSize());
+        Assert.assertEquals(2, recordBuffer.getWriteQueueSize());
+        Assert.assertEquals(1,recordBuffer.getBufferCacheSize());
+        Assert.assertEquals(16, nRead);
+        Assert.assertArrayEquals("This is Test 
for".getBytes(StandardCharsets.UTF_8), buffer);
+
+        recordBuffer.write("Continue to write the last 
one.".getBytes(StandardCharsets.UTF_8));
+        buffer = new byte[12];
+        nRead = recordBuffer.read(buffer);
+        Assert.assertEquals(12, nRead);
+        Assert.assertArrayEquals(" 
CacheRecord".getBytes(StandardCharsets.UTF_8), buffer);
+        Assert.assertEquals(2, recordBuffer.getReadQueueSize());
+        Assert.assertEquals(0, recordBuffer.getWriteQueueSize());
+        Assert.assertEquals(1,recordBuffer.getBufferCacheSize());
+    }
+
+    @Test
+    public void testRecycleCache() throws Exception {
+        CacheRecordBuffer recordBuffer = new CacheRecordBuffer(16, 4);
+        recordBuffer.startBufferData();
+        recordBuffer.write("This is Test for 
CacheRecordBuffer.recycleCache!".getBytes(StandardCharsets.UTF_8));
+        Assert.assertEquals(3, recordBuffer.getReadQueueSize());
+        Assert.assertEquals(1, recordBuffer.getWriteQueueSize());
+        Assert.assertEquals(0,recordBuffer.getBufferCacheSize());
+
+        byte[] buffer = new byte[50];
+        recordBuffer.read(buffer);
+        recordBuffer.read(buffer);
+        recordBuffer.read(buffer);
+        Assert.assertEquals(0, recordBuffer.getReadQueueSize());
+        Assert.assertEquals(4, recordBuffer.getWriteQueueSize());
+        Assert.assertEquals(3,recordBuffer.getBufferCacheSize());
+
+        recordBuffer.recycleCache();
+        Assert.assertEquals(3,recordBuffer.getBufferPoolSize());
+        Assert.assertEquals(0,recordBuffer.getBufferCacheSize());
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index d6f0967..e295de1 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -90,7 +90,7 @@ public class TestDorisStreamLoad {
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
-        dorisStreamLoad.startLoad("1");
+        dorisStreamLoad.startLoad("1",false);
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.stopLoad("label");
         byte[] buff = new byte[4];
@@ -109,7 +109,7 @@ public class TestDorisStreamLoad {
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
         byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8);
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
-        dorisStreamLoad.startLoad("1");
+        dorisStreamLoad.startLoad("1", false);
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.writeRecord(writeBuffer);
         dorisStreamLoad.stopLoad("label");
@@ -134,7 +134,7 @@ public class TestDorisStreamLoad {
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
-        dorisStreamLoad.startLoad("1");
+        dorisStreamLoad.startLoad("1", false);
         dorisStreamLoad.writeRecord("{\"id\": 
1}".getBytes(StandardCharsets.UTF_8));
         dorisStreamLoad.writeRecord("{\"id\": 
2}".getBytes(StandardCharsets.UTF_8));
         dorisStreamLoad.stopLoad("label");
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 9e44336..e988d6b 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -62,7 +62,7 @@ public class TestDorisWriter {
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", 
dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
-        dorisStreamLoad.startLoad("");
+        dorisStreamLoad.startLoad("", false);
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         
when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, 
Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, 
readOptions, executionOptions);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to