This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
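
The commit below adds Stream Load parameter support to the sink path: DorisExecutionOptions now carries a java.util.Properties of Stream Load options, and DorisDynamicTableFactory collects any DDL option prefixed with "sink.properties." (for example 'sink.properties.column_separator' = ',') into that Properties object. A minimal usage sketch of the new builder API follows; it is illustrative only and not part of the patch, and the wrapper class name is invented for the example.

// Illustrative sketch only (not part of the patch): passing Stream Load
// parameters through the DorisExecutionOptions builder added by this commit.
import java.util.Properties;
import org.apache.doris.flink.cfg.DorisExecutionOptions;

public class StreamLoadPropExample {
    public static DorisExecutionOptions buildExecutionOptions() {
        Properties streamLoadProp = new Properties();
        // "column_separator" and "line_delimiter" are the keys
        // DorisDynamicOutputFormat reads when formatting rows.
        streamLoadProp.setProperty("column_separator", ",");
        streamLoadProp.setProperty("line_delimiter", "\n");

        return DorisExecutionOptions.builder()
                .setBatchSize(100)
                .setMaxRetries(3)
                .setBatchIntervalMs(1000L)
                .setStreamLoadProp(streamLoadProp)
                .build();
    }
}

The same Properties object is also handed to DorisStreamLoad when the output format opens, so any additional keys are forwarded along with the load request.
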
commit ba29ce0d7c8ba8427c1d70d13b37d1592f1279df Author: wudi <676366...@qq.com> AuthorDate: Mon Aug 9 22:12:46 2021 +0800 [Feature]:Flink-connector supports streamload parameters (#6243) Flink-connector supports streamload parameters #6199 --- .../apache/doris/flink/backend/BackendClient.java | 5 +- .../doris/flink/cfg/DorisConnectionOptions.java | 2 +- .../doris/flink/cfg/DorisExecutionOptions.java | 23 +- .../org/apache/doris/flink/cfg/DorisOptions.java | 2 +- .../apache/doris/flink/cfg/DorisReadOptions.java | 8 +- .../flink/datastream/DorisSourceFunction.java | 16 +- .../SimpleListDeserializationSchema.java | 3 +- .../doris/flink/exception/DorisException.java | 8 +- .../exception/ShouldNeverHappenException.java | 3 +- .../doris/flink/exception/StreamLoadException.java | 4 + .../doris/flink/rest/PartitionDefinition.java | 2 +- .../org/apache/doris/flink/rest/RestService.java | 95 +++-- .../org/apache/doris/flink/rest/SchemaUtils.java | 3 +- .../org/apache/doris/flink/rest/models/Field.java | 3 +- .../apache/doris/flink/serialization/RowBatch.java | 8 +- .../flink/table/DorisDynamicOutputFormat.java | 57 +-- .../flink/table/DorisDynamicTableFactory.java | 427 +++++++++++---------- .../doris/flink/table/DorisDynamicTableSink.java | 2 +- .../doris/flink/table/DorisDynamicTableSource.java | 90 ++--- .../doris/flink/table/DorisRowDataInputFormat.java | 358 ++++++++--------- .../apache/doris/flink/table/DorisStreamLoad.java | 35 +- .../doris/flink/table/DorisTableInputSplit.java | 8 +- 22 files changed, 624 insertions(+), 538 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/src/main/java/org/apache/doris/flink/backend/BackendClient.java index 93b353c..40bb5c9 100644 --- a/src/main/java/org/apache/doris/flink/backend/BackendClient.java +++ b/src/main/java/org/apache/doris/flink/backend/BackendClient.java @@ -112,6 +112,7 @@ public class BackendClient { /** * Open a scanner for reading Doris data. + * * @param openParams thrift struct to required by request * @return scan open result * @throws ConnectedFailedException throw if cannot connect to Doris BE @@ -147,6 +148,7 @@ public class BackendClient { /** * get next row batch from Doris BE + * * @param nextBatchParams thrift struct to required by request * @return scan batch result * @throws ConnectedFailedException throw if cannot connect to Doris BE @@ -161,7 +163,7 @@ public class BackendClient { for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to getNext {}.", attempt, routing); try { - result = client.get_next(nextBatchParams); + result = client.get_next(nextBatchParams); if (result == null) { logger.warn("GetNext result from {} is null.", routing); continue; @@ -189,6 +191,7 @@ public class BackendClient { /** * close an scanner. + * * @param closeParams thrift struct to required by request */ public void closeScanner(TScanCloseParams closeParams) { diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java index 619ce74..9b2187c 100644 --- a/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java +++ b/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java @@ -21,7 +21,7 @@ import org.apache.flink.util.Preconditions; import java.io.Serializable; /** - * Doris connection options. + * Doris connection options. 
*/ public class DorisConnectionOptions implements Serializable { diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 330cbc9..3d035ab 100644 --- a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -21,22 +21,29 @@ import org.apache.flink.util.Preconditions; import java.io.Serializable; import java.time.Duration; +import java.util.Properties; /** * JDBC sink batch options. */ -public class DorisExecutionOptions implements Serializable { +public class DorisExecutionOptions implements Serializable { private static final long serialVersionUID = 1L; private final Integer batchSize; private final Integer maxRetries; private final Long batchIntervalMs; - public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) { + /** + * Properties for the StreamLoad. + */ + private final Properties streamLoadProp; + + public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) { Preconditions.checkArgument(maxRetries >= 0); this.batchSize = batchSize; this.maxRetries = maxRetries; this.batchIntervalMs = batchIntervalMs; + this.streamLoadProp = streamLoadProp; } public Integer getBatchSize() { @@ -51,6 +58,10 @@ public class DorisExecutionOptions implements Serializable { return batchIntervalMs; } + public Properties getStreamLoadProp() { + return streamLoadProp; + } + public static Builder builder() { return new Builder(); } @@ -62,6 +73,7 @@ public class DorisExecutionOptions implements Serializable { private Integer batchSize; private Integer maxRetries; private Long batchIntervalMs; + private Properties streamLoadProp; public Builder setBatchSize(Integer batchSize) { this.batchSize = batchSize; @@ -78,8 +90,13 @@ public class DorisExecutionOptions implements Serializable { return this; } + public Builder setStreamLoadProp(Properties streamLoadProp) { + this.streamLoadProp = streamLoadProp; + return this; + } + public DorisExecutionOptions build() { - return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs); + return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp); } } diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index c542d6b..512d0ab 100644 --- a/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -25,7 +25,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Options for the Doris connector. 
*/ -public class DorisOptions extends DorisConnectionOptions{ +public class DorisOptions extends DorisConnectionOptions { private static final long serialVersionUID = 1L; diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 833ecf5..53cefaa 100644 --- a/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -22,7 +22,7 @@ import java.io.Serializable; /** * Doris read Options */ -public class DorisReadOptions implements Serializable { +public class DorisReadOptions implements Serializable { private static final long serialVersionUID = 1L; @@ -35,7 +35,7 @@ public class DorisReadOptions implements Serializable { private Integer requestRetries; private Integer requestBatchSize; private Long execMemLimit; - private Integer deserializeQueueSize; + private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; public DorisReadOptions(String readFields, String filterQuery, Integer requestTabletSize, Integer requestConnectTimeoutMs, Integer requestReadTimeoutMs, @@ -117,7 +117,7 @@ public class DorisReadOptions implements Serializable { private Integer requestRetries; private Integer requestBatchSize; private Long execMemLimit; - private Integer deserializeQueueSize; + private Integer deserializeQueueSize; private Boolean deserializeArrowAsync; @@ -177,7 +177,7 @@ public class DorisReadOptions implements Serializable { } public DorisReadOptions build() { - return new DorisReadOptions(readFields,filterQuery,requestTabletSize,requestConnectTimeoutMs,requestReadTimeoutMs,requestQueryTimeoutS,requestRetries,requestBatchSize,execMemLimit,deserializeQueueSize,deserializeArrowAsync); + return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync); } } diff --git a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java index 5c580db..82ab224 100644 --- a/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java +++ b/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java @@ -38,15 +38,15 @@ import java.util.List; public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> { - private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class); + private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class); private DorisDeserializationSchema deserializer; private DorisOptions options; private DorisReadOptions readOptions; - private List<PartitionDefinition> dorisPartitions; + private List<PartitionDefinition> dorisPartitions; private ScalaValueReader scalaValueReader; - public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) { + public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) { this.deserializer = deserializer; this.options = streamOptions.getOptions(); this.readOptions = streamOptions.getReadOptions(); @@ -55,14 +55,14 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - this.dorisPartitions = 
RestService.findPartitions(options,readOptions,logger); + this.dorisPartitions = RestService.findPartitions(options, readOptions, logger); } @Override - public void run(SourceContext sourceContext) throws Exception{ - for(PartitionDefinition partitions : dorisPartitions){ - scalaValueReader = new ScalaValueReader(partitions, options,readOptions); - while (scalaValueReader.hasNext()){ + public void run(SourceContext sourceContext) throws Exception { + for (PartitionDefinition partitions : dorisPartitions) { + scalaValueReader = new ScalaValueReader(partitions, options, readOptions); + while (scalaValueReader.hasNext()) { Object next = scalaValueReader.next(); sourceContext.collect(next); } diff --git a/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java index 6fd68ec..7fcf2f6 100644 --- a/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java +++ b/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java @@ -18,10 +18,11 @@ package org.apache.doris.flink.deserialization; import org.apache.flink.api.common.typeinfo.TypeInformation; + import java.util.List; -public class SimpleListDeserializationSchema implements DorisDeserializationSchema{ +public class SimpleListDeserializationSchema implements DorisDeserializationSchema { @Override public TypeInformation getProducedType() { diff --git a/src/main/java/org/apache/doris/flink/exception/DorisException.java b/src/main/java/org/apache/doris/flink/exception/DorisException.java index 26e11e5..2274f87 100644 --- a/src/main/java/org/apache/doris/flink/exception/DorisException.java +++ b/src/main/java/org/apache/doris/flink/exception/DorisException.java @@ -21,18 +21,22 @@ public class DorisException extends Exception { public DorisException() { super(); } + public DorisException(String message) { super(message); } + public DorisException(String message, Throwable cause) { super(message, cause); } + public DorisException(Throwable cause) { super(cause); } + protected DorisException(String message, Throwable cause, - boolean enableSuppression, - boolean writableStackTrace) { + boolean enableSuppression, + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } diff --git a/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java b/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java index 307c398..a26718d 100644 --- a/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java +++ b/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java @@ -17,4 +17,5 @@ package org.apache.doris.flink.exception; -public class ShouldNeverHappenException extends DorisException { } +public class ShouldNeverHappenException extends DorisException { +} diff --git a/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java b/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java index 6d7a192..233d27e 100644 --- a/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java +++ b/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java @@ -21,15 +21,19 @@ public class StreamLoadException extends Exception { public StreamLoadException() { super(); } + public StreamLoadException(String message) { super(message); } + public StreamLoadException(String message, Throwable cause) { super(message, 
cause); } + public StreamLoadException(Throwable cause) { super(cause); } + protected StreamLoadException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { diff --git a/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java index 19edd21..8a66f76 100644 --- a/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java +++ b/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java @@ -103,7 +103,7 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe similar.retainAll(o.tabletIds); diffSelf.removeAll(similar); diffOther.removeAll(similar); - if (diffSelf.size() == 0) { + if (diffSelf.size() == 0) { return 0; } long diff = Collections.min(diffSelf) - Collections.min(diffOther); diff --git a/src/main/java/org/apache/doris/flink/rest/RestService.java b/src/main/java/org/apache/doris/flink/rest/RestService.java index cd5b6d5..184afd3 100644 --- a/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -88,13 +88,14 @@ public class RestService implements Serializable { /** * send request to Doris FE and get response json string. + * * @param options configuration of request * @param request {@link HttpRequestBase} real request - * @param logger {@link Logger} + * @param logger {@link Logger} * @return Doris FE response in json string * @throws ConnectedFailedException throw when cannot connect to Doris FE */ - private static String send(DorisOptions options,DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws + private static String send(DorisOptions options, DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws ConnectedFailedException { int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs(); int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? 
ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs(); @@ -116,10 +117,10 @@ public class RestService implements Serializable { logger.debug("Attempt {} to request {}.", attempt, request.getURI()); try { String response; - if (request instanceof HttpGet){ - response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(),logger); + if (request instanceof HttpGet) { + response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(), logger); } else { - response = getConnectionPost(request, options.getUsername(), options.getPassword(),logger); + response = getConnectionPost(request, options.getUsername(), options.getPassword(), logger); } if (response == null) { logger.warn("Failed to get response from Doris FE {}, http code is {}", @@ -147,14 +148,14 @@ public class RestService implements Serializable { throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex); } - private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException { + private static String getConnectionPost(HttpRequestBase request, String user, String passwd, Logger logger) throws IOException { URL url = new URL(request.getURI().toString()); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setInstanceFollowRedirects(false); conn.setRequestMethod(request.getMethod()); String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); conn.setRequestProperty("Authorization", "Basic " + authEncoding); - InputStream content = ((HttpPost)request).getEntity().getContent(); + InputStream content = ((HttpPost) request).getEntity().getContent(); String res = IOUtils.toString(content); conn.setDoOutput(true); conn.setDoInput(true); @@ -164,21 +165,21 @@ public class RestService implements Serializable { // flush out.flush(); // read response - return parseResponse(conn,logger); + return parseResponse(conn, logger); } - private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException { + private static String getConnectionGet(String request, String user, String passwd, Logger logger) throws IOException { URL realUrl = new URL(request); // open connection - HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection(); + HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection(); String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); connection.setRequestProperty("Authorization", "Basic " + authEncoding); connection.connect(); - return parseResponse(connection,logger); + return parseResponse(connection, logger); } - private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException { + private static String parseResponse(HttpURLConnection connection, Logger logger) throws IOException { if (connection.getResponseCode() != HttpStatus.SC_OK) { logger.warn("Failed to get response from Doris {}, http code is {}", connection.getURL(), connection.getResponseCode()); @@ -198,8 +199,9 @@ public class RestService implements Serializable { /** * parse table identifier to array. 
+ * * @param tableIdentifier table identifier string - * @param logger {@link Logger} + * @param logger {@link Logger} * @return first element is db name, second element is table name * @throws IllegalArgumentException table identifier is illegal */ @@ -220,8 +222,9 @@ public class RestService implements Serializable { /** * choice a Doris FE node to request. + * * @param feNodes Doris FE node list, separate be comma - * @param logger slf4j logger + * @param logger slf4j logger * @return the chosen one Doris FE node * @throws IllegalArgumentException fe nodes is illegal */ @@ -239,14 +242,15 @@ public class RestService implements Serializable { /** * choice a Doris BE node to request. + * * @param options configuration of request - * @param logger slf4j logger + * @param logger slf4j logger * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal */ @VisibleForTesting - public static String randomBackend(DorisOptions options,DorisReadOptions readOptions ,Logger logger) throws DorisException, IOException { - List<BackendRow> backends = getBackends(options,readOptions, logger); + public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { + List<BackendRow> backends = getBackends(options, readOptions, logger); logger.trace("Parse beNodes '{}'.", backends); if (backends == null || backends.isEmpty()) { logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends); @@ -259,19 +263,20 @@ public class RestService implements Serializable { /** * get Doris BE nodes to request. + * * @param options configuration of request - * @param logger slf4j logger + * @param logger slf4j logger * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal */ @VisibleForTesting - static List<BackendRow> getBackends(DorisOptions options,DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { + static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { String feNodes = options.getFenodes(); String feNode = randomEndpoint(feNodes, logger); String beUrl = "http://" + feNode + BACKENDS; HttpGet httpGet = new HttpGet(beUrl); - String response = send(options, readOptions,httpGet, logger); - logger.info("Backend Info:{}",response); + String response = send(options, readOptions, httpGet, logger); + logger.info("Backend Info:{}", response); List<BackendRow> backends = parseBackend(response, logger); return backends; } @@ -306,8 +311,9 @@ public class RestService implements Serializable { /** * get a valid URI to connect Doris FE. + * * @param options configuration of request - * @param logger {@link Logger} + * @param logger {@link Logger} * @return uri string * @throws IllegalArgumentException throw when configuration is illegal */ @@ -323,24 +329,26 @@ public class RestService implements Serializable { /** * discover Doris table schema from Doris FE. 
+ * * @param options configuration of request - * @param logger slf4j logger + * @param logger slf4j logger * @return Doris table schema * @throws DorisException throw when discover failed */ - public static Schema getSchema(DorisOptions options,DorisReadOptions readOptions, Logger logger) + public static Schema getSchema(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException { logger.trace("Finding schema."); HttpGet httpGet = new HttpGet(getUriStr(options, logger) + SCHEMA); - String response = send(options, readOptions,httpGet, logger); + String response = send(options, readOptions, httpGet, logger); logger.debug("Find schema response is '{}'.", response); return parseSchema(response, logger); } /** * translate Doris FE response to inner {@link Schema} struct. + * * @param response Doris FE response - * @param logger {@link Logger} + * @param logger {@link Logger} * @return inner {@link Schema} struct * @throws DorisException throw when translate failed */ @@ -381,14 +389,15 @@ public class RestService implements Serializable { /** * find Doris RDD partitions from Doris FE. + * * @param options configuration of request - * @param logger {@link Logger} + * @param logger {@link Logger} * @return an list of Doris RDD partitions * @throws DorisException throw when find partition failed */ public static List<PartitionDefinition> findPartitions(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException { String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger); - String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" :readOptions.getReadFields(); + String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" : readOptions.getReadFields(); String sql = "select " + readFields + " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`"; if (!StringUtils.isEmpty(readOptions.getFilterQuery())) { @@ -397,14 +406,14 @@ public class RestService implements Serializable { logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN); - String entity = "{\"sql\": \""+ sql +"\"}"; + String entity = "{\"sql\": \"" + sql + "\"}"; logger.debug("Post body Sending to Doris FE is: '{}'.", entity); StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); stringEntity.setContentEncoding("UTF-8"); stringEntity.setContentType("application/json"); httpPost.setEntity(stringEntity); - String resStr = send(options, readOptions,httpPost, logger); + String resStr = send(options, readOptions, httpPost, logger); logger.debug("Find partition response is '{}'.", resStr); QueryPlan queryPlan = getQueryPlan(resStr, logger); Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger); @@ -420,8 +429,9 @@ public class RestService implements Serializable { /** * translate Doris FE response string to inner {@link QueryPlan} struct. + * * @param response Doris FE response string - * @param logger {@link Logger} + * @param logger {@link Logger} * @return inner {@link QueryPlan} struct * @throws DorisException throw when translate failed. */ @@ -461,13 +471,14 @@ public class RestService implements Serializable { /** * select which Doris BE to get tablet data. + * * @param queryPlan {@link QueryPlan} translated from Doris FE response - * @param logger {@link Logger} + * @param logger {@link Logger} * @return BE to tablets {@link Map} * @throws DorisException throw when select failed. 
*/ @VisibleForTesting - static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException { + static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException { Map<String, List<Long>> be2Tablets = new HashMap<>(); for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) { logger.debug("Parse tablet info: '{}'.", part); @@ -512,8 +523,9 @@ public class RestService implements Serializable { /** * tablet count limit for one Doris RDD partition + * * @param readOptions configuration of request - * @param logger {@link Logger} + * @param logger {@link Logger} * @return tablet count limit */ @VisibleForTesting @@ -533,18 +545,19 @@ public class RestService implements Serializable { /** * translate BE tablets map to Doris RDD partition. - * @param options configuration of request - * @param be2Tablets BE to tablets {@link Map} + * + * @param options configuration of request + * @param be2Tablets BE to tablets {@link Map} * @param opaquedQueryPlan Doris BE execute plan getting from Doris FE - * @param database database name of Doris table - * @param table table name of Doris table - * @param logger {@link Logger} + * @param database database name of Doris table + * @param table table name of Doris table + * @param logger {@link Logger} * @return Doris RDD partition {@link List} * @throws IllegalArgumentException throw when translate failed */ @VisibleForTesting - static List<PartitionDefinition> tabletsMapToPartition(DorisOptions options,DorisReadOptions readOptions, Map<String, List<Long>> be2Tablets, - String opaquedQueryPlan, String database, String table, Logger logger) + static List<PartitionDefinition> tabletsMapToPartition(DorisOptions options, DorisReadOptions readOptions, Map<String, List<Long>> be2Tablets, + String opaquedQueryPlan, String database, String table, Logger logger) throws IllegalArgumentException { int tabletsSize = tabletCountLimitForOnePartition(readOptions, logger); List<PartitionDefinition> partitions = new ArrayList<>(); diff --git a/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java index 46c54c0..13bde01 100644 --- a/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java +++ b/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java @@ -27,10 +27,11 @@ public class SchemaUtils { /** * convert Doris return schema to inner schema struct. 
+ * * @param tscanColumnDescs Doris BE return schema * @return inner schema struct */ - public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs ){ + public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs) { Schema schema = new Schema(tscanColumnDescs.size()); tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0))); return schema; diff --git a/src/main/java/org/apache/doris/flink/rest/models/Field.java b/src/main/java/org/apache/doris/flink/rest/models/Field.java index 4ac66be..9a58180 100644 --- a/src/main/java/org/apache/doris/flink/rest/models/Field.java +++ b/src/main/java/org/apache/doris/flink/rest/models/Field.java @@ -26,7 +26,8 @@ public class Field { private int precision; private int scale; - public Field() { } + public Field() { + } public Field(String name, String type, String comment, int precision, int scale) { this.name = name; diff --git a/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index 8d81a37..00c699b 100644 --- a/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -63,7 +63,7 @@ public class RowBatch { this.cols = new ArrayList<>(colCount); } - public List<Object> getCols() { + public List<Object> getCols() { return cols; } @@ -87,13 +87,13 @@ public class RowBatch { return rowBatch; } - public RowBatch(TScanBatchResult nextResult, Schema schema){ + public RowBatch(TScanBatchResult nextResult, Schema schema) { this.schema = schema; this.rootAllocator = new RootAllocator(Integer.MAX_VALUE); this.arrowStreamReader = new ArrowStreamReader( new ByteArrayInputStream(nextResult.getRows()), rootAllocator - ); + ); this.offsetInRowBatch = 0; } @@ -243,7 +243,7 @@ public class RowBatch { continue; } BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros(); - addValueToRow(rowIndex, DecimalData.fromBigDecimal(value,value.precision(),value.scale())); + addValueToRow(rowIndex, DecimalData.fromBigDecimal(value, value.precision(), value.scale())); } break; case "DATE": diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 33f5c85..77b53ba 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.StringJoiner; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -44,17 +45,23 @@ import java.util.concurrent.TimeUnit; /** * DorisDynamicOutputFormat **/ -public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { +public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class); - - private DorisOptions options ; - private DorisReadOptions readOptions; - private DorisExecutionOptions executionOptions; + private static final String FIELD_DELIMITER_KEY = "column_separator"; + private static final String FIELD_DELIMITER_DEFAULT = "\t"; + private static final String LINE_DELIMITER_KEY = "line_delimiter"; + private static final String LINE_DELIMITER_DEFAULT = "\n"; + 
private static final String NULL_VALUE = "\\N"; + private final String fieldDelimiter; + private final String lineDelimiter; + + private DorisOptions options; + private DorisReadOptions readOptions; + private DorisExecutionOptions executionOptions; private DorisStreamLoad dorisStreamLoad; - private final String fieldDelimiter = "\t"; - private final String lineDelimiter = "\n"; - private final String NULL_VALUE = "\\N"; + + private final List<String> batch = new ArrayList<>(); private transient volatile boolean closed = false; @@ -62,15 +69,16 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private transient ScheduledFuture<?> scheduledFuture; private transient volatile Exception flushException; - public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions,DorisExecutionOptions executionOptions) { + public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) { this.options = option; this.readOptions = readOptions; this.executionOptions = executionOptions; + this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT); + this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT); } @Override public void configure(Configuration configuration) { - } @Override @@ -80,8 +88,9 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { options.getTableIdentifier().split("\\.")[0], options.getTableIdentifier().split("\\.")[1], options.getUsername(), - options.getPassword()); - LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr()); + options.getPassword(), + executionOptions.getStreamLoadProp()); + LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr()); if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format")); @@ -118,12 +127,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private void addBatch(RowData row) { StringJoiner value = new StringJoiner(this.fieldDelimiter); GenericRowData rowData = (GenericRowData) row; - for(int i = 0; i < row.getArity(); ++i) { + for (int i = 0; i < row.getArity(); ++i) { Object field = rowData.getField(i); - if(field != null){ + if (field != null) { value.add(field.toString()); - }else{ - value.add(this.NULL_VALUE); + } else { + value.add(NULL_VALUE); } } batch.add(value.toString()); @@ -151,12 +160,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { public synchronized void flush() throws IOException { checkFlushException(); - if(batch.isEmpty()){ + if (batch.isEmpty()) { return; } for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { - dorisStreamLoad.load(String.join(lineDelimiter,batch)); + dorisStreamLoad.load(String.join(this.lineDelimiter, batch)); batch.clear(); break; } catch (StreamLoadException e) { @@ -166,7 +175,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { } try { dorisStreamLoad.setHostPort(getBackend()); - LOG.warn("streamload error,switch be: {}",dorisStreamLoad.getLoadUrlStr(), e); + LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e); Thread.sleep(1000 * i); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); @@ -177,10 +186,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { 
} - private String getBackend() throws IOException{ + private String getBackend() throws IOException { try { //get be url from fe - return RestService.randomBackend(options,readOptions, LOG); + return RestService.randomBackend(options, readOptions, LOG); } catch (IOException | DorisException e) { LOG.error("get backends info fail"); throw new IOException(e); @@ -202,8 +211,8 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { */ public static class Builder { private DorisOptions.Builder optionsBuilder; - private DorisReadOptions readOptions; - private DorisExecutionOptions executionOptions; + private DorisReadOptions readOptions; + private DorisExecutionOptions executionOptions; public Builder() { this.optionsBuilder = DorisOptions.builder(); @@ -241,7 +250,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { public DorisDynamicOutputFormat build() { return new DorisDynamicOutputFormat( - optionsBuilder.build(),readOptions,executionOptions + optionsBuilder.build(), readOptions, executionOptions ); } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 27b6f97..92d69e6 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -34,6 +34,8 @@ import org.apache.flink.table.utils.TableSchemaUtils; import java.time.Duration; import java.util.HashSet; +import java.util.Map; +import java.util.Properties; import java.util.Set; import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; @@ -52,209 +54,224 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_ * <p>Because the table source requires a decoding format, we are discovering the format using the * provided {@link FactoryUtil} for convenience. */ -public final class DorisDynamicTableFactory implements DynamicTableSourceFactory , DynamicTableSinkFactory { - - public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address."); - public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name."); - public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name."); - public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password."); - - // doris options - private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions - .key("doris.read.field") - .stringType() - .noDefaultValue() - .withDescription("List of column names in the Doris table, separated by commas"); - - private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions - .key("doris.filter.query") - .stringType() - .noDefaultValue() - .withDescription("Filter expression of the query, which is transparently transmitted to Doris. 
Doris uses this expression to complete source-side data filtering"); - - private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions - .key("doris.request.tablet.size") - .intType() - .defaultValue(DORIS_TABLET_SIZE_DEFAULT) - .withDescription(""); - - private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions - .key("doris.request.connect.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) - .withDescription(""); - - private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions - .key("doris.request.read.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) - .withDescription(""); - - private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions - .key("doris.request.query.timeout.s") - .intType() - .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) - .withDescription(""); - - private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions - .key("doris.request.retries") - .intType() - .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) - .withDescription(""); - - private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions - .key("doris.deserialize.arrow.async") - .booleanType() - .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) - .withDescription(""); - - private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions - .key("doris.request.retriesdoris.deserialize.queue.size") - .intType() - .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) - .withDescription(""); - - - private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions - .key("doris.batch.size") - .intType() - .defaultValue(DORIS_BATCH_SIZE_DEFAULT) - .withDescription(""); - - private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions - .key("doris.exec.mem.limit") - .longType() - .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) - .withDescription(""); - - // flink write config options - private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions - .key("sink.batch.size") - .intType() - .defaultValue(100) - .withDescription("the flush max size (includes all append, upsert and delete records), over this number" + - " of records, will flush data. The default value is 100."); - - private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions - .key("sink.max-retries") - .intType() - .defaultValue(3) - .withDescription("the max retry times if writing records to database failed."); - - private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions - .key("sink.batch.interval") - .durationType() - .defaultValue(Duration.ofSeconds(1)) - .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. 
The " + - "default value is 1s."); - - - @Override - public String factoryIdentifier() { - return "doris"; // used for matching to `connector = '...'` - } - - @Override - public Set<ConfigOption<?>> requiredOptions() { - final Set<ConfigOption<?>> options = new HashSet<>(); - options.add(FENODES); - options.add(TABLE_IDENTIFIER); - return options; - } - - @Override - public Set<ConfigOption<?>> optionalOptions() { - final Set<ConfigOption<?>> options = new HashSet<>(); - options.add(FENODES); - options.add(TABLE_IDENTIFIER); - options.add(USERNAME); - options.add(PASSWORD); - - options.add(DORIS_READ_FIELD); - options.add(DORIS_FILTER_QUERY); - options.add(DORIS_TABLET_SIZE); - options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS); - options.add(DORIS_REQUEST_READ_TIMEOUT_MS); - options.add(DORIS_REQUEST_QUERY_TIMEOUT_S); - options.add(DORIS_REQUEST_RETRIES); - options.add(DORIS_DESERIALIZE_ARROW_ASYNC); - options.add(DORIS_DESERIALIZE_QUEUE_SIZE); - options.add(DORIS_BATCH_SIZE); - options.add(DORIS_EXEC_MEM_LIMIT); - - options.add(SINK_BUFFER_FLUSH_MAX_ROWS); - options.add(SINK_MAX_RETRIES); - options.add(SINK_BUFFER_FLUSH_INTERVAL); - return options; - } - - @Override - public DynamicTableSource createDynamicTableSource(Context context) { - // either implement your custom validation logic here ... - // or use the provided helper utility - final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - // validate all options - helper.validate(); - // get the validated options - final ReadableConfig options = helper.getOptions(); - // derive the produced data type (excluding computed columns) from the catalog table - final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); - TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); - // create and return dynamic table source - return new DorisDynamicTableSource( - getDorisOptions(helper.getOptions()), - getDorisReadOptions(helper.getOptions()), - physicalSchema); - } - - private DorisOptions getDorisOptions(ReadableConfig readableConfig) { - final String fenodes = readableConfig.get(FENODES); - final DorisOptions.Builder builder = DorisOptions.builder() - .setFenodes(fenodes) - .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER)); - - readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername); - readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword); - return builder.build(); - } - - private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { - final DorisReadOptions.Builder builder = DorisReadOptions.builder(); - builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC)) - .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)) - .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT)) - .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY)) - .setReadFields(readableConfig.get(DORIS_READ_FIELD)) - .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S)) - .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE)) - .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)) - .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)) - .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) - .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)); - return builder.build(); - } - - private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig 
readableConfig) { - final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder(); - builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)); - builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES)); - builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); - return builder.build(); - } - - - @Override - public DynamicTableSink createDynamicTableSink(Context context) { - // either implement your custom validation logic here ... - // or use the provided helper utility - final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - // validate all options - helper.validate(); - // create and return dynamic table source - return new DorisDynamicTableSink( - getDorisOptions(helper.getOptions()), - getDorisReadOptions(helper.getOptions()), - getDorisExecutionOptions(helper.getOptions()) - ); - } +public final class DorisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + + public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address."); + public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name."); + public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name."); + public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password."); + + // doris options + private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions + .key("doris.read.field") + .stringType() + .noDefaultValue() + .withDescription("List of column names in the Doris table, separated by commas"); + + private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions + .key("doris.filter.query") + .stringType() + .noDefaultValue() + .withDescription("Filter expression of the query, which is transparently transmitted to Doris. 
Doris uses this expression to complete source-side data filtering"); + + private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions + .key("doris.request.tablet.size") + .intType() + .defaultValue(DORIS_TABLET_SIZE_DEFAULT) + .withDescription(""); + + private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions + .key("doris.request.connect.timeout.ms") + .intType() + .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) + .withDescription(""); + + private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions + .key("doris.request.read.timeout.ms") + .intType() + .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) + .withDescription(""); + + private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions + .key("doris.request.query.timeout.s") + .intType() + .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) + .withDescription(""); + + private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions + .key("doris.request.retries") + .intType() + .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) + .withDescription(""); + + private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions + .key("doris.deserialize.arrow.async") + .booleanType() + .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) + .withDescription(""); + + private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions + .key("doris.request.retriesdoris.deserialize.queue.size") + .intType() + .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) + .withDescription(""); + + + private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions + .key("doris.batch.size") + .intType() + .defaultValue(DORIS_BATCH_SIZE_DEFAULT) + .withDescription(""); + + private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions + .key("doris.exec.mem.limit") + .longType() + .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) + .withDescription(""); + + // flink write config options + private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions + .key("sink.batch.size") + .intType() + .defaultValue(100) + .withDescription("the flush max size (includes all append, upsert and delete records), over this number" + + " of records, will flush data. The default value is 100."); + + private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions + .key("sink.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if writing records to database failed."); + + private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions + .key("sink.batch.interval") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " + + "default value is 1s."); + + + // Prefix for Doris StreamLoad specific properties. 
+ public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties."; + + @Override + public String factoryIdentifier() { + return "doris"; // used for matching to `connector = '...'` + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(FENODES); + options.add(TABLE_IDENTIFIER); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(FENODES); + options.add(TABLE_IDENTIFIER); + options.add(USERNAME); + options.add(PASSWORD); + + options.add(DORIS_READ_FIELD); + options.add(DORIS_FILTER_QUERY); + options.add(DORIS_TABLET_SIZE); + options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS); + options.add(DORIS_REQUEST_READ_TIMEOUT_MS); + options.add(DORIS_REQUEST_QUERY_TIMEOUT_S); + options.add(DORIS_REQUEST_RETRIES); + options.add(DORIS_DESERIALIZE_ARROW_ASYNC); + options.add(DORIS_DESERIALIZE_QUEUE_SIZE); + options.add(DORIS_BATCH_SIZE); + options.add(DORIS_EXEC_MEM_LIMIT); + + options.add(SINK_BUFFER_FLUSH_MAX_ROWS); + options.add(SINK_MAX_RETRIES); + options.add(SINK_BUFFER_FLUSH_INTERVAL); + return options; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + // either implement your custom validation logic here ... + // or use the provided helper utility + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + // validate all options + helper.validate(); + // get the validated options + final ReadableConfig options = helper.getOptions(); + // derive the produced data type (excluding computed columns) from the catalog table + final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); + TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + // create and return dynamic table source + return new DorisDynamicTableSource( + getDorisOptions(helper.getOptions()), + getDorisReadOptions(helper.getOptions()), + physicalSchema); + } + + private DorisOptions getDorisOptions(ReadableConfig readableConfig) { + final String fenodes = readableConfig.get(FENODES); + final DorisOptions.Builder builder = DorisOptions.builder() + .setFenodes(fenodes) + .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER)); + + readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername); + readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword); + return builder.build(); + } + + private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { + final DorisReadOptions.Builder builder = DorisReadOptions.builder(); + builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC)) + .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)) + .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT)) + .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY)) + .setReadFields(readableConfig.get(DORIS_READ_FIELD)) + .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S)) + .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE)) + .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)) + .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)) + .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) + .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)); + return builder.build(); + } + + private DorisExecutionOptions 
getDorisExecutionOptions(ReadableConfig readableConfig, Properties streamLoadProp) { + final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder(); + builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)); + builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES)); + builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); + builder.setStreamLoadProp(streamLoadProp); + return builder.build(); + } + + private Properties getStreamLoadProp(Map<String, String> tableOptions) { + final Properties streamLoadProp = new Properties(); + + for (Map.Entry<String, String> entry : tableOptions.entrySet()) { + if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) { + String subKey = entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length()); + streamLoadProp.put(subKey, entry.getValue()); + } + } + return streamLoadProp; + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + // validate all options + helper.validateExcept(STREAM_LOAD_PROP_PREFIX); + + Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions()); + // create and return dynamic table source + return new DorisDynamicTableSink( + getDorisOptions(helper.getOptions()), + getDorisReadOptions(helper.getOptions()), + getDorisExecutionOptions(helper.getOptions(), streamLoadProp) + ); + } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index feeab96..dc710d7 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -63,7 +63,7 @@ public class DorisDynamicTableSink implements DynamicTableSink { @Override public DynamicTableSink copy() { - return new DorisDynamicTableSink(options,readOptions,executionOptions); + return new DorisDynamicTableSink(options, readOptions, executionOptions); } @Override diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index ab523aa..43d9e5f 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -46,56 +46,56 @@ import java.util.List; * where we instantiate the required {@link SourceFunction} and its {@link DeserializationSchema} for * runtime. Both instances are parameterized to return internal data structures (i.e. {@link RowData}). 
*/ -public final class DorisDynamicTableSource implements ScanTableSource ,LookupTableSource { +public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource { - private final DorisOptions options; - private final DorisReadOptions readOptions; - private TableSchema physicalSchema; - private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class); + private final DorisOptions options; + private final DorisReadOptions readOptions; + private TableSchema physicalSchema; + private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class); - public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions,TableSchema physicalSchema) { - this.options = options; - this.readOptions = readOptions; - this.physicalSchema = physicalSchema; - } + public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions, TableSchema physicalSchema) { + this.options = options; + this.readOptions = readOptions; + this.physicalSchema = physicalSchema; + } - @Override - public ChangelogMode getChangelogMode() { - // in our example the format decides about the changelog mode - // but it could also be the source itself - return ChangelogMode.insertOnly(); - } + @Override + public ChangelogMode getChangelogMode() { + // in our example the format decides about the changelog mode + // but it could also be the source itself + return ChangelogMode.insertOnly(); + } - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - List<PartitionDefinition> dorisPartitions ; - try { - dorisPartitions = RestService.findPartitions(options,readOptions,LOG); - } catch (DorisException e) { - throw new RuntimeException("can not fetch partitions"); - } - DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder() - .setFenodes(options.getFenodes()) - .setUsername(options.getUsername()) - .setPassword(options.getPassword()) - .setTableIdentifier(options.getTableIdentifier()) - .setPartitions(dorisPartitions) - .setReadOptions(readOptions); - return InputFormatProvider.of(builder.build()); - } + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + List<PartitionDefinition> dorisPartitions; + try { + dorisPartitions = RestService.findPartitions(options, readOptions, LOG); + } catch (DorisException e) { + throw new RuntimeException("can not fetch partitions"); + } + DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder() + .setFenodes(options.getFenodes()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setTableIdentifier(options.getTableIdentifier()) + .setPartitions(dorisPartitions) + .setReadOptions(readOptions); + return InputFormatProvider.of(builder.build()); + } - @Override - public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { - return null; - } + @Override + public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { + return null; + } - @Override - public DynamicTableSource copy() { - return new DorisDynamicTableSource(options,readOptions,physicalSchema); - } + @Override + public DynamicTableSource copy() { + return new DorisDynamicTableSource(options, readOptions, physicalSchema); + } - @Override - public String asSummaryString() { - return "Doris Table Source"; - } + @Override + public String asSummaryString() { + return "Doris Table Source"; + } } diff --git 
a/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java index 75e7fc9..c75a88f 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java +++ b/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java @@ -45,183 +45,183 @@ import java.util.List; @Internal public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTableInputSplit> implements ResultTypeQueryable<RowData> { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class); - - private DorisOptions options; - private DorisReadOptions readOptions; - private List<PartitionDefinition> dorisPartitions; - private TypeInformation<RowData> rowDataTypeInfo; - - private ScalaValueReader scalaValueReader; - private transient boolean hasNext; - - public DorisRowDataInputFormat(DorisOptions options,List<PartitionDefinition> dorisPartitions,DorisReadOptions readOptions) { - this.options = options; - this.dorisPartitions = dorisPartitions; - this.readOptions = readOptions; - } - - @Override - public void configure(Configuration parameters) { - //do nothing here - } - - @Override - public void openInputFormat() { - //called once per inputFormat (on open) - } - - @Override - public void closeInputFormat() { - //called once per inputFormat (on close) - } - - /** - * Connects to the source database and executes the query in a <b>parallel - * fashion</b> if - * this {@link InputFormat} is built using a parameterized query (i.e. using - * a {@link PreparedStatement}) - * and a proper {@link }, in a <b>non-parallel - * fashion</b> otherwise. - * - * @param inputSplit which is ignored if this InputFormat is executed as a - * non-parallel source, - * a "hook" to the query parameters otherwise (using its - * <i>splitNumber</i>) - * @throws IOException if there's an error during the execution of the query - */ - @Override - public void open(DorisTableInputSplit inputSplit) throws IOException { - scalaValueReader = new ScalaValueReader(inputSplit.partition, options,readOptions); - hasNext = scalaValueReader.hasNext(); - } - - /** - * Closes all resources used. - * - * @throws IOException Indicates that a resource could not be closed. - */ - @Override - public void close() throws IOException { - - } - - @Override - public TypeInformation<RowData> getProducedType() { - return rowDataTypeInfo; - } - - /** - * Checks whether all data has been read. - * - * @return boolean value indication whether all data has been read. - * @throws IOException - */ - @Override - public boolean reachedEnd() throws IOException { - return !hasNext; - } - - /** - * Stores the next resultSet row in a tuple. - * - * @param reuse row to be reused. 
- * @return row containing next {@link RowData} - * @throws IOException - */ - @Override - public RowData nextRecord(RowData reuse) throws IOException { - if (!hasNext) { - return null; - } - List next = (List)scalaValueReader.next(); - GenericRowData genericRowData = new GenericRowData(next.size()); - for(int i =0;i<next.size();i++){ - genericRowData.setField(i, next.get(i)); - } - //update hasNext after we've read the record - hasNext = scalaValueReader.hasNext(); - return genericRowData; - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return cachedStatistics; - } - - @Override - public DorisTableInputSplit[] createInputSplits(int minNumSplits) throws IOException { - List<DorisTableInputSplit> dorisSplits = new ArrayList<>(); - int splitNum = 0; - for (PartitionDefinition partition : dorisPartitions) { - dorisSplits.add(new DorisTableInputSplit(splitNum++,partition)); - } - LOG.info("DorisTableInputSplit Num:{}",dorisSplits.size()); - return dorisSplits.toArray(new DorisTableInputSplit[0]); - } - - @Override - public InputSplitAssigner getInputSplitAssigner(DorisTableInputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - /** - * A builder used to set parameters to the output format's configuration in a fluent way. - * - * @return builder - */ - public static Builder builder() { - return new Builder(); - } - - /** - * Builder for {@link DorisRowDataInputFormat}. - */ - public static class Builder { - private DorisOptions.Builder optionsBuilder; - private List<PartitionDefinition> partitions; - private DorisReadOptions readOptions; - - - public Builder() { - this.optionsBuilder = DorisOptions.builder(); - } - - public Builder setFenodes(String fenodes) { - this.optionsBuilder.setFenodes(fenodes); - return this; - } - - public Builder setUsername(String username) { - this.optionsBuilder.setUsername(username); - return this; - } - - public Builder setPassword(String password) { - this.optionsBuilder.setPassword(password); - return this; - } - - public Builder setTableIdentifier(String tableIdentifier) { - this.optionsBuilder.setTableIdentifier(tableIdentifier); - return this; - } - - public Builder setPartitions(List<PartitionDefinition> partitions) { - this.partitions = partitions; - return this; - } - - public Builder setReadOptions(DorisReadOptions readOptions) { - this.readOptions = readOptions; - return this; - } - - public DorisRowDataInputFormat build() { - return new DorisRowDataInputFormat( - optionsBuilder.build(),partitions,readOptions - ); - } - } + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class); + + private DorisOptions options; + private DorisReadOptions readOptions; + private List<PartitionDefinition> dorisPartitions; + private TypeInformation<RowData> rowDataTypeInfo; + + private ScalaValueReader scalaValueReader; + private transient boolean hasNext; + + public DorisRowDataInputFormat(DorisOptions options, List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions) { + this.options = options; + this.dorisPartitions = dorisPartitions; + this.readOptions = readOptions; + } + + @Override + public void configure(Configuration parameters) { + //do nothing here + } + + @Override + public void openInputFormat() { + //called once per inputFormat (on open) + } + + @Override + public void closeInputFormat() { + //called once per inputFormat (on close) + } + + /** + * Connects to the 
source database and executes the query in a <b>parallel + * fashion</b> if + * this {@link InputFormat} is built using a parameterized query (i.e. using + * a {@link PreparedStatement}) + * and a proper {@link }, in a <b>non-parallel + * fashion</b> otherwise. + * + * @param inputSplit which is ignored if this InputFormat is executed as a + * non-parallel source, + * a "hook" to the query parameters otherwise (using its + * <i>splitNumber</i>) + * @throws IOException if there's an error during the execution of the query + */ + @Override + public void open(DorisTableInputSplit inputSplit) throws IOException { + scalaValueReader = new ScalaValueReader(inputSplit.partition, options, readOptions); + hasNext = scalaValueReader.hasNext(); + } + + /** + * Closes all resources used. + * + * @throws IOException Indicates that a resource could not be closed. + */ + @Override + public void close() throws IOException { + + } + + @Override + public TypeInformation<RowData> getProducedType() { + return rowDataTypeInfo; + } + + /** + * Checks whether all data has been read. + * + * @return boolean value indication whether all data has been read. + * @throws IOException + */ + @Override + public boolean reachedEnd() throws IOException { + return !hasNext; + } + + /** + * Stores the next resultSet row in a tuple. + * + * @param reuse row to be reused. + * @return row containing next {@link RowData} + * @throws IOException + */ + @Override + public RowData nextRecord(RowData reuse) throws IOException { + if (!hasNext) { + return null; + } + List next = (List) scalaValueReader.next(); + GenericRowData genericRowData = new GenericRowData(next.size()); + for (int i = 0; i < next.size(); i++) { + genericRowData.setField(i, next.get(i)); + } + //update hasNext after we've read the record + hasNext = scalaValueReader.hasNext(); + return genericRowData; + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + @Override + public DorisTableInputSplit[] createInputSplits(int minNumSplits) throws IOException { + List<DorisTableInputSplit> dorisSplits = new ArrayList<>(); + int splitNum = 0; + for (PartitionDefinition partition : dorisPartitions) { + dorisSplits.add(new DorisTableInputSplit(splitNum++, partition)); + } + LOG.info("DorisTableInputSplit Num:{}", dorisSplits.size()); + return dorisSplits.toArray(new DorisTableInputSplit[0]); + } + + @Override + public InputSplitAssigner getInputSplitAssigner(DorisTableInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + /** + * A builder used to set parameters to the output format's configuration in a fluent way. + * + * @return builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link DorisRowDataInputFormat}. 
+ */ + public static class Builder { + private DorisOptions.Builder optionsBuilder; + private List<PartitionDefinition> partitions; + private DorisReadOptions readOptions; + + + public Builder() { + this.optionsBuilder = DorisOptions.builder(); + } + + public Builder setFenodes(String fenodes) { + this.optionsBuilder.setFenodes(fenodes); + return this; + } + + public Builder setUsername(String username) { + this.optionsBuilder.setUsername(username); + return this; + } + + public Builder setPassword(String password) { + this.optionsBuilder.setPassword(password); + return this; + } + + public Builder setTableIdentifier(String tableIdentifier) { + this.optionsBuilder.setTableIdentifier(tableIdentifier); + return this; + } + + public Builder setPartitions(List<PartitionDefinition> partitions) { + this.partitions = partitions; + return this; + } + + public Builder setReadOptions(DorisReadOptions readOptions) { + this.readOptions = readOptions; + return this; + } + + public DorisRowDataInputFormat build() { + return new DorisRowDataInputFormat( + optionsBuilder.build(), partitions, readOptions + ); + } + } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index ef16f33..b9a7708 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.table; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.models.RespContent; import org.slf4j.Logger; @@ -31,17 +32,21 @@ import java.io.Serializable; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Calendar; +import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.UUID; /** * DorisStreamLoad **/ -public class DorisStreamLoad implements Serializable{ +public class DorisStreamLoad implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); @@ -54,8 +59,9 @@ public class DorisStreamLoad implements Serializable{ private String db; private String tbl; private String authEncoding; + private Properties streamLoadProp; - public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) { + public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) { this.hostPort = hostPort; this.db = db; this.tbl = tbl; @@ -63,6 +69,7 @@ public class DorisStreamLoad implements Serializable{ this.passwd = passwd; this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + this.streamLoadProp = streamLoadProp; } public String getLoadUrlStr() { @@ -89,6 +96,9 @@ public class DorisStreamLoad implements Serializable{ conn.addRequestProperty("Expect", "100-continue"); conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8"); conn.addRequestProperty("label", label); + for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) { + conn.addRequestProperty(String.valueOf(entry.getKey()), 
String.valueOf(entry.getValue())); + } conn.setDoOutput(true); conn.setDoInput(true); return conn; @@ -104,6 +114,7 @@ public class DorisStreamLoad implements Serializable{ this.respMsg = respMsg; this.respContent = respContent; } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -116,14 +127,14 @@ public class DorisStreamLoad implements Serializable{ public void load(String value) throws StreamLoadException { LoadResponse loadResponse = loadBatch(value); - LOG.info("Streamload Response:{}",loadResponse); - if(loadResponse.status != 200){ + LOG.info("Streamload Response:{}", loadResponse); + if (loadResponse.status != 200) { throw new StreamLoadException("stream load error: " + loadResponse.respContent); - }else{ + } else { ObjectMapper obj = new ObjectMapper(); try { RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class); - if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){ + if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { throw new StreamLoadException("stream load error: " + respContent.getMessage()); } } catch (IOException e) { @@ -133,11 +144,13 @@ public class DorisStreamLoad implements Serializable{ } private LoadResponse loadBatch(String value) { - Calendar calendar = Calendar.getInstance(); - String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s", - calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), - calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), - UUID.randomUUID().toString().replaceAll("-", "")); + String label = streamLoadProp.getProperty("label"); + if (StringUtils.isBlank(label)) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss"); + String formatDate = sdf.format(new Date()); + label = String.format("flink_connector_%s_%s",formatDate, + UUID.randomUUID().toString().replaceAll("-", "")); + } HttpURLConnection feConn = null; HttpURLConnection beConn = null; diff --git a/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java b/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java index f245dac..5e81cc1 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java +++ b/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java @@ -22,14 +22,16 @@ import org.apache.flink.core.io.InputSplit; /** * DorisTableInputSplit **/ -public class DorisTableInputSplit implements InputSplit, java.io.Serializable{ +public class DorisTableInputSplit implements InputSplit, java.io.Serializable { - /** The number of the split. */ + /** + * The number of the split. + */ private final int splitNumber; protected final PartitionDefinition partition; - public DorisTableInputSplit(int splitNumber,PartitionDefinition partition) { + public DorisTableInputSplit(int splitNumber, PartitionDefinition partition) { super(); this.splitNumber = splitNumber; this.partition = partition; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
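
Usage sketch (not part of the commit): the hunks above show that every table option carrying the stream-load prefix is stripped of that prefix by getStreamLoadProp(...) in DorisDynamicTableFactory and then forwarded verbatim as an HTTP request header of the stream load call in DorisStreamLoad#createConnection. The Java sketch below illustrates what that looks like from the Flink SQL side. Assumptions: STREAM_LOAD_PROP_PREFIX is taken to resolve to 'sink.properties.', and the connector identifier and option keys ('doris', 'fenodes', 'table.identifier', ...) follow the connector's documented factory options; none of these literals appear in the hunks above, and the table layout is purely hypothetical.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DorisStreamLoadPropsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Options starting with the assumed prefix 'sink.properties.' are collected by
        // getStreamLoadProp(...) with the prefix removed, so 'sink.properties.column_separator'
        // becomes the stream load HTTP header 'column_separator: ,'.
        tEnv.executeSql(
                "CREATE TABLE doris_sink (\n"
                + "  id INT,\n"
                + "  name STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'doris',\n"
                + "  'fenodes' = 'fe_host:8030',\n"
                + "  'table.identifier' = 'example_db.example_tbl',\n"
                + "  'username' = 'root',\n"
                + "  'password' = '',\n"
                + "  'sink.properties.column_separator' = ',',\n"
                // A user-supplied label overrides the generated one; a fixed label is shown only to
                // illustrate the override, since Doris rejects duplicate labels on later loads.
                + "  'sink.properties.label' = 'my_manual_label'\n"
                + ")");

        // Submits an insert job against the sink defined above (requires a reachable Doris cluster).
        tEnv.executeSql("INSERT INTO doris_sink VALUES (1, 'doris')");
    }
}

Per the loadBatch() change above, when no 'label' property is supplied the connector now generates one of the form flink_connector_<yyyyMMdd_HHmmss>_<uuid-without-dashes> instead of the previous Calendar-based format.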