This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 17b13a0 [Feature](CacheWriter) doris sink support cache record buffer (#193) 17b13a0 is described below commit 17b13a09e7efbb16a9415d68cdb0936b42a69d4d Author: GoGoWen <82132356+gogo...@users.noreply.github.com> AuthorDate: Tue Sep 26 15:32:10 2023 +0800 [Feature](CacheWriter) doris sink support cache record buffer (#193) --- .../doris/flink/catalog/DorisCatalogFactory.java | 2 + .../doris/flink/cfg/DorisExecutionOptions.java | 16 ++- .../doris/flink/sink/writer/CacheRecordBuffer.java | 118 +++++++++++++++++++++ .../doris/flink/sink/writer/DorisStreamLoad.java | 11 +- .../doris/flink/sink/writer/DorisWriter.java | 56 ++++++---- .../doris/flink/sink/writer/RecordBuffer.java | 4 +- .../doris/flink/sink/writer/RecordStream.java | 14 ++- .../doris/flink/table/DorisConfigOptions.java | 6 ++ .../flink/table/DorisDynamicTableFactory.java | 5 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 + .../flink/sink/writer/TestCacheRecordBuffer.java | 77 ++++++++++++++ .../flink/sink/writer/TestDorisStreamLoad.java | 6 +- .../doris/flink/sink/writer/TestDorisWriter.java | 2 +- 13 files changed, 285 insertions(+), 34 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java index 958ce3d..c00e5f1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java @@ -50,6 +50,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE; import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; @@ -105,6 +106,7 @@ public class DorisCatalogFactory implements CatalogFactory { options.add(SINK_BUFFER_SIZE); options.add(SINK_BUFFER_COUNT); options.add(SINK_PARALLELISM); + options.add(SINK_USE_CACHE); options.add(SOURCE_USE_OLD_API); return options; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index ccd23f9..2422df8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -42,6 +42,8 @@ public class DorisExecutionOptions implements Serializable { private final int bufferSize; private final int bufferCount; private final String labelPrefix; + private final boolean useCache; + /** * Properties for the StreamLoad. */ @@ -62,6 +64,7 @@ public class DorisExecutionOptions implements Serializable { int bufferSize, int bufferCount, String labelPrefix, + boolean useCache, Properties streamLoadProp, Boolean enableDelete, Boolean enable2PC, @@ -77,6 +80,7 @@ public class DorisExecutionOptions implements Serializable { this.bufferSize = bufferSize; this.bufferCount = bufferCount; this.labelPrefix = labelPrefix; + this.useCache = useCache; this.streamLoadProp = streamLoadProp; this.enableDelete = enableDelete; this.enable2PC = enable2PC; @@ -132,6 +136,10 @@ public class DorisExecutionOptions implements Serializable { return labelPrefix; } + public boolean isUseCache () { + return useCache; + } + public Properties getStreamLoadProp() { return streamLoadProp; } @@ -177,6 +185,7 @@ public class DorisExecutionOptions implements Serializable { private int bufferSize = DEFAULT_BUFFER_SIZE; private int bufferCount = DEFAULT_BUFFER_COUNT; private String labelPrefix = ""; + private boolean useCache = false; private Properties streamLoadProp = new Properties(); private boolean enableDelete = true; private boolean enable2PC = true; @@ -215,6 +224,11 @@ public class DorisExecutionOptions implements Serializable { return this; } + public Builder setUseCache(boolean useCache) { + this.useCache = useCache; + return this; + } + public Builder setStreamLoadProp(Properties streamLoadProp) { this.streamLoadProp = streamLoadProp; return this; @@ -262,7 +276,7 @@ public class DorisExecutionOptions implements Serializable { } public DorisExecutionOptions build() { - return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, + return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache, streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows, bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java new file mode 100644 index 0000000..c25928e --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/CacheRecordBuffer.java @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Channel of record stream and HTTP data stream. + */ +public class CacheRecordBuffer extends RecordBuffer{ + private static final Logger LOG = LoggerFactory.getLogger(CacheRecordBuffer.class); + BlockingDeque<ByteBuffer> bufferCache; + LinkedBlockingQueue<ByteBuffer> bufferPool; + + public CacheRecordBuffer(int capacity, int queueSize) { + super(capacity, queueSize); + bufferCache = new LinkedBlockingDeque<>(); + bufferPool = new LinkedBlockingQueue<>(); + } + + @Override + public void startBufferData() throws IOException{ + LOG.info("start buffer data, read queue size {}, write queue size {}, buffer cache size {}, buffer pool size {}", + readQueue.size(), writeQueue.size(), bufferCache.size(), bufferPool.size()); + try { + // if the cache have data, that should be restarted from previous error + if (currentReadBuffer != null && currentReadBuffer.limit() != 0) { + currentReadBuffer.rewind(); + readQueue.putFirst(currentReadBuffer); + currentReadBuffer = null; + } + // re-read the data in bufferCache + ByteBuffer buffer = bufferCache.pollFirst(); + while (buffer != null) { + buffer.rewind(); + readQueue.putFirst(buffer); + buffer = bufferCache.pollFirst(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public int read(byte[] buf) throws InterruptedException { + if (currentReadBuffer == null) { + currentReadBuffer = readQueue.take(); + } + // add empty buffer as end flag + if (currentReadBuffer.limit() == 0) { + Preconditions.checkState(readQueue.size() == 0); + bufferCache.putFirst(currentReadBuffer); + writeQueue.offer(allocate()); + currentReadBuffer = null; + return -1; + } + + int available = currentReadBuffer.remaining(); + int nRead = Math.min(available, buf.length); + currentReadBuffer.get(buf, 0, nRead); + if (currentReadBuffer.remaining() == 0) { + bufferCache.putFirst(currentReadBuffer); + writeQueue.offer(allocate()); + currentReadBuffer = null; + } + return nRead; + } + + public void recycleCache() { + // recycle cache buffer + Preconditions.checkState(readQueue.size() == 0); + ByteBuffer buff = bufferCache.poll(); + while (buff != null) { + buff.clear(); + bufferPool.add(buff); + buff = bufferCache.poll(); + } + } + + private ByteBuffer allocate(){ + ByteBuffer buff = bufferPool.poll(); + return buff != null ? buff : ByteBuffer.allocate(bufferCapacity); + } + + @VisibleForTesting + public int getBufferCacheSize() { + return bufferCache.size(); + } + + @VisibleForTesting + public int getBufferPoolSize() { + return bufferPool.size(); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index e7ade2f..cda3c05 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -70,7 +70,7 @@ public class DorisStreamLoad implements Serializable { private String loadUrlStr; private String hostPort; - private final String abortUrlStr; + private String abortUrlStr; private final String user; private final String passwd; private final String db; @@ -105,7 +105,7 @@ public class DorisStreamLoad implements Serializable { this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload")); - this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount()); + this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount(), executionOptions.isUseCache()); lineDelimiter = EscapeHandler.escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes(); loadBatchFirstRecord = true; } @@ -121,6 +121,7 @@ public class DorisStreamLoad implements Serializable { public void setHostPort(String hostPort) { this.hostPort = hostPort; this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db, this.table); + this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db); } public Future<CloseableHttpResponse> getPendingLoadFuture() { @@ -226,10 +227,10 @@ public class DorisStreamLoad implements Serializable { * @param label * @throws IOException */ - public void startLoad(String label) throws IOException{ - loadBatchFirstRecord = true; + public void startLoad(String label, boolean isResume) throws IOException { + loadBatchFirstRecord = !isResume; HttpPutBuilder putBuilder = new HttpPutBuilder(); - recordStream.startInput(); + recordStream.startInput(isResume); LOG.info("stream load started for {} on host {}", label, hostPort); try { InputStreamEntity entity = new InputStreamEntity(recordStream); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 1f98206..230bad5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -60,6 +60,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class); private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final long lastCheckpointId; + private long curCheckpointId; private DorisStreamLoad dorisStreamLoad; volatile boolean loading; private final DorisOptions dorisOptions; @@ -86,6 +87,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr initContext .getRestoredCheckpointId() .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); + this.curCheckpointId = lastCheckpointId + 1; LOG.info("restore checkpointId {}", lastCheckpointId); LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix()); @@ -113,14 +115,14 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr // TODO: we need check and abort all pending transaction. // Discard transactions that may cause the job to fail. if(executionOptions.enabled2PC()) { - dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); + dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); } } catch (Exception e) { throw new DorisRuntimeException(e); } // get main work thread. executorThread = Thread.currentThread(); - this.currentLabel = labelGenerator.generateLabel(lastCheckpointId + 1); + this.currentLabel = labelGenerator.generateLabel(curCheckpointId); // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); } @@ -134,8 +136,8 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr return; } if(!loading) { - //Start streamload only when there has data - dorisStreamLoad.startLoad(currentLabel); + // start stream load only when there has data + dorisStreamLoad.startLoad(currentLabel, false); loading = true; } dorisStreamLoad.writeRecord(serialize); @@ -167,7 +169,8 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr Preconditions.checkState(dorisStreamLoad != null); // dynamic refresh BE node this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); - this.currentLabel = labelGenerator.generateLabel(checkpointId + 1); + this.curCheckpointId = checkpointId + 1; + this.currentLabel = labelGenerator.generateLabel(curCheckpointId); return Collections.singletonList(dorisWriterState); } @@ -182,23 +185,38 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr return; } - // double check to interrupt when loading is true and dorisStreamLoad.getPendingLoadFuture().isDone - // fix issue #139 + // double-check the future, to avoid getting the old future if (dorisStreamLoad.getPendingLoadFuture() != null && dorisStreamLoad.getPendingLoadFuture().isDone()) { - // TODO: introduce cache for reload instead of throwing exceptions. - String errorMsg; - try { - RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get()); - errorMsg = content.getMessage(); - } catch (Exception e) { - errorMsg = e.getMessage(); + // error happened when loading, now we should stop receive data + // and abort previous txn(stream load) and start a new txn(stream load) + // use send cached data to new txn, then notify to restart the stream + if (executionOptions.isUseCache()) { + try { + this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); + if (executionOptions.enabled2PC()) { + dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); + } + // start a new txn(stream load) + LOG.info("getting exception, breakpoint resume for checkpoint ID: {}", curCheckpointId); + dorisStreamLoad.startLoad(labelGenerator.generateLabel(curCheckpointId), true); + } catch (Exception e) { + throw new DorisRuntimeException(e); + } + } else { + String errorMsg; + try { + RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get()); + errorMsg = content.getMessage(); + } catch (Exception e) { + errorMsg = e.getMessage(); + } + + loadException = new StreamLoadException(errorMsg); + LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg); + // set the executor thread interrupted in case blocking in write data. + executorThread.interrupt(); } - - loadException = new StreamLoadException(errorMsg); - LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg); - // set the executor thread interrupted in case blocking in write data. - executorThread.interrupt(); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java index abe93e9..e5259cb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordBuffer.java @@ -33,7 +33,7 @@ import java.util.concurrent.LinkedBlockingDeque; public class RecordBuffer { private static final Logger LOG = LoggerFactory.getLogger(RecordBuffer.class); BlockingQueue<ByteBuffer> writeQueue; - BlockingQueue<ByteBuffer> readQueue; + LinkedBlockingDeque<ByteBuffer> readQueue; int bufferCapacity; int queueSize; ByteBuffer currentWriteBuffer; @@ -52,7 +52,7 @@ public class RecordBuffer { this.queueSize = queueSize; } - public void startBufferData() { + public void startBufferData() throws IOException{ LOG.info("start buffer data, read queue size {}, write queue size {}", readQueue.size(), writeQueue.size()); Preconditions.checkState(readQueue.size() == 0); Preconditions.checkState(writeQueue.size() == queueSize); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java index fd1c6ca..baf68bf 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RecordStream.java @@ -31,11 +31,19 @@ public class RecordStream extends InputStream { return 0; } - public RecordStream(int bufferSize, int bufferCount) { - this.recordBuffer = new RecordBuffer(bufferSize, bufferCount); + public RecordStream(int bufferSize, int bufferCount, boolean useCache) { + if (useCache) { + this.recordBuffer = new CacheRecordBuffer(bufferSize, bufferCount); + }else { + this.recordBuffer = new RecordBuffer(bufferSize, bufferCount); + } } - public void startInput() { + public void startInput(boolean isResume) throws IOException { + // if resume from breakpoint, do not recycle cache buffer + if (!isResume && recordBuffer instanceof CacheRecordBuffer) { + ((CacheRecordBuffer)recordBuffer).recycleCache(); + } recordBuffer.startBufferData(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 98b9a78..50b205c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -235,6 +235,12 @@ public class DorisConfigOptions { .withDescription("In the CDC scenario, when the primary key of the upstream is inconsistent with that of the downstream, the update-before data needs to be passed to the downstream as deleted data, otherwise the data cannot be deleted.\n" + "The default is to ignore, that is, perform upsert semantics."); + public static final ConfigOption<Boolean> SINK_USE_CACHE = ConfigOptions + .key("sink.use-cache") + .booleanType() + .defaultValue(false) + .withDescription("Whether to use buffer cache for breakpoint resume"); + // Prefix for Doris StreamLoad specific properties. public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 57e4fc0..9583236 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -74,6 +74,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE; import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; @@ -146,6 +147,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(SINK_FLUSH_QUEUE_SIZE); options.add(SINK_BUFFER_FLUSH_INTERVAL); + options.add(SINK_USE_CACHE); + options.add(SOURCE_USE_OLD_API); return options; } @@ -223,6 +226,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)); builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES)); builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); + + builder.setUseCache(readableConfig.get(SINK_USE_CACHE)); return builder.build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index e36590c..a28403e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -201,6 +201,8 @@ public abstract class DatabaseSync { sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes); sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL).ifPresent(v-> executionBuilder.setBufferFlushIntervalMs(v.toMillis())); + sinkConfig.getOptional(DorisConfigOptions.SINK_USE_CACHE).ifPresent(executionBuilder::setUseCache); + DorisExecutionOptions executionOptions = executionBuilder.build(); builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionOptions) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java new file mode 100644 index 0000000..4af39ab --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestCacheRecordBuffer.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; + +/** + * test for CacheRecordBuffer. + */ +public class TestCacheRecordBuffer { + + @Test + public void testRead() throws Exception { + CacheRecordBuffer recordBuffer = new CacheRecordBuffer(16, 4); + recordBuffer.startBufferData(); + recordBuffer.write("This is Test for CacheRecordBuffer!".getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(2, recordBuffer.getReadQueueSize()); + Assert.assertEquals(1, recordBuffer.getWriteQueueSize()); + Assert.assertEquals(0,recordBuffer.getBufferCacheSize()); + byte[] buffer = new byte[16]; + int nRead = recordBuffer.read(buffer); + Assert.assertEquals(1, recordBuffer.getReadQueueSize()); + Assert.assertEquals(2, recordBuffer.getWriteQueueSize()); + Assert.assertEquals(1,recordBuffer.getBufferCacheSize()); + Assert.assertEquals(16, nRead); + Assert.assertArrayEquals("This is Test for".getBytes(StandardCharsets.UTF_8), buffer); + + recordBuffer.write("Continue to write the last one.".getBytes(StandardCharsets.UTF_8)); + buffer = new byte[12]; + nRead = recordBuffer.read(buffer); + Assert.assertEquals(12, nRead); + Assert.assertArrayEquals(" CacheRecord".getBytes(StandardCharsets.UTF_8), buffer); + Assert.assertEquals(2, recordBuffer.getReadQueueSize()); + Assert.assertEquals(0, recordBuffer.getWriteQueueSize()); + Assert.assertEquals(1,recordBuffer.getBufferCacheSize()); + } + + @Test + public void testRecycleCache() throws Exception { + CacheRecordBuffer recordBuffer = new CacheRecordBuffer(16, 4); + recordBuffer.startBufferData(); + recordBuffer.write("This is Test for CacheRecordBuffer.recycleCache!".getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(3, recordBuffer.getReadQueueSize()); + Assert.assertEquals(1, recordBuffer.getWriteQueueSize()); + Assert.assertEquals(0,recordBuffer.getBufferCacheSize()); + + byte[] buffer = new byte[50]; + recordBuffer.read(buffer); + recordBuffer.read(buffer); + recordBuffer.read(buffer); + Assert.assertEquals(0, recordBuffer.getReadQueueSize()); + Assert.assertEquals(4, recordBuffer.getWriteQueueSize()); + Assert.assertEquals(3,recordBuffer.getBufferCacheSize()); + + recordBuffer.recycleCache(); + Assert.assertEquals(3,recordBuffer.getBufferPoolSize()); + Assert.assertEquals(0,recordBuffer.getBufferCacheSize()); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java index d6f0967..e295de1 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java @@ -90,7 +90,7 @@ public class TestDorisStreamLoad { when(httpClient.execute(any())).thenReturn(preCommitResponse); byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8); DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); - dorisStreamLoad.startLoad("1"); + dorisStreamLoad.startLoad("1",false); dorisStreamLoad.writeRecord(writeBuffer); dorisStreamLoad.stopLoad("label"); byte[] buff = new byte[4]; @@ -109,7 +109,7 @@ public class TestDorisStreamLoad { when(httpClient.execute(any())).thenReturn(preCommitResponse); byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8); DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); - dorisStreamLoad.startLoad("1"); + dorisStreamLoad.startLoad("1", false); dorisStreamLoad.writeRecord(writeBuffer); dorisStreamLoad.writeRecord(writeBuffer); dorisStreamLoad.stopLoad("label"); @@ -134,7 +134,7 @@ public class TestDorisStreamLoad { when(httpClient.execute(any())).thenReturn(preCommitResponse); DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); - dorisStreamLoad.startLoad("1"); + dorisStreamLoad.startLoad("1", false); dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8)); dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8)); dorisStreamLoad.stopLoad("label"); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java index 9e44336..e988d6b 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java @@ -62,7 +62,7 @@ public class TestDorisWriter { when(httpClient.execute(any())).thenReturn(preCommitResponse); DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); - dorisStreamLoad.startLoad(""); + dorisStreamLoad.startLoad("", false); Sink.InitContext initContext = mock(Sink.InitContext.class); when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org