This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5c48e6fe6 [INLONG-6345][Sort] Add metrics for Doris connector (#6346) 5c48e6fe6 is described below commit 5c48e6fe67fd635237e4ac1133cbf167a4427ec0 Author: kuansix <490305...@qq.com> AuthorDate: Tue Nov 1 19:56:15 2022 +0800 [INLONG-6345][Sort] Add metrics for Doris connector (#6346) --- .../sort/protocol/node/load/DorisLoadNode.java | 3 +- .../doris/internal/GenericDorisSinkFunction.java | 78 ++++++++++++++++++ .../inlong/sort/doris/model/RespContent.java | 96 ++++++++++++++++++++++ .../table/DorisDynamicSchemaOutputFormat.java | 87 ++++++++++++++++++-- .../sort/doris/table/DorisDynamicTableFactory.java | 10 ++- .../sort/doris/table/DorisDynamicTableSink.java | 23 +++++- .../inlong/sort/doris/table/DorisStreamLoad.java | 5 +- licenses/inlong-sort-connectors/LICENSE | 1 + 8 files changed, 290 insertions(+), 13 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java index 583970430..a281f24fe 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java @@ -27,6 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.InlongMetric; import org.apache.inlong.sort.protocol.constant.DorisConstant; import org.apache.inlong.sort.protocol.enums.FilterStrategy; import org.apache.inlong.sort.protocol.node.LoadNode; @@ -53,7 +54,7 @@ import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIP @JsonInclude(Include.NON_NULL) @Data @NoArgsConstructor -public class DorisLoadNode extends LoadNode implements Serializable { +public class DorisLoadNode extends LoadNode implements InlongMetric, Serializable { private static final long serialVersionUID = -8002903269814211382L; diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java new file mode 100644 index 000000000..74671fb37 --- /dev/null +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.doris.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.util.Preconditions; +import org.apache.inlong.sort.doris.table.DorisDynamicSchemaOutputFormat; + +import javax.annotation.Nonnull; +import java.io.IOException; + +/** + * A generic SinkFunction for Doris. + * + * Add an option `inlong.metric` to support metrics. + */ +@Internal +public class GenericDorisSinkFunction<T> extends RichSinkFunction<T> + implements CheckpointedFunction { + + private final DorisDynamicSchemaOutputFormat<T> outputFormat; + + public GenericDorisSinkFunction(@Nonnull DorisDynamicSchemaOutputFormat<T> outputFormat) { + this.outputFormat = Preconditions.checkNotNull(outputFormat); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + RuntimeContext ctx = getRuntimeContext(); + outputFormat.setRuntimeContext(ctx); + outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); + } + + @Override + public void invoke(T value, Context context) throws IOException { + outputFormat.writeRecord(value); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + outputFormat.setRuntimeContext(getRuntimeContext()); + outputFormat.initializeState(context); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + outputFormat.flush(); + outputFormat.snapshotState(context); + } + + @Override + public void close() throws IOException { + outputFormat.close(); + } +} diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java new file mode 100644 index 000000000..5a43f05aa --- /dev/null +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.doris.model; + +import lombok.Data; +import org.apache.doris.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.doris.shaded.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper; + +/** + * RespContent copy from {@link org.apache.doris.flink.rest.models.RespContent} + **/ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class RespContent { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @JsonProperty(value = "TxnId") + private int txnId; + + @JsonProperty(value = "Label") + private String label; + + @JsonProperty(value = "Status") + private String status; + + @JsonProperty(value = "ExistingJobStatus") + private String existingJobStatus; + + @JsonProperty(value = "Message") + private String message; + + @JsonProperty(value = "NumberTotalRows") + private long numberTotalRows; + + @JsonProperty(value = "NumberLoadedRows") + private long numberLoadedRows; + + @JsonProperty(value = "NumberFilteredRows") + private int numberFilteredRows; + + @JsonProperty(value = "NumberUnselectedRows") + private int numberUnselectedRows; + + @JsonProperty(value = "LoadBytes") + private long loadBytes; + + @JsonProperty(value = "LoadTimeMs") + private int loadTimeMs; + + @JsonProperty(value = "BeginTxnTimeMs") + private int beginTxnTimeMs; + + @JsonProperty(value = "StreamLoadPutTimeMs") + private int streamLoadPutTimeMs; + + @JsonProperty(value = "ReadDataTimeMs") + private int readDataTimeMs; + + @JsonProperty(value = "WriteDataTimeMs") + private int writeDataTimeMs; + + @JsonProperty(value = "CommitAndPublishTimeMs") + private int commitAndPublishTimeMs; + + @JsonProperty(value = "ErrorURL") + private String errorURL; + + @Override + public String toString() { + try { + return OBJECT_MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + return ""; + } + } +} + diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java index 9e8142ae7..8e4100eb6 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java @@ -26,13 +26,24 @@ import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.table.data.RowData; import org.apache.flink.types.RowKind; import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; +import org.apache.inlong.sort.doris.model.RespContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +60,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat} * It is used in the multiple sink scenario, in this scenario, we directly convert the data format by @@ -91,13 +106,21 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { private transient ScheduledFuture<?> scheduledFuture; private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat; + private final String inlongMetric; + private final String auditHostAndPorts; + private transient SinkMetricData metricData; + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; + public DorisDynamicSchemaOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, String dynamicSchemaFormat, String databasePattern, String tablePattern, - boolean ignoreSingleTableErrors) { + boolean ignoreSingleTableErrors, + String inlongMetric, + String auditHostAndPorts) { this.options = option; this.readOptions = readOptions; this.executionOptions = executionOptions; @@ -105,6 +128,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { this.databasePattern = databasePattern; this.tablePattern = tablePattern; this.ignoreSingleTableErrors = ignoreSingleTableErrors; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; } /** @@ -133,6 +158,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { executionOptions.getStreamLoadProp()); jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat); + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withRegisterMetric(MetricOption.RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("doris-streamload-output-format")); @@ -276,7 +311,14 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { String loadValue = null; try { loadValue = OBJECT_MAPPER.writeValueAsString(kvs.getValue()); - load(kvs.getKey(), loadValue); + RespContent respContent = load(kvs.getKey(), loadValue); + try { + if (null != metricData && null != respContent) { + metricData.invoke(respContent.getNumberLoadedRows(), respContent.getLoadBytes()); + } + } catch (Exception e) { + LOG.warn("metricData invoke get err:", e); + } LOG.info("load {} records to tableIdentifier: {}", kvs.getValue().size(), kvs.getKey()); writeOutNum.addAndGet(kvs.getValue().size()); // Clean the data that has been loaded. @@ -320,11 +362,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { return hasRecords; } - private void load(String tableIdentifier, String result) throws IOException { + private RespContent load(String tableIdentifier, String result) throws IOException { String[] tableWithDb = tableIdentifier.split("\\."); + RespContent respContent = null; for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { - dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result); + respContent = dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result); break; } catch (StreamLoadException e) { LOG.error("doris sink error, retry times = {}", i, e); @@ -342,6 +385,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { } } } + return respContent; } private String getBackend() throws IOException { @@ -354,6 +398,26 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { } } + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, + getRuntimeContext().getIndexOfThisSubtask()); + } + } + + public void initializeState(FunctionInitializationContext context) throws Exception { + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + } + /** * Builder for {@link DorisDynamicSchemaOutputFormat}. */ @@ -366,6 +430,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { private String databasePattern; private String tablePattern; private boolean ignoreSingleTableErrors; + private String inlongMetric; + private String auditHostAndPorts; public Builder() { this.optionsBuilder = DorisOptions.builder().setTableIdentifier(""); @@ -417,11 +483,22 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { return this; } + public DorisDynamicSchemaOutputFormat.Builder setInlongMetric(String inlongMetric) { + this.inlongMetric = inlongMetric; + return this; + } + + public DorisDynamicSchemaOutputFormat.Builder setAuditHostAndPorts(String auditHostAndPorts) { + this.auditHostAndPorts = auditHostAndPorts; + return this; + } + @SuppressWarnings({"rawtypes"}) public DorisDynamicSchemaOutputFormat build() { return new DorisDynamicSchemaOutputFormat( optionsBuilder.build(), readOptions, executionOptions, - dynamicSchemaFormat, databasePattern, tablePattern, ignoreSingleTableErrors); + dynamicSchemaFormat, databasePattern, tablePattern, + ignoreSingleTableErrors, inlongMetric, auditHostAndPorts); } } } \ No newline at end of file diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java index 98c08c753..f3c263f93 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java @@ -59,6 +59,8 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; /** * This class copy from {@link org.apache.doris.flink.table.DorisDynamicTableFactory} @@ -210,6 +212,9 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory options.add(SINK_MULTIPLE_TABLE_PATTERN); options.add(SINK_MULTIPLE_ENABLE); options.add(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS); + options.add(INLONG_METRIC); + options.add(INLONG_AUDIT); + options.add(FactoryUtil.SINK_PARALLELISM); return options; } @@ -296,13 +301,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null); validateSinkMultiple(physicalSchema.toPhysicalRowDataType(), multipleSink, sinkMultipleFormat, databasePattern, tablePattern); + String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue()); + String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue()); + Integer parallelism = helper.getOptions().getOptional(FactoryUtil.SINK_PARALLELISM).orElse(1); // create and return dynamic table sink return new DorisDynamicTableSink( getDorisOptions(helper.getOptions()), getDorisReadOptions(helper.getOptions()), getDorisExecutionOptions(helper.getOptions(), streamLoadProp), physicalSchema, multipleSink, sinkMultipleFormat, databasePattern, - tablePattern, ignoreSingleTableErrors); + tablePattern, ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, parallelism); } private void validateSinkMultiple(DataType physicalDataType, boolean multipleSink, String sinkMultipleFormat, diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java index bc847a398..8d961fdb5 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java @@ -25,7 +25,9 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.OutputFormatProvider; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.types.RowKind; +import org.apache.inlong.sort.doris.internal.GenericDorisSinkFunction; /** * DorisDynamicTableSink copy from {@link org.apache.doris.flink.table.DorisDynamicTableSink} @@ -42,6 +44,9 @@ public class DorisDynamicTableSink implements DynamicTableSink { private final String databasePattern; private final String tablePattern; private final boolean ignoreSingleTableErrors; + private final String inlongMetric; + private final String auditHostAndPorts; + private final Integer parallelism; public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, @@ -51,7 +56,10 @@ public class DorisDynamicTableSink implements DynamicTableSink { String sinkMultipleFormat, String databasePattern, String tablePattern, - boolean ignoreSingleTableErrors) { + boolean ignoreSingleTableErrors, + String inlongMetric, + String auditHostAndPorts, + Integer parallelism) { this.options = options; this.readOptions = readOptions; this.executionOptions = executionOptions; @@ -61,6 +69,9 @@ public class DorisDynamicTableSink implements DynamicTableSink { this.databasePattern = databasePattern; this.tablePattern = tablePattern; this.ignoreSingleTableErrors = ignoreSingleTableErrors; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.parallelism = parallelism; } @Override @@ -96,14 +107,18 @@ public class DorisDynamicTableSink implements DynamicTableSink { .setDatabasePattern(databasePattern) .setTablePattern(tablePattern) .setDynamicSchemaFormat(sinkMultipleFormat) - .setIgnoreSingleTableErrors(ignoreSingleTableErrors); - return OutputFormatProvider.of(builder.build()); + .setIgnoreSingleTableErrors(ignoreSingleTableErrors) + .setInlongMetric(inlongMetric) + .setAuditHostAndPorts(auditHostAndPorts); + return SinkFunctionProvider.of( + new GenericDorisSinkFunction<>(builder.build()), parallelism); } @Override public DynamicTableSink copy() { return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema, - multipleSink, sinkMultipleFormat, databasePattern, tablePattern, ignoreSingleTableErrors); + multipleSink, sinkMultipleFormat, databasePattern, tablePattern, + ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, parallelism); } @Override diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java index 94b075ef5..1f688bebb 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java @@ -19,7 +19,6 @@ package org.apache.inlong.sort.doris.table; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.exception.StreamLoadException; -import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64; @@ -32,6 +31,7 @@ import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; +import org.apache.inlong.sort.doris.model.RespContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +80,7 @@ public class DorisStreamLoad implements Serializable { this.httpClient = httpClientBuilder.build(); } - public void load(String db, String tbl, String value) throws StreamLoadException { + public RespContent load(String db, String tbl, String value) throws StreamLoadException { LoadResponse loadResponse = loadBatch(db, tbl, value); LOG.info("Streamload Response:{}", loadResponse); if (loadResponse.status != 200) { @@ -93,6 +93,7 @@ public class DorisStreamLoad implements Serializable { respContent.getErrorURL()); throw new StreamLoadException(errMsg); } + return respContent; } catch (IOException e) { throw new StreamLoadException(e); } diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 53beba554..95793e251 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -608,6 +608,7 @@ inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java + inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java Source : org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 (Please note that the software have been modified.) License : https://github.com/apache/doris-flink-connector/blob/1.13_2.11-1.0.3/LICENSE.txt