This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 81952c0 [optimization] add disable 2pc config (#34) 81952c0 is described below commit 81952c0af2024d3484f15106f4588fd18da725ae Author: madong <44109015+mado...@users.noreply.github.com> AuthorDate: Thu Jun 30 22:56:33 2022 +0800 [optimization] add disable 2pc config (#34) --- .../doris/flink/cfg/DorisExecutionOptions.java | 18 ++++++++++-- .../doris/flink/sink/writer/DorisStreamLoad.java | 19 +++++++----- .../doris/flink/sink/writer/DorisWriter.java | 21 ++++++++----- .../doris/flink/sink/writer/LabelGenerator.java | 34 ++++++++++++++++++++++ .../flink/table/DorisDynamicTableFactory.java | 10 +++++++ .../flink/sink/writer/TestDorisStreamLoad.java | 14 ++++----- .../doris/flink/sink/writer/TestDorisWriter.java | 6 ++-- 7 files changed, 95 insertions(+), 27 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index cc4b203..2daf5e1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -45,13 +45,16 @@ public class DorisExecutionOptions implements Serializable { private final Boolean enableDelete; + private final Boolean enable2PC; public DorisExecutionOptions(int checkInterval, int maxRetries, int bufferSize, int bufferCount, String labelPrefix, - Properties streamLoadProp, Boolean enableDelete) { + Properties streamLoadProp, + Boolean enableDelete, + Boolean enable2PC) { Preconditions.checkArgument(maxRetries >= 0); this.checkInterval = checkInterval; this.maxRetries = maxRetries; @@ -60,6 +63,7 @@ public class DorisExecutionOptions implements Serializable { this.labelPrefix = labelPrefix; this.streamLoadProp = streamLoadProp; this.enableDelete = enableDelete; + this.enable2PC = enable2PC; } public static Builder builder() { @@ -105,6 +109,9 @@ public class DorisExecutionOptions implements Serializable { return enableDelete; } + public Boolean enabled2PC() { + return enable2PC; + } /** * Builder of {@link DorisExecutionOptions}. */ @@ -117,6 +124,8 @@ public class DorisExecutionOptions implements Serializable { private Properties streamLoadProp = new Properties(); private boolean enableDelete = false; + private boolean enable2PC = true; + public Builder setCheckInterval(Integer checkInterval) { this.checkInterval = checkInterval; return this; @@ -152,8 +161,13 @@ public class DorisExecutionOptions implements Serializable { return this; } + public Builder disable2PC() { + this.enable2PC = false; + return this; + } + public DorisExecutionOptions build() { - return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete); + return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, streamLoadProp, enableDelete, enable2PC); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index b468cc8..dfb9cb7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -61,7 +61,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KE public class DorisStreamLoad implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private final String labelSuffix; + private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc"; @@ -74,6 +74,7 @@ public class DorisStreamLoad implements Serializable { private final String passwd; private final String db; private final String table; + private final boolean enable2PC; private final Properties streamLoadProp; private final RecordStream recordStream; private Future<CloseableHttpResponse> pendingLoadFuture; @@ -84,7 +85,7 @@ public class DorisStreamLoad implements Serializable { public DorisStreamLoad(String hostPort, DorisOptions dorisOptions, DorisExecutionOptions executionOptions, - String labelSuffix, + LabelGenerator labelGenerator, CloseableHttpClient httpClient) { this.hostPort = hostPort; String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); @@ -92,9 +93,10 @@ public class DorisStreamLoad implements Serializable { this.table = tableInfo[1]; this.user = dorisOptions.getUsername(); this.passwd = dorisOptions.getPassword(); - this.labelSuffix = labelSuffix; + this.labelGenerator = labelGenerator; this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table); this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db); + this.enable2PC = executionOptions.enabled2PC(); this.streamLoadProp = executionOptions.getStreamLoadProp(); this.httpClient = httpClient; this.executorService = new ThreadPoolExecutor(1, 1, @@ -133,7 +135,7 @@ public class DorisStreamLoad implements Serializable { LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID); while (true) { try { - String label = labelSuffix + "_" + startChkID; + String label = labelGenerator.generateLabel(startChkID); HttpPutBuilder builder = new HttpPutBuilder(); builder.setUrl(loadUrlStr) .baseAuth(user, passwd) @@ -218,12 +220,11 @@ public class DorisStreamLoad implements Serializable { /** * start write data for new checkpoint. - * @param chkID + * @param label * @throws IOException */ - public void startLoad(long chkID) throws IOException{ + public void startLoad(String label) throws IOException{ loadBatchFirstRecord = true; - String label = labelSuffix + "_" + chkID; HttpPutBuilder putBuilder = new HttpPutBuilder(); recordStream.startInput(); LOG.info("stream load started for {}", label); @@ -232,10 +233,12 @@ public class DorisStreamLoad implements Serializable { putBuilder.setUrl(loadUrlStr) .baseAuth(user, passwd) .addCommonHeader() - .enable2PC() .setLabel(label) .setEntity(entity) .addProperties(streamLoadProp); + if (enable2PC) { + putBuilder.enable2PC(); + } pendingLoadFuture = executorService.submit(() -> { LOG.info("start execute load"); return httpClient.execute(putBuilder.build()); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 026f54e..86ed9ed 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -66,6 +66,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr private final DorisReadOptions dorisReadOptions; private final DorisExecutionOptions executionOptions; private final String labelPrefix; + private final LabelGenerator labelGenerator; private final int intervalTime; private final DorisWriterState dorisWriterState; private final DorisRecordSerializer<IN> serializer; @@ -87,6 +88,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix()); this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId(); + this.labelGenerator = new LabelGenerator(labelPrefix, executionOptions.enabled2PC()); this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check")); this.serializer = serializer; this.dorisOptions = dorisOptions; @@ -98,20 +100,22 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr public void initializeLoad(List<DorisWriterState> state) throws IOException { try { - this.dorisStreamLoad = new DorisStreamLoad( + this.dorisStreamLoad = new DorisStreamLoad( RestService.getBackend(dorisOptions, dorisReadOptions, LOG), dorisOptions, executionOptions, - labelPrefix, new HttpUtil().getHttpClient()); + labelGenerator, new HttpUtil().getHttpClient()); // TODO: we need check and abort all pending transaction. // Discard transactions that may cause the job to fail. - dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); + if(executionOptions.enabled2PC()) { + dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); + } } catch (Exception e) { throw new DorisRuntimeException(e); } // get main work thread. executorThread = Thread.currentThread(); - dorisStreamLoad.startLoad(lastCheckpointId + 1); + dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1)); // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); } @@ -124,6 +128,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr @Override public List<DorisCommittable> prepareCommit(boolean flush) throws IOException { + // disable exception checker before stop load. loading = false; Preconditions.checkState(dorisStreamLoad != null); RespContent respContent = dorisStreamLoad.stopLoad(); @@ -131,7 +136,9 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL()); throw new DorisRuntimeException(errMsg); } - + if (!executionOptions.enabled2PC()) { + return Collections.emptyList(); + } long txnId = respContent.getTxnId(); return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); @@ -140,7 +147,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr @Override public List<DorisWriterState> snapshotState(long checkpointId) throws IOException { Preconditions.checkState(dorisStreamLoad != null); - this.dorisStreamLoad.startLoad(checkpointId + 1); + this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1)); this.loading = true; return Collections.singletonList(dorisWriterState); } @@ -173,7 +180,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr private void checkLoadException() { if (loadException != null) { - throw new RuntimeException("error while load exception.", loadException); + throw new RuntimeException("error while loading data.", loadException); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java new file mode 100644 index 0000000..436d709 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.flink.sink.writer; + +/** + * Generator label for stream load. + */ +public class LabelGenerator { + private String labelPrefix; + private boolean enable2PC; + + public LabelGenerator(String labelPrefix, boolean enable2PC) { + this.labelPrefix = labelPrefix; + this.enable2PC = enable2PC; + } + + public String generateLabel(long chkId) { + return enable2PC ? labelPrefix + "_" + chkId : labelPrefix + "_" + System.currentTimeMillis(); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 7d6455e..be00cff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -118,6 +118,12 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) .withDescription(""); // flink write config options + private static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions + .key("sink.enable-2pc") + .booleanType() + .defaultValue(true) + .withDescription("enable 2PC while loading"); + private static final ConfigOption<Integer> SINK_CHECK_INTERVAL = ConfigOptions .key("sink.check-interval") .intType() @@ -195,6 +201,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(DORIS_EXEC_MEM_LIMIT); options.add(SINK_CHECK_INTERVAL); + options.add(SINK_ENABLE_2PC); options.add(SINK_MAX_RETRIES); options.add(SINK_BUFFER_FLUSH_INTERVAL); options.add(SINK_ENABLE_DELETE); @@ -262,6 +269,9 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory builder.setLabelPrefix(readableConfig.get(SINK_LABEL_PREFIX)); builder.setStreamLoadProp(streamLoadProp); builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE)); + if (!readableConfig.get(SINK_ENABLE_2PC)) { + builder.disable2PC(); + } return builder.build(); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java index b401a5b..b4532ee 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java @@ -57,7 +57,7 @@ public class TestDorisStreamLoad { CloseableHttpResponse abortSuccessResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true); CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); when(httpClient.execute(any())).thenReturn(existLabelResponse, abortSuccessResponse, preCommitResponse); - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient); + DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient); dorisStreamLoad.abortPreCommit("test001_0", 1); } @@ -67,8 +67,8 @@ public class TestDorisStreamLoad { CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); when(httpClient.execute(any())).thenReturn(preCommitResponse); byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8); - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient); - dorisStreamLoad.startLoad(1); + DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); + dorisStreamLoad.startLoad("1"); dorisStreamLoad.writeRecord(writeBuffer); dorisStreamLoad.stopLoad(); byte[] buff = new byte[4]; @@ -86,8 +86,8 @@ public class TestDorisStreamLoad { CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); when(httpClient.execute(any())).thenReturn(preCommitResponse); byte[] writeBuffer = "test".getBytes(StandardCharsets.UTF_8); - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient); - dorisStreamLoad.startLoad(1); + DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); + dorisStreamLoad.startLoad("1"); dorisStreamLoad.writeRecord(writeBuffer); dorisStreamLoad.writeRecord(writeBuffer); dorisStreamLoad.stopLoad(); @@ -111,8 +111,8 @@ public class TestDorisStreamLoad { CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); when(httpClient.execute(any())).thenReturn(preCommitResponse); - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, "", httpClient); - dorisStreamLoad.startLoad(1); + DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); + dorisStreamLoad.startLoad("1"); dorisStreamLoad.writeRecord("{\"id\": 1}".getBytes(StandardCharsets.UTF_8)); dorisStreamLoad.writeRecord("{\"id\": 2}".getBytes(StandardCharsets.UTF_8)); dorisStreamLoad.stopLoad(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java index 094e8a4..3bf32b4 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java @@ -62,8 +62,8 @@ public class TestDorisWriter { CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); when(httpClient.execute(any())).thenReturn(preCommitResponse); - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, "", httpClient); - dorisStreamLoad.startLoad(1); + DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); + dorisStreamLoad.startLoad(""); Sink.InitContext initContext = mock(Sink.InitContext.class); when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions); @@ -83,7 +83,7 @@ public class TestDorisWriter { CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true); when(httpClient.execute(any())).thenReturn(preCommitResponse); - DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, "", httpClient); + DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient); Sink.InitContext initContext = mock(Sink.InitContext.class); when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org