[doris-flink-connector] branch schemachange-1.14 created (now 81952c0)
diwu pushed a change to branch schemachange-1.14
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

  at 81952c0  [optimization] add disable 2pc config (#34)

No new revisions were added by this update.
[doris-flink-connector] branch master updated: [improvement] fix uniq delete operation (#63)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 4d07fb6  [improvement] fix uniq delete operation (#63)

4d07fb6 is described below

commit 4d07fb6f7804fb4fe7522e69cac06f3b99e3e7de
Author: wudi <676366...@qq.com>
AuthorDate: Mon Sep 26 09:40:12 2022 +0800

    [improvement] fix uniq delete operation (#63)

    fix uniq delete operation
---
 .../src/main/java/org/apache/doris/flink/rest/RestService.java        | 2 +-
 .../main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 2f93f95..21900b4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -440,7 +440,7 @@ public class RestService implements Serializable {
     public static boolean isUniqueKeyType(DorisOptions options, DorisReadOptions readOptions, Logger logger)
             throws DorisRuntimeException {
         try {
-            return "UNIQUE_KEYS_TYPE".equals(getSchema(options, readOptions, logger).getKeysType());
+            return UNIQUE_KEYS_TYPE.equals(getSchema(options, readOptions, logger).getKeysType());
         } catch (Exception e) {
             throw new DorisRuntimeException(e);
         }

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 53d727f..572e09c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -77,7 +77,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         Properties loadProperties = executionOptions.getStreamLoadProp();
-        boolean deletable = RestService.isUniqueKeyType(options, readOptions, LOG) || executionOptions.getDeletable();
+        boolean deletable = RestService.isUniqueKeyType(options, readOptions, LOG) && executionOptions.getDeletable();
         if (!loadProperties.containsKey(COLUMNS_KEY)) {
             String[] fieldNames = tableSchema.getFieldNames();
             Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
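The one-character change from || to && is the whole fix: rows should be marked deletable only when the target table is a unique-key table AND the user actually enabled deletion. A minimal sketch of the corrected predicate (variable names are illustrative, not from the connector):

    public class DeletablePredicateSketch {
        public static void main(String[] args) {
            boolean isUniqueKeyTable = true;   // table uses the UNIQUE KEY model
            boolean deleteEnabled = false;     // user disabled the delete function

            // Old behavior: '||' marked rows deletable whenever the table was
            // a unique-key table, even with deletion explicitly disabled.
            boolean before = isUniqueKeyTable || deleteEnabled;  // true (wrong)

            // Fixed behavior: '&&' requires both conditions to hold.
            boolean after = isUniqueKeyTable && deleteEnabled;   // false (correct)

            System.out.println("before=" + before + ", after=" + after);
        }
    }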
[doris-flink-connector] branch master updated: [Enhancement] ADD RowSerializer for doris flink connector (#71)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 8d7b7b7  [Enhancement] ADD RowSerializer for doris flink connector (#71)

8d7b7b7 is described below

commit 8d7b7b7db59f142cd19683e91c686cc2ed96008d
Author: DingGeGe <109070189+dinggege1...@users.noreply.github.com>
AuthorDate: Mon Oct 10 16:44:19 2022 +0800

    [Enhancement] ADD RowSerializer for doris flink connector (#71)

    * [Enhancement] ADD RowSerializer for doris flink connector
---
 .../doris/flink/sink/writer/RowSerializer.java     | 107 ++++++++++++++++++++
 .../doris/flink/sink/writer/TestRowSerializer.java |  97 +++++++++++++++++
 2 files changed, 204 insertions(+)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java
new file mode 100644
index 000..3a07951
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowSerializer.java
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.RowRowConverter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
+import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+
+/**
+ * Serializer for {@link Row}.
+ * Quick way to support RowSerializer on existing code
+ * TODO: support original Doris to Row serializer
+ */
+public class RowSerializer implements DorisRecordSerializer<Row> {
+    /**
+     * converter {@link Row} to {@link RowData}
+     */
+    private final RowRowConverter rowRowConverter;
+    private final RowDataSerializer rowDataSerializer;
+
+    private RowSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter,
+                          boolean enableDelete) {
+        this.rowRowConverter = RowRowConverter.create(DataTypes.ROW(dataTypes));
+        this.rowDataSerializer = RowDataSerializer.builder()
+                .setFieldNames(fieldNames)
+                .setFieldType(dataTypes)
+                .setType(type)
+                .setFieldDelimiter(fieldDelimiter)
+                .enableDelete(enableDelete)
+                .build();
+    }
+
+    @Override
+    public byte[] serialize(Row record) throws IOException{
+        RowData rowDataRecord = this.rowRowConverter.toInternal(record);
+        return this.rowDataSerializer.serialize(rowDataRecord);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for RowSerializer.
+     */
+    public static class Builder {
+        private String[] fieldNames;
+        private DataType[] dataTypes;
+        private String type;
+        private String fieldDelimiter;
+        private boolean deletable;
+
+        public Builder setFieldNames(String[] fieldNames) {
+            this.fieldNames = fieldNames;
+            return this;
+        }
+
+        public Builder setFieldType(DataType[] dataTypes) {
+            this.dataTypes = dataTypes;
+            return this;
+        }
+
+        public Builder setType(String type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder enableDelete(boolean deletable) {
+            this.deletable = deletable;
+            return this;
+        }
+
+        public RowSerializer build() {
+            Preconditions.checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type));
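A minimal usage sketch for the new serializer, assuming the connector's LoadConstants define the load formats as the strings "json" and "csv" (as the builder's checkState implies); for "csv" a field delimiter would also have to be set:

    import org.apache.doris.flink.sink.writer.RowSerializer;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.Row;

    public class RowSerializerDemo {
        public static void main(String[] args) throws Exception {
            // Field names and types describe the target Doris table.
            RowSerializer serializer = RowSerializer.builder()
                    .setFieldNames(new String[]{"id", "name"})
                    .setFieldType(new DataType[]{DataTypes.INT(), DataTypes.STRING()})
                    .setType("json")
                    .enableDelete(false)
                    .build();

            // The Row is converted to RowData internally, then serialized
            // into the Stream Load payload bytes.
            byte[] payload = serializer.serialize(Row.of(1, "doris"));
            System.out.println(new String(payload));
        }
    }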
[doris-flink-connector] branch master updated: [Feature]Support 'sink.parallelism' configuration (#72)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 7bba8aa  [Feature]Support 'sink.parallelism' configuration (#72)

7bba8aa is described below

commit 7bba8aa4e69650ff61752c06a8aadaa26b0f12ad
Author: huyuanfeng2018 <731030...@qq.com>
AuthorDate: Mon Oct 17 10:10:54 2022 +0800

    [Feature]Support 'sink.parallelism' configuration (#72)

    Co-authored-by: huyuanfeng
---
 .../org/apache/doris/flink/catalog/DorisCatalogFactory.java |  2 ++
 .../org/apache/doris/flink/table/DorisConfigOptions.java    |  3 +++
 .../apache/doris/flink/table/DorisDynamicTableFactory.java  |  7 ++++++-
 .../org/apache/doris/flink/table/DorisDynamicTableSink.java | 13 ++++++++++---
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
index f23f0dc..9a80370 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java
@@ -48,6 +48,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
 import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
 import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -102,6 +103,7 @@ public class DorisCatalogFactory implements CatalogFactory {
         options.add(SINK_LABEL_PREFIX);
         options.add(SINK_BUFFER_SIZE);
         options.add(SINK_BUFFER_COUNT);
+        options.add(SINK_PARALLELISM);
         options.add(SOURCE_USE_OLD_API);
         return options;

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 02a4d22..391df7d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.table;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
 
@@ -163,6 +164,8 @@ public class DorisConfigOptions {
             .defaultValue(true)
             .withDescription("whether to enable the delete function");
 
+    public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
+
     // Prefix for Doris StreamLoad specific properties.
     public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 5a11605..521724b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -60,6 +60,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_2PC;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
 import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
 import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -117,6 +118,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         options.add(SINK_LABEL_PREFIX);
         options.add(SINK_BUFFER_SIZE);
         options.add(SINK_BUFFER_COUNT);
+        options.add(SINK_PARALLELISM);
         options.add(SOURCE_USE_OLD_API);
         return options;
@@ -213,6 +215,8 @@ public final class DorisDynamicTableFactory implements Dy
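For context, the new option reuses Flink's standard FactoryUtil.SINK_PARALLELISM, so it is set like any other connector option in the DDL. A minimal sketch (connection values are placeholders; option names other than 'sink.parallelism' follow the connector's usual DDL and may vary by version):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SinkParallelismDemo {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // 'sink.parallelism' decouples the Doris sink's parallelism
            // from the upstream operator's parallelism.
            tEnv.executeSql(
                    "CREATE TABLE doris_sink (id INT, name STRING) WITH ("
                    + " 'connector' = 'doris',"
                    + " 'fenodes' = '127.0.0.1:8030',"
                    + " 'table.identifier' = 'db.tbl',"
                    + " 'username' = 'root',"
                    + " 'password' = '',"
                    + " 'sink.label-prefix' = 'doris_demo',"
                    + " 'sink.parallelism' = '4'"
                    + ")");
            tEnv.executeSql("INSERT INTO doris_sink VALUES (1, 'doris')");
        }
    }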
[doris] branch master updated: improve the outfile doc (#13569)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 6ef891870f improve the outfile doc (#13569)

6ef891870f is described below

commit 6ef891870fb87edb3bdb43a1c5d3ce0b7f60dc0b
Author: zy-kkk
AuthorDate: Sat Oct 22 23:21:12 2022 +0800

    improve the outfile doc (#13569)
---
 .../sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md | 2 +-
 .../sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
index 676949581f..65410eeba6 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
@@ -206,7 +206,7 @@ illustrate:
    Verify on cos
 
    1. A path that does not exist will be automatically created
-   2. Access.key/secret.key/endpoint needs to be confirmed with cos students. Especially the value of endpoint does not need to fill in bucket_name.
+   2. Access.key/secret.key/endpoint needs to be confirmed with students of cos. Especially the value of endpoint does not need to fill in bucket_name.
 
 6. Use the s3 protocol to export to bos, and enable concurrent export.

diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
index 026d83321c..7afc83093e 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
@@ -204,7 +204,7 @@ INTO OUTFILE "file_path"
    在cos上验证
 
    1. 不存在的path会自动创建
-   2. access.key/secret.key/endpoint需要和cos的同学确认。尤其是endpoint的值,不需要填写bucket_name。
+   2. access.key/secret.key/endpoint需要和cos的同学确认。尤其是endpoint的值,不需要填写bucket_name。
 
 6. 使用 s3 协议导出到 bos,并且并发导出开启。
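The doc being fixed covers exporting query results to object storage via the S3 protocol; note in particular that the endpoint must not contain the bucket name (the bucket appears only in the OUTFILE path). A hedged sketch via JDBC, since Doris speaks the MySQL protocol — the AWS_* property names follow the doc generation current at this commit and may differ in newer Doris releases:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class OutfileDemo {
        public static void main(String[] args) throws Exception {
            // Placeholder connection values; 9030 is the FE MySQL-protocol port.
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:mysql://127.0.0.1:9030/demo", "root", "");
                 Statement stmt = conn.createStatement()) {
                stmt.execute(
                    "SELECT * FROM demo_tbl"
                    + " INTO OUTFILE \"s3://my-bucket/result_\""
                    + " FORMAT AS CSV"
                    + " PROPERTIES ("
                    + "   \"AWS_ENDPOINT\" = \"cos.ap-beijing.myqcloud.com\","  // no bucket name here
                    + "   \"AWS_ACCESS_KEY\" = \"xxx\","
                    + "   \"AWS_SECRET_KEY\" = \"yyy\","
                    + "   \"AWS_REGION\" = \"ap-beijing\")");
            }
        }
    }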
[doris] branch master updated: fix english number of tpch (#16116)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 956070e17f fix english number of tpch (#16116)

956070e17f is described below

commit 956070e17ff4c34bdada02132a309c1feb8689a2
Author: lsy3993 <110876560+lsy3...@users.noreply.github.com>
AuthorDate: Fri Jan 20 17:27:10 2023 +0800

    fix english number of tpch (#16116)
---
 docs/en/docs/benchmark/tpch.md | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/docs/en/docs/benchmark/tpch.md b/docs/en/docs/benchmark/tpch.md
index 33ac84becf..e5923a4bec 100644
--- a/docs/en/docs/benchmark/tpch.md
+++ b/docs/en/docs/benchmark/tpch.md
@@ -58,16 +58,16 @@ On 22 queries on the TPC-H standard test data set, we conducted a comparison tes
 The TPCH 100G data generated by the simulation of the entire test are respectively imported into Apache Doris 1.2.0-rc01, Apache Doris 1.1.3 and Apache Doris 0.15.0 RC04 for testing. The following is the relevant description and data volume of the table.
 
-| TPC-H Table Name | Rows          | Size after Import | Annotation    |
-| :--------------- | :------------ | ----------------- | :------------ |
-| REGION           | 5             | 400KB             | Region        |
-| NATION           | 25            | 7.714 KB          | Nation        |
-| SUPPLIER         | 1,000,000     | 85.528 MB         | Supplier      |
-| PART             | 20,000,000    | 752.330 MB        | Parts         |
-| PARTSUPP         | 20,000,000    | 4.375 GB          | Parts Supply  |
-| CUSTOMER         | 15,000,000    | 1.317 GB          | Customer      |
-| ORDERS           | 1,500,000,000 | 6.301 GB          | Orders        |
-| LINEITEM         | 6,000,000,000 | 20.882 GB         | Order Details |
+| TPC-H Table Name | Rows         | Size after Import | Annotation    |
+| :--------------- |:-------------| ----------------- | :------------ |
+| REGION           | 5            | 400KB             | Region        |
+| NATION           | 25           | 7.714 KB          | Nation        |
+| SUPPLIER         | 1,000,000    | 85.528 MB         | Supplier      |
+| PART             | 20,000,000   | 752.330 MB        | Parts         |
+| PARTSUPP         | 20,000,000   | 4.375 GB          | Parts Supply  |
+| CUSTOMER         | 15,000,000   | 1.317 GB          | Customer      |
+| ORDERS           | 1,50,000,000 | 6.301 GB          | Orders        |
+| LINEITEM         | 6,00,000,000 | 20.882 GB         | Order Details |
 
 ## 4. Test SQL
[doris-spark-connector] branch master updated: [improve]Return specific error information after optimizing the exception (#64)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 97d  [improve]Return specific error information after optimizing the exception (#64)

97d is described below

commit 97d153f57db4b5616b36c18a7711e5ebcbe4
Author: caoliang-web <71004656+caoliang-...@users.noreply.github.com>
AuthorDate: Sat Jan 28 10:17:34 2023 +0800

    [improve]Return specific error information after optimizing the exception (#64)

    * Return specific error information after optimizing the exception
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 49 +++++++++++-----------
 .../doris/spark/sql/TestSparkConnector.scala       |  4 +-
 2 files changed, 27 insertions(+), 26 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 25ed7b1..351ef23 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * DorisStreamLoad
  **/
-public class DorisStreamLoad implements Serializable{
+public class DorisStreamLoad implements Serializable {
     public static final String FIELD_DELIMITER = "\t";
     public static final String LINE_DELIMITER = "\n";
     public static final String NULL_VALUE = "\\N";
@@ -68,7 +68,7 @@ public class DorisStreamLoad implements Serializable {
     private String columns;
     private String[] dfColumns;
     private String maxFilterRatio;
-    private Map<String,String> streamLoadProp;
+    private Map<String, String> streamLoadProp;
     private static final long cacheExpireTimeout = 4 * 60;
     private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
 
@@ -91,7 +91,7 @@ public class DorisStreamLoad implements Serializable {
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
-        this.streamLoadProp=getStreamLoadProp(settings);
+        this.streamLoadProp = getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
@@ -110,7 +110,7 @@ public class DorisStreamLoad implements Serializable {
         this.dfColumns = dfColumns;
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
-        this.streamLoadProp=getStreamLoadProp(settings);
+        this.streamLoadProp = getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
@@ -142,15 +142,15 @@ public class DorisStreamLoad implements Serializable {
         conn.setDoOutput(true);
         conn.setDoInput(true);
-        if(streamLoadProp != null ){
-            streamLoadProp.forEach((k,v) -> {
-                if(streamLoadProp.containsKey("format")){
+        if (streamLoadProp != null) {
+            streamLoadProp.forEach((k, v) -> {
+                if (streamLoadProp.containsKey("format")) {
                     return;
                 }
-                if(streamLoadProp.containsKey("strip_outer_array")) {
+                if (streamLoadProp.containsKey("strip_outer_array")) {
                     return;
                 }
-                if(streamLoadProp.containsKey("read_json_by_line")){
+                if (streamLoadProp.containsKey("read_json_by_line")) {
                     return;
                 }
                 conn.addRequestProperty(k, v);
             });
@@ -171,6 +171,7 @@ public class DorisStreamLoad implements Serializable {
             this.respMsg = respMsg;
             this.respContent = respContent;
         }
+
         @Override
         public String toString() {
             StringBuilder sb = new StringBuilder();
@@ -199,10 +200,10 @@ public class DorisStreamLoad implements Serializable {
     public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
-        List<Map<String,Object>> dataList = new ArrayList<>();
+        List<Map<String, Object>> dataList = new ArrayList<>();
         try {
             for (List<Object> row : rows) {
-                Map<String,Object> dataMap = new HashMap<>();
+                Map<String, Object> dataMap = new HashMap<>();
                 if (dfColumns.length == row.size()) {
                     for (int i = 0; i < dfColumns.length; i++) {
                         dataMap.put(dfColumns[i], row.get(i));
                     }
                 }
                 dataList.add(dataMap);
             }
@@ -222,18 +223,18 @@ public class D
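For orientation: the streamLoadProp map this code formats comes from the Spark write options, and each entry is forwarded to Doris as a Stream Load header via conn.addRequestProperty(k, v) — except format/strip_outer_array/read_json_by_line, which this version pins to JSON (commit #65 below makes the format configurable). A minimal write sketch, assuming the connector's usual 'doris.sink.properties.' option prefix:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class DorisWriteDemo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("doris-write").master("local[*]").getOrCreate();

            Dataset<Row> df = spark.sql("SELECT 1 AS id, 'alice' AS name");

            // Connection values are placeholders; each doris.sink.properties.*
            // entry below becomes a Stream Load header on the HTTP request.
            df.write().format("doris")
                    .option("doris.fenodes", "127.0.0.1:8030")
                    .option("doris.table.identifier", "db.tbl")
                    .option("user", "root")
                    .option("password", "")
                    .option("doris.sink.properties.timeout", "3600")
                    .option("doris.sink.properties.timezone", "UTC")
                    .save();

            spark.stop();
        }
    }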
[doris-spark-connector] branch master updated: Update doris build image (#58)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 5eb68a1  Update doris build image (#58)

5eb68a1 is described below

commit 5eb68a110fd020b93a5522573313293219d8c5d7
Author: jiachuan.zhu
AuthorDate: Sat Jan 28 10:18:23 2023 +0800

    Update doris build image (#58)
---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index b93eecf..70c0a77 100644
--- a/README.md
+++ b/README.md
@@ -35,7 +35,7 @@ More information about compilation and usage, please visit [Spark Doris Connecto
 1. download and compile Spark Doris Connector from https://github.com/apache/doris-spark-connector, we suggest compile Spark Doris Connector by Doris offfcial image。
 
 ```bash
-$ docker pull apache/incubator-doris:build-env-ldb-toolchain-latest
+$ docker pull apache/doris:build-env-ldb-toolchain-latest
 ```
 
 2. the result of compile jar is like:spark-doris-connector-3.1_2.12-1.0.0-SNAPSHOT.jar
[doris-spark-connector] branch master updated: [improvement] Adapt to the load format of csv (#65)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 0bb76ed  [improvement] Adapt to the load format of csv (#65)

0bb76ed is described below

commit 0bb76ed0480398583b2541596fece161ea402984
Author: Hong Liu <844981...@qq.com>
AuthorDate: Sat Jan 28 11:16:40 2023 +0800

    [improvement] Adapt to the load format of csv (#65)

    Co-authored-by: smallhibiscus <844981280>
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 56 ++++++++++++++--------
 .../doris/spark/sql/DorisSourceProvider.scala      |  2 +-
 2 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 351ef23..3ada398 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -50,10 +50,10 @@ import java.util.concurrent.TimeUnit;
 /**
  * DorisStreamLoad
  **/
-public class DorisStreamLoad implements Serializable {
-    public static final String FIELD_DELIMITER = "\t";
-    public static final String LINE_DELIMITER = "\n";
-    public static final String NULL_VALUE = "\\N";
+public class DorisStreamLoad implements Serializable{
+    private String FIELD_DELIMITER;
+    private String LINE_DELIMITER;
+    private String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -71,6 +71,7 @@ public class DorisStreamLoad implements Serializable {
     private Map<String, String> streamLoadProp;
     private static final long cacheExpireTimeout = 4 * 60;
     private LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
+    private String fileType;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.db = db;
@@ -114,6 +115,11 @@ public class DorisStreamLoad implements Serializable {
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
                 .build(new BackendCacheLoader(settings));
+        fileType = this.streamLoadProp.get("format") == null ? "csv" : this.streamLoadProp.get("format");
+        if (fileType.equals("csv")){
+            FIELD_DELIMITER = this.streamLoadProp.get("column_separator") == null ? "\t" : this.streamLoadProp.get("column_separator");
+            LINE_DELIMITER = this.streamLoadProp.get("line_delimiter") == null ? "\n" : this.streamLoadProp.get("line_delimiter");
+        }
     }
 
     public String getLoadUrlStr() {
@@ -156,8 +162,10 @@ public class DorisStreamLoad implements Serializable {
                 conn.addRequestProperty(k, v);
             });
         }
-        conn.addRequestProperty("format", "json");
-        conn.addRequestProperty("strip_outer_array", "true");
+        if (fileType.equals("json")){
+            conn.addRequestProperty("format", "json");
+            conn.addRequestProperty("strip_outer_array", "true");
+        }
         return conn;
     }
 
@@ -200,24 +208,30 @@ public class DorisStreamLoad implements Serializable {
     public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
-        List<Map<String, Object>> dataList = new ArrayList<>();
-        try {
-            for (List<Object> row : rows) {
-                Map<String, Object> dataMap = new HashMap<>();
-                if (dfColumns.length == row.size()) {
-                    for (int i = 0; i < dfColumns.length; i++) {
-                        dataMap.put(dfColumns[i], row.get(i));
+        if (fileType.equals("csv")) {
+            load(listToString(rows));
+        } else if(fileType.equals("json")) {
+            List<Map<String, Object>> dataList = new ArrayList<>();
+            try {
+                for (List<Object> row : rows) {
+                    Map<String, Object> dataMap = new HashMap<>();
+                    if (dfColumns.length == row.size()) {
+                        for (int i = 0; i < dfColumns.length; i++) {
+                            dataMap.put(dfColumns[i], row.get(i));
+                        }
                     }
+                    dataList.add(dataMap);
                 }
+            } catch (Exception e) {
+                throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
             }
-            dataList.add(dataMap);
-        } catch (Exception e) {
-            throw new StreamLoadException("
[doris-spark-connector] branch master updated: [feature] support datev2/datatimev2/decimalv3 type (#66)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new b189e5e  [feature] support datev2/datatimev2/decimalv3 type (#66)

b189e5e is described below

commit b189e5eb7d9194c628b42ebdcc8d699ec327e63a
Author: gnehil
AuthorDate: Sat Jan 28 12:00:48 2023 +0800

    [feature] support datev2/datatimev2/decimalv3 type (#66)

    * [feature] support datev2/datatimev2/decimalv3 type

    * remove debug code
---
 .../java/org/apache/doris/spark/serialization/RowBatch.java |  5 +++++
 .../main/scala/org/apache/doris/spark/sql/SchemaUtils.scala |  5 +++++
 spark-doris-connector/src/main/thrift/doris/Types.thrift    | 13 +++++++++++--
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index bcc76d5..f04cc5d 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -239,6 +239,9 @@ public class RowBatch {
                     }
                     break;
                 case "DECIMALV2":
+                case "DECIMAL32":
+                case "DECIMAL64":
+                case "DECIMAL128I":
                     Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL),
                             typeMismatchMessage(currentType, mt));
                     DecimalVector decimalVector = (DecimalVector) curFieldVector;
@@ -252,7 +255,9 @@ public class RowBatch {
                     }
                     break;
                 case "DATE":
+                case "DATEV2":
                 case "DATETIME":
+                case "DATETIMEV2":
                 case "LARGEINT":
                 case "CHAR":
                 case "VARCHAR":

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 26d5c82..2caf162 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -97,13 +97,18 @@ private[spark] object SchemaUtils {
       case "FLOAT"       => DataTypes.FloatType
       case "DOUBLE"      => DataTypes.DoubleType
       case "DATE"        => DataTypes.StringType
+      case "DATEV2"      => DataTypes.StringType
       case "DATETIME"    => DataTypes.StringType
+      case "DATETIMEV2"  => DataTypes.StringType
       case "BINARY"      => DataTypes.BinaryType
       case "DECIMAL"     => DecimalType(precision, scale)
       case "CHAR"        => DataTypes.StringType
       case "LARGEINT"    => DataTypes.StringType
       case "VARCHAR"     => DataTypes.StringType
       case "DECIMALV2"   => DecimalType(precision, scale)
+      case "DECIMAL32"   => DecimalType(precision, scale)
+      case "DECIMAL64"   => DecimalType(precision, scale)
+      case "DECIMAL128I" => DecimalType(precision, scale)
       case "TIME"        => DataTypes.DoubleType
       case "STRING"      => DataTypes.StringType
       case "HLL"         =>

diff --git a/spark-doris-connector/src/main/thrift/doris/Types.thrift b/spark-doris-connector/src/main/thrift/doris/Types.thrift
index 44ce606..a30afde 100644
--- a/spark-doris-connector/src/main/thrift/doris/Types.thrift
+++ b/spark-doris-connector/src/main/thrift/doris/Types.thrift
@@ -66,7 +66,7 @@ enum TPrimitiveType {
   DATE,
   DATETIME,
   BINARY,
-  DECIMAL,
+  DECIMAL_DEPRACTED, // not used now, only for place holder
   // CHAR(n). Currently only supported in UDAs
   CHAR,
   LARGEINT,
@@ -79,7 +79,16 @@ enum TPrimitiveType {
   MAP,
   STRUCT,
   STRING,
-  ALL
+  ALL,
+  QUANTILE_STATE,
+  DATEV2,
+  DATETIMEV2,
+  TIMEV2,
+  DECIMAL32,
+  DECIMAL64,
+  DECIMAL128I,
+  JSONB,
+  UNSUPPORTED
 }
 
 enum TTypeNodeType {

- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
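A quick way to see the new mappings from the Spark side; a minimal read sketch with placeholder connection values (per the SchemaUtils change above, DATEV2/DATETIMEV2 surface as Spark strings and the DECIMAL32/64/128I family as DecimalType(precision, scale)):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadNewTypesDemo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("doris-read").master("local[*]").getOrCreate();

            Dataset<Row> df = spark.read().format("doris")
                    .option("doris.fenodes", "127.0.0.1:8030")
                    .option("doris.table.identifier", "db.tbl")
                    .option("user", "root")
                    .option("password", "")
                    .load();

            // DATEV2/DATETIMEV2 columns print as string, DECIMALV3 columns
            // as decimal(precision, scale).
            df.printSchema();
            spark.stop();
        }
    }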
[doris] branch master updated: fix date_sub function doc (#16250)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 8b2aa9f3a2 fix date_sub function doc (#16250)

8b2aa9f3a2 is described below

commit 8b2aa9f3a227c5977b77904a9a79f03a09366c41
Author: yongkang.zhong
AuthorDate: Mon Jan 30 21:53:45 2023 +0800

    fix date_sub function doc (#16250)
---
 docs/en/docs/sql-manual/sql-functions/date-time-functions/date_sub.md    | 2 +-
 docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date_sub.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/en/docs/sql-manual/sql-functions/date-time-functions/date_sub.md b/docs/en/docs/sql-manual/sql-functions/date-time-functions/date_sub.md
index 8f1b4459c8..db90993e21 100644
--- a/docs/en/docs/sql-manual/sql-functions/date-time-functions/date_sub.md
+++ b/docs/en/docs/sql-manual/sql-functions/date-time-functions/date_sub.md
@@ -28,7 +28,7 @@ under the License.
 ### Description
 Syntax
 
-`INT DATE_SUB(DATETIME date,INTERVAL expr type)`
+`DATETIME DATE_SUB(DATETIME date,INTERVAL expr type)`
 
 Subtract the specified time interval from the date

diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date_sub.md b/docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date_sub.md
index 6ca944be36..53a8c89dd2 100644
--- a/docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date_sub.md
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/date-time-functions/date_sub.md
@@ -28,7 +28,7 @@ under the License.
 ### description
 Syntax
 
-`INT DATE_SUB(DATETIME date,INTERVAL expr type)`
+`DATETIME DATE_SUB(DATETIME date,INTERVAL expr type)`
 
 从日期减去指定的时间间隔
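Since Doris speaks the MySQL protocol, the corrected return type is easy to verify from any JDBC client; a minimal sketch with placeholder connection settings:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class DateSubDemo {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:mysql://127.0.0.1:9030/demo", "root", "");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(
                         "SELECT DATE_SUB('2023-01-30 12:00:00', INTERVAL 2 DAY)")) {
                rs.next();
                // Prints '2023-01-28 12:00:00' -- a DATETIME value, not the
                // INT the old doc claimed.
                System.out.println(rs.getString(1));
            }
        }
    }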
[doris-flink-connector] branch master updated: [Feature](Type)add new type of doris-flink-connector (#98)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new a979fb3  [Feature](Type)add new type of doris-flink-connector (#98)

a979fb3 is described below

commit a979fb3932518717cfe6e90e4c33274cb7ff396a
Author: lsy3993 <110876560+lsy3...@users.noreply.github.com>
AuthorDate: Tue Jan 31 16:44:00 2023 +0800

    [Feature](Type)add new type of doris-flink-connector (#98)
---
 .../apache/doris/flink/catalog/DorisTypeMapper.java  | 12 ++++++++++++
 .../apache/doris/flink/serialization/RowBatch.java   | 19 +++++++++++++++++++
 .../src/main/thrift/doris/Types.thrift               | 13 +++++++++++--
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index f7a16ce..4805704 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -28,6 +28,10 @@ public class DorisTypeMapper {
     private static final String DORIS_BIGINT = "BIGINT";
     private static final String DORIS_LARGEINT = "BIGINT UNSIGNED";
     private static final String DORIS_DECIMAL = "DECIMAL";
+    private static final String DORIS_DECIMALV2 = "DECIMALV2";
+    private static final String DORIS_DECIMAL32 = "DECIMAL32";
+    private static final String DORIS_DECIMAL64 = "DECIMAL64";
+    private static final String DORIS_DECIMAL128I = "DECIMAL128I";
     private static final String DORIS_FLOAT = "FLOAT";
     private static final String DORIS_DOUBLE = "DOUBLE";
 
@@ -39,7 +43,9 @@ public class DorisTypeMapper {
     // --time-
     private static final String DORIS_DATE = "DATE";
+    private static final String DORIS_DATEV2 = "DATEV2";
     private static final String DORIS_DATETIME = "DATETIME";
+    private static final String DORIS_DATETIMEV2 = "DATETIMEV2";
 
     //--bool
     private static final String DORIS_BOOLEAN = "BOOLEAN";
@@ -64,6 +70,10 @@ public class DorisTypeMapper {
             case DORIS_BIGINT:
                 return DataTypes.BIGINT();
             case DORIS_DECIMAL:
+            case DORIS_DECIMALV2:
+            case DORIS_DECIMAL32:
+            case DORIS_DECIMAL64:
+            case DORIS_DECIMAL128I:
                 return DataTypes.DECIMAL(precision, scale);
             case DORIS_FLOAT:
                 return DataTypes.FLOAT();
@@ -77,8 +87,10 @@ public class DorisTypeMapper {
             case DORIS_TEXT:
                 return DataTypes.STRING();
             case DORIS_DATE:
+            case DORIS_DATEV2:
                 return DataTypes.DATE();
             case DORIS_DATETIME:
+            case DORIS_DATETIMEV2:
                 return DataTypes.TIMESTAMP(0);
             default:
                 throw new UnsupportedOperationException(

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index e83300a..1cd9fdc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -85,6 +85,7 @@ public class RowBatch {
 
     private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    private final DateTimeFormatter dateTimeV2Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
 
     public List<Row> getRowBatch() {
         return rowBatch;
@@ -237,6 +238,9 @@ public class RowBatch {
                         break;
                     case "DECIMAL":
                     case "DECIMALV2":
+                    case "DECIMAL32":
+                    case "DECIMAL64":
+                    case "DECIMAL128I":
                         Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL),
                                 typeMismatchMessage(currentType, mt));
                         DecimalVector decimalVector = (DecimalVector) curFieldVector;
@@ -250,6 +254,7 @@ public class RowBatch {
                         }
                         break;
                     case "DATE":
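From the Flink SQL side the new mappings mean Doris DATEV2/DATETIMEV2/DECIMAL32/64/128I columns can be declared as ordinary DATE / TIMESTAMP(0) / DECIMAL(p, s) columns. A minimal sketch with placeholder connection values:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class NewTypesDemo {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE doris_source ("
                    + " created_day DATE,"          // backed by Doris DATEV2
                    + " created_at TIMESTAMP(0),"   // backed by Doris DATETIMEV2
                    + " amount DECIMAL(10, 2)"      // backed by Doris DECIMAL64
                    + ") WITH ("
                    + " 'connector' = 'doris',"
                    + " 'fenodes' = '127.0.0.1:8030',"
                    + " 'table.identifier' = 'db.tbl',"
                    + " 'username' = 'root',"
                    + " 'password' = ''"
                    + ")");
            tEnv.executeSql("SELECT * FROM doris_source").print();
        }
    }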
[doris] branch master updated: [fix](hive-udf) delete Logger to avoid Kryo serialize problem. (#25312)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new b83e4126237 [fix](hive-udf) delete Logger to avoid Kryo serialize problem. (#25312)

b83e4126237 is described below

commit b83e4126237016e3de7b2ede3e310754c6a39355
Author: yagagagaga
AuthorDate: Mon Oct 16 16:10:06 2023 +0800

    [fix](hive-udf) delete Logger to avoid Kryo serialize problem. (#25312)
---
 fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapAndUDF.java   | 4 ----
 fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapCountUDF.java | 4 ----
 fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapOrUDF.java    | 4 ----
 fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapXorUDF.java   | 4 ----
 4 files changed, 16 deletions(-)

diff --git a/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapAndUDF.java b/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapAndUDF.java
index cc84c5afafc..8b5044b51f3 100644
--- a/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapAndUDF.java
+++ b/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapAndUDF.java
@@ -27,15 +27,12 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 
 @Description(name = "bitmap_and", value = "a _FUNC_ b - Compute intersection of two or more input bitmaps,"
         + " return the new bitmap")
 public class BitmapAndUDF extends GenericUDF {
-    private static final Logger LOG = LogManager.getLogger(BitmapAndUDF.class);
     private transient BinaryObjectInspector inputOI0;
     private transient BinaryObjectInspector inputOI1;
 
@@ -68,7 +65,6 @@ public class BitmapAndUDF extends GenericUDF {
             bitmapValue0.and(bitmapValue1);
             return BitmapValueUtil.serializeToBytes(bitmapValue1);
         } catch (IOException ioException) {
-            LOG.warn("", ioException);
             throw new RuntimeException(ioException);
         }
     }

diff --git a/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapCountUDF.java b/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapCountUDF.java
index 28475610ec8..408c6c1d470 100644
--- a/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapCountUDF.java
+++ b/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapCountUDF.java
@@ -27,15 +27,12 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 
 @Description(name = "bitmap_count", value = "a _FUNC_ b - Returns the number of distinct integers"
         + " added to the bitmap (e.g., number of bits set)")
 public class BitmapCountUDF extends GenericUDF {
-    private static final Logger LOG = LogManager.getLogger(BitmapCountUDF.class);
     private transient BinaryObjectInspector inputOI;
 
     @Override
@@ -62,7 +59,6 @@ public class BitmapCountUDF extends GenericUDF {
             BitmapValue bitmapValue = BitmapValueUtil.deserializeToBitmap(inputBytes);
             return bitmapValue.cardinality();
         } catch (IOException ioException) {
-            LOG.warn("", ioException);
             throw new HiveException(ioException);
         }
     }

diff --git a/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapOrUDF.java b/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapOrUDF.java
index 197e137946e..9ed1446d3fb 100644
--- a/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapOrUDF.java
+++ b/fe/hive-udf/src/main/java/org/apache/doris/udf/BitmapOrUDF.java
@@ -27,15 +27,12 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 
 @Description(name = "bitmap_or", value = "a _FUNC_ b - Compute"
         + " union of two or more input bitmaps, returns the new bitmap")
 public class BitmapOrUDF extends Gener
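The underlying issue: Hive may ship UDF instances to executors with Kryo, and a log4j Logger field makes that serialization fail (it also drags log4j into the shaded jar). This commit simply deletes the loggers. A hedged sketch of the general pattern for any helper a UDF should not serialize — mark the field transient and re-create it lazily (EchoUDF is hypothetical, not part of the commit; StringBuilder stands in for any serialization-unfriendly helper):

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class EchoUDF extends GenericUDF {

        // 'transient' keeps the field out of Kryo/Java serialization;
        // it is rebuilt on first use after the UDF is deserialized.
        private transient StringBuilder scratch;

        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        }

        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
            if (scratch == null) {
                scratch = new StringBuilder(); // lazy re-initialization
            }
            scratch.setLength(0);
            Object arg = arguments[0].get();
            return scratch.append(arg == null ? "" : arg.toString()).toString();
        }

        @Override
        public String getDisplayString(String[] children) {
            return "echo(" + String.join(", ", children) + ")";
        }
    }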
[doris] branch master updated: [typo](docs) Fix some ambiguous descriptions (#23912)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 5eff36417a1 [typo](docs) Fix some ambiguous descriptions (#23912)

5eff36417a1 is described below

commit 5eff36417a1766612c7097aedf2b4063debda35f
Author: yagagagaga
AuthorDate: Mon Oct 16 16:44:11 2023 +0800

    [typo](docs) Fix some ambiguous descriptions (#23912)
---
 docs/en/docs/data-operate/import/import-way/broker-load-manual.md    | 6 ++++--
 docs/zh-CN/docs/admin-manual/config/fe-config.md                     | 2 +-
 docs/zh-CN/docs/data-operate/import/import-way/broker-load-manual.md | 6 ++++--
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/docs/en/docs/data-operate/import/import-way/broker-load-manual.md b/docs/en/docs/data-operate/import/import-way/broker-load-manual.md
index 401a1906425..c059c05c68f 100644
--- a/docs/en/docs/data-operate/import/import-way/broker-load-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/broker-load-manual.md
@@ -351,7 +351,8 @@ Only the case of a single BE is discussed here. If the user cluster has multiple
    For example, for a 100G file, the number of BEs in the cluster is 10
 
    max_broker_concurrency = 10
-   max_bytes_per_broker_scanner >= 10G = 100G / 10
+   # >= 10G = 100G / 10
+   max_bytes_per_broker_scanner = 1069547520
 
    After modification, all BEs will process the import task concurrently, each BE processing part of the original file.
@@ -364,7 +365,8 @@ Only the case of a single BE is discussed here. If the user cluster has multiple
    The amount of data processed by a single BE of the current import task / the slowest import speed of the user Doris cluster (MB/s) >= the timeout time of the current import task >= the amount of data processed by a single BE of the current import task / 10M/s
 
    For example, for a 100G file, the number of BEs in the cluster is 10
-   timeout >= 1000s = 10G / 10M/s
+   # >= 1000s = 10G / 10M/s
+   timeout = 1000
 
 3. When the user finds that the timeout time calculated in the second step exceeds the default import timeout time of 4 hours

diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 80bea30ef98..f5515588e55 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -1159,7 +1159,7 @@ current running txns on db xxx is xx, larger than limit xx
 `max_bytes_per_broker_scanner`
 
-默认值:500 * 1024 * 1024 * 1024L (500G)
+默认值:`500 * 1024 * 1024 * 1024L` (500G)
 
 是否可以动态配置:true

diff --git a/docs/zh-CN/docs/data-operate/import/import-way/broker-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/broker-load-manual.md
index cd85958e29c..e0a26843360 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/broker-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/broker-load-manual.md
@@ -351,7 +351,8 @@ Broker Load 需要借助 Broker 进程访问远端存储,不同的 Broker 需
    比如一个 100G 的文件,集群的 BE 个数为 10 个
 
    max_broker_concurrency = 10
-   max_bytes_per_broker_scanner >= 10G = 100G / 10
+   # >= 10G = 100G / 10
+   max_bytes_per_broker_scanner = 1069547520
   ```
 
   修改后,所有的 BE 会并发的处理导入任务,每个 BE 处理原始文件的一部分。
@@ -364,7 +365,8 @@ Broker Load 需要借助 Broker 进程访问远端存储,不同的 Broker 需
    当前导入任务单个 BE 处理的数据量 / 用户 Doris 集群最慢导入速度(MB/s) >= 当前导入任务的 timeout 时间 >= 当前导入任务单个 BE 处理的数据量 / 10M/s
 
    比如一个 100G 的文件,集群的 BE 个数为 10个
-   timeout >= 1000s = 10G / 10M/s
+   # >= 1000s = 10G / 10M/s
+   timeout = 1000
   ```
 
 3. 当用户发现第二步计算出的 timeout 时间超过系统默认的导入最大超时时间 4小时
[doris] branch master updated (5eff36417a1 -> 4c42f3b783e)
diwu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

    from 5eff36417a1 [typo](docs) Fix some ambiguous descriptions (#23912)
     add 4c42f3b783e [Improvement](hive-udf)(doc) minimize hive-udf and add some docs. (#24786)

No new revisions were added by this update.

Summary of changes:
 docs/en/docs/ecosystem/hive-bitmap-udf.md    | 56 +++++++++++++++++++-
 docs/zh-CN/docs/ecosystem/hive-bitmap-udf.md | 56 +++++++++++++++++++-
 fe/hive-udf/pom.xml                          | 75 ++++++++++++++++++++++++++++
 3 files changed, 163 insertions(+), 24 deletions(-)
[doris-flink-connector] branch master updated: [improve] update flink cdc pom.xml (#209)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 3946599  [improve] update flink cdc pom.xml (#209)

3946599 is described below

commit 39465998add63518c2c86a0f55e7385f028e9acc
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Mon Oct 16 22:47:51 2023 -0500

    [improve] update flink cdc pom.xml (#209)
---
 flink-doris-connector/pom.xml | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index e52a07a..c96a833 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -70,6 +70,7 @@ under the License.
         <revision>1.5.0-SNAPSHOT</revision>
         <flink.version>1.16.0</flink.version>
         <flink.minor.version>1.16</flink.minor.version>
+        <flink.sql.cdc.version>2.4.1</flink.sql.cdc.version>
        <libthrift.version>0.16.0</libthrift.version>
        <arrow.version>5.0.0</arrow.version>
        <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
@@ -251,25 +252,25 @@ under the License.
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-sql-connector-mysql-cdc</artifactId>
-            <version>2.4.1</version>
+            <version>${flink.sql.cdc.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-sql-connector-oracle-cdc</artifactId>
-            <version>2.4.1</version>
+            <version>${flink.sql.cdc.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-sql-connector-postgres-cdc</artifactId>
-            <version>2.4.1</version>
+            <version>${flink.sql.cdc.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
-            <version>2.4.1</version>
+            <version>${flink.sql.cdc.version}</version>
             <scope>provided</scope>
         </dependency>

- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-spark-connector] branch master updated: [feature] support read map and struct type (#116)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git

The following commit(s) were added to refs/heads/master by this push:
     new 7a45abe  [feature] support read map and struct type (#116)

7a45abe is described below

commit 7a45abe3f955e938f6161e95c43a911464996b5a
Author: gnehil
AuthorDate: Tue Oct 17 11:58:35 2023 +0800

    [feature] support read map and struct type (#116)
---
 .../apache/doris/spark/serialization/RowBatch.java |  37 +++++
 .../org/apache/doris/spark/sql/SchemaUtils.scala   |   3 +-
 .../doris/spark/serialization/TestRowBatch.java    | 180 +++++++++++++++++++--
 .../doris/spark/sql/TestSparkConnector.scala       |   1 +
 4 files changed, 203 insertions(+), 18 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index 3d66db5..b43b0a2 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -37,12 +37,16 @@ import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.UnionMapReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -52,7 +56,9 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
@@ -338,6 +344,37 @@ public class RowBatch {
                         addValueToRow(rowIndex, value);
                     }
                     break;
+                case "MAP":
+                    Preconditions.checkArgument(mt.equals(Types.MinorType.MAP),
+                            typeMismatchMessage(currentType, mt));
+                    MapVector mapVector = (MapVector) curFieldVector;
+                    UnionMapReader reader = mapVector.getReader();
+                    for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                        if (mapVector.isNull(rowIndex)) {
+                            addValueToRow(rowIndex, null);
+                            continue;
+                        }
+                        reader.setPosition(rowIndex);
+                        Map<String, String> value = new HashMap<>();
+                        while (reader.next()) {
+                            value.put(reader.key().readObject().toString(), reader.value().readObject().toString());
+                        }
+                        addValueToRow(rowIndex, JavaConverters.mapAsScalaMapConverter(value).asScala());
+                    }
+                    break;
+                case "STRUCT":
+                    Preconditions.checkArgument(mt.equals(Types.MinorType.STRUCT),
+                            typeMismatchMessage(currentType, mt));
+                    StructVector structVector = (StructVector) curFieldVector;
+                    for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                        if (structVector.isNull(rowIndex)) {
+                            addValueToRow(rowIndex, null);
+                            continue;
+                        }
+                        String value = structVector.getObject(rowIndex).toString();
+                        addValueToRow(rowIndex, value);
+                    }
+                    break;
                 default:
                     String errMsg = "Unsupported type " + schema.get(col).getType();
                     logger.error(errMsg);

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 677cc2e..44baa95 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory
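A minimal read sketch to exercise the new paths, with placeholder connection values; per the RowBatch change above, a Doris MAP column arrives as a string-to-string map and a STRUCT column as the string rendering produced by StructVector.getObject(...).toString():

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadMapStructDemo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("doris-complex-types").master("local[*]").getOrCreate();

            Dataset<Row> df = spark.read().format("doris")
                    .option("doris.fenodes", "127.0.0.1:8030")
                    .option("doris.table.identifier", "db.tbl_with_complex_cols")
                    .option("user", "root")
                    .option("password", "")
                    .load();

            // 'map_col' and 'struct_col' are hypothetical column names for a
            // table containing MAP and STRUCT columns.
            df.select("id", "map_col", "struct_col").show(false);
            spark.stop();
        }
    }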
[doris] branch master updated: [Fix](show-frontends-disk)Fix NPE and macOS compatibility (#25565)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 8d65a627058 [Fix](show-frontends-disk)Fix NPE and macOS compatibility (#25565)

8d65a627058 is described below

commit 8d65a627058bd7a4a18a10591619b6c0d585b144
Author: Calvin Kirs
AuthorDate: Thu Oct 19 09:53:43 2023 +0800

    [Fix](show-frontends-disk)Fix NPE and macOS compatibility (#25565)
---
 .../java/org/apache/doris/common/io/DiskUtils.java | 97 ++++++++++++++--------
 1 file changed, 63 insertions(+), 34 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/io/DiskUtils.java b/fe/fe-common/src/main/java/org/apache/doris/common/io/DiskUtils.java
index e7b0ea414bd..4861f40fe8a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/io/DiskUtils.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/io/DiskUtils.java
@@ -23,18 +23,16 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
 
 @Slf4j
 public class DiskUtils {
 
     public static class Df {
         public String fileSystem = "";
-        public long blocks;
-        public long used;
-        public long available;
-        public int useRate;
+        public long blocks = 0L;
+        public long used = 0L;
+        public long available = 0L;
+        public int useRate = 0;
         public String mountedOn = "";
     }
 
@@ -46,52 +44,83 @@ public class DiskUtils {
             return df;
         }
 
+        Process process;
         try {
-            process = Runtime.getRuntime().exec("df " + dir);
+            process = Runtime.getRuntime().exec("df -k " + dir);
             InputStream inputStream = process.getInputStream();
             BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
-
+            Df df = new Df();
             // Filesystem 1K-blocks Used Available Use% Mounted on
             // /dev/sdc 5814186096 5814169712 0 100% /home/spy-sd/sdc
             String titleLine = reader.readLine();
             String dataLine = reader.readLine();
             if (titleLine == null || dataLine == null) {
-                return null;
+                return df;
             }
 
-            String[] values = dataLine.split("\\s+");
-            if (values.length != 6) {
-                return null;
+            String[] dfValues = dataLine.split("\\s+");
+            if (os.startsWith("Mac")) {
+                parseMacOSDiskInfo(dfValues, df);
+            } else {
+                parseLinuxDiskInfo(dfValues, df);
             }
-
-            Df df = new Df();
-            df.fileSystem = values[0];
-            df.blocks = Long.parseLong(values[1]);
-            df.used = Long.parseLong(values[2]);
-            df.available = Long.parseLong(values[3]);
-            df.useRate = Integer.parseInt(values[4].replace("%", ""));
-            df.mountedOn = values[5];
             return df;
         } catch (IOException e) {
             log.info("failed to obtain disk information", e);
-            return null;
+            return new Df();
         }
     }
 
-    private static List<String> getTitles(String titlesLine) {
-        List<String> titles = new ArrayList<>();
-        String[] titleArray = titlesLine.split("\\s+");
-        for (String title : titleArray) {
-            if (title.equalsIgnoreCase("on")) {
-                if (!titles.isEmpty()) {
-                    int lastIdx = titles.size() - 1;
-                    titles.set(lastIdx, titles.get(lastIdx) + "On");
-                }
-            } else {
-                titles.add(title);
-            }
+    /**
+     * Linux df -k output
+     * Filesystem 1K-blocks Used Available Use% Mounted on
+     * /dev/sda1 8256952 2094712 5742232 27% /
+     */
+    private static void parseLinuxDiskInfo(String[] dfValues, Df df) {
+        if (dfValues.length != 6) {
+            return;
+        }
+        df.fileSystem = dfValues[0];
+        df.blocks = parseLongValue(dfValues[1]);
+        df.used = parseLongValue(dfValues[2]);
+        df.available = parseLongValue(dfValues[3]);
+        df.useRate = parseIntegerValue(dfValues[4].replace("%", ""));
+        df.mountedOn = dfValues[5];
+    }
+
+    /**
+     * MacOS df -k output
+     * Filesystem 1024-blocks Used Available Capacity iused ifree %iused Mounted on
+     * /dev/disk1s1 48836 97511104 390044432 20% 121655 4884849711 0% /
+     */
+    private static void parseMacOSDiskInfo(String[] dfValues, Df df) {
+        if (dfValues.length != 9)
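A hedged usage sketch; it assumes the public entry point around this parsing code is DiskUtils.df(String dir) (the method name is not visible in the excerpt), which shells out to df -k and, after this fix, falls back to an all-zero Df instead of null on failure:

    import org.apache.doris.common.io.DiskUtils;

    public class DiskUsageDemo {
        public static void main(String[] args) {
            // Hypothetical call; fields are zeroed (never an NPE) when
            // `df -k` cannot be run or its output cannot be parsed.
            DiskUtils.Df info = DiskUtils.df("/opt/doris");
            System.out.printf("%s mounted on %s: %d KB used of %d KB (%d%%)%n",
                    info.fileSystem, info.mountedOn, info.used, info.blocks, info.useRate);
        }
    }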
[doris] branch master updated: [typo](doc)update config (#25425)
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 4752b800b2d [typo](doc)update config (#25425)

4752b800b2d is described below

commit 4752b800b2df3b4c986af922d4ddf6d3cf4d27bd
Author: caoliang-web <71004656+caoliang-...@users.noreply.github.com>
AuthorDate: Thu Oct 19 10:02:31 2023 +0800

    [typo](doc)update config (#25425)
---
 docs/en/docs/admin-manual/config/fe-config.md    | 8 ++++----
 docs/zh-CN/docs/admin-manual/config/fe-config.md | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md
index 3bd31e05a2b..8116015a8fc 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2626,23 +2626,23 @@ Whether to enable the quantile_state data type
 `enable_date_conversion`
 
-Default:false
+Default:true
 
 IsMutable:true
 
 MasterOnly:true
 
-If set to TRUE, FE will convert date/datetime to datev2/datetimev2(0) automatically.
+FE will convert date/datetime to datev2/datetimev2(0) automatically.
 
 `enable_decimal_conversion`
 
-Default:false
+Default:true
 
 IsMutable:true
 
 MasterOnly:true
 
-If set to TRUE, FE will convert DecimalV2 to DecimalV3 automatically.
+FE will convert DecimalV2 to DecimalV3 automatically.
 
 `proxy_auth_magic_prefix`

diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index f5515588e55..a04f6545d26 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2632,23 +2632,23 @@ SmallFileMgr 中存储的最大文件数
 `enable_date_conversion`
 
-默认值:false
+默认值:true
 
 是否可以动态配置:true
 
 是否为 Master FE 节点独有的配置项:true
 
-如果设置为 true,FE 会自动将 Date/Datetime 转换为 DateV2/DatetimeV2(0)。
+FE 会自动将 Date/Datetime 转换为 DateV2/DatetimeV2(0)。
 
 `enable_decimal_conversion`
 
-默认值:false
+默认值:true
 
 是否可以动态配置:true
 
 是否为 Master FE 节点独有的配置项:true
 
-如果设置为 true,FE 将自动将 DecimalV2 转换为 DecimalV3。
+FE 将自动将 DecimalV2 转换为 DecimalV3。
 
 `proxy_auth_magic_prefix`
[doris] branch master updated: [typo](doc)Modify the default value of Stale rowset cleanup policy(#25517)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 68d3c25f263 [typo](doc)Modify the default value of Stale rowset cleanup policy(#25517) 68d3c25f263 is described below commit 68d3c25f263916b2aff569f44de1b0b8de61816f Author: caoliang-web <71004656+caoliang-...@users.noreply.github.com> AuthorDate: Fri Oct 20 15:03:49 2023 +0800 [typo](doc)Modify the default value of Stale rowset cleanup policy(#25517) --- docs/en/docs/admin-manual/config/be-config.md| 2 +- docs/zh-CN/docs/admin-manual/config/be-config.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 4fc3106fbe9..05a8a5d16a0 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -1266,7 +1266,7 @@ BaseCompaction:546859: * Type: int64 * Description: It is used to control the expiration time of cleaning up the merged rowset version. When the current time now() minus the max created rowset‘s create time in a version path is greater than tablet_rowset_stale_sweep_time_sec, the current path is cleaned up and these merged rowsets are deleted, the unit is second. - When writing is too frequent and the disk time is insufficient, you can configure less tablet_rowset_stale_sweep_time_sec. However, if this time is less than 5 minutes, it may cause fe to query the version that has been merged, causing a query -230 error. -* Default value: 1800 +* Default value: 300 `tablet_writer_open_rpc_timeout_sec` diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index ac0263491ae..4a1476a980e 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -1293,7 +1293,7 @@ BaseCompaction:546859: * 类型:int64 * 描述:用来表示清理合并版本的过期时间,当当前时间 now() 减去一个合并的版本路径中rowset最近创建创建时间大于tablet_rowset_stale_sweep_time_sec时,对当前路径进行清理,删除这些合并过的rowset, 单位为s。 - 当写入过于频繁,磁盘空间不足时,可以配置较少这个时间。不过这个时间过短小于5分钟时,可能会引发fe查询不到已经合并过的版本,引发查询-230错误。 -* 默认值:1800 +* 默认值:300 `tablet_writer_open_rpc_timeout_sec` - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris] branch master updated: [Bug](samples)fix IndexOutOfBoundsException (#25608)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 42e5a3324c2 [Bug](samples)fix IndexOutOfBoundsException (#25608) 42e5a3324c2 is described below commit 42e5a3324c2a9426df9b464c509063e98e53f195 Author: caoliang-web <71004656+caoliang-...@users.noreply.github.com> AuthorDate: Fri Oct 20 15:04:44 2023 +0800 [Bug](samples)fix IndexOutOfBoundsException (#25608) --- .../src/main/java/org/apache/doris/sdk/DorisReaderExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/doris-demo/doris-source-demo/src/main/java/org/apache/doris/sdk/DorisReaderExample.java b/samples/doris-demo/doris-source-demo/src/main/java/org/apache/doris/sdk/DorisReaderExample.java index 5c4a5c101b5..03bebef58d8 100644 --- a/samples/doris-demo/doris-source-demo/src/main/java/org/apache/doris/sdk/DorisReaderExample.java +++ b/samples/doris-demo/doris-source-demo/src/main/java/org/apache/doris/sdk/DorisReaderExample.java @@ -166,7 +166,7 @@ public class DorisReaderExample { if(!eos){ int i = convertArrow(next, selectedColumns); offset += i; -readRowCount += offset; +readRowCount = offset; } } //close - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
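The one-character fix is easier to see with a worked example: offset is already a running total of rows converted across batches, so adding it to readRowCount on every batch double-counts and eventually drives the row index past the end of the data.

    // Two batches of 3 and 2 rows:
    // offset after each batch:         3, then 5   (running total)
    // readRowCount with `+= offset`:   3, then 8   (overcount -> IndexOutOfBoundsException)
    // readRowCount with `=  offset`:   3, then 5   (correct)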
[doris-flink-connector] branch master updated: [improve] improve commit retry strategy (#218)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 0abf1e2 [improve] improve commit retry strategy (#218) 0abf1e2 is described below commit 0abf1e2bc82f9e33e847f4b22ee5f13882c4fdf1 Author: wudi <676366...@qq.com> AuthorDate: Tue Oct 24 16:20:35 2023 +0800 [improve] improve commit retry strategy (#218) --- .../doris/flink/cfg/DorisExecutionOptions.java | 2 +- .../org/apache/doris/flink/sink/BackendUtil.java | 2 +- .../doris/flink/sink/committer/DorisCommitter.java | 15 ++-- .../flink/sink/committer/TestDorisCommitter.java | 28 +++--- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 8f7022d..4a03024 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -29,7 +29,7 @@ public class DorisExecutionOptions implements Serializable { private static final long serialVersionUID = 1L; public static final int DEFAULT_CHECK_INTERVAL = 1; -public static final int DEFAULT_MAX_RETRY_TIMES = 1; +public static final int DEFAULT_MAX_RETRY_TIMES = 3; private static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; private static final int DEFAULT_BUFFER_COUNT = 3; //batch flush diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 9f9516a..0d45e2f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -74,7 +74,7 @@ public class BackendUtil { throw new DorisRuntimeException("no available backend."); } -public boolean tryHttpConnection(String backend) { +public static boolean tryHttpConnection(String backend) { try { backend = "http://"; + backend; URL url = new URL(backend); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 2a0fba0..ffcb8af 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -94,7 +94,7 @@ public class DorisCommitter implements Committer, Closeable { LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort); int retry = 0; -while (retry++ <= maxRetry) { +while (retry <= maxRetry) { //get latest-url String url = String.format(commitPattern, hostPort, committable.getDb()); HttpPut httpPut = builder.setUrl(url).setEmptyEntity().build(); @@ -108,7 +108,7 @@ public class DorisCommitter implements Committer, Closeable { Map res = jsonMapper.readValue(loadResult, new TypeReference>() { }); if (!res.get("status").equals(SUCCESS) && !ResponseUtil.isCommitted(res.get("msg"))) { -throw new DorisRuntimeException("Commit failed " + loadResult); +throw new DorisRuntimeException("commit transaction failed " + loadResult); } else { LOG.info("load result {}", loadResult); } @@ -116,18 +116,19 
@@ public class DorisCommitter implements Committer, Closeable { return; } String reasonPhrase = statusLine.getReasonPhrase(); -LOG.warn("commit failed with {}, reason {}", hostPort, reasonPhrase); +LOG.error("commit failed with {}, reason {}", hostPort, reasonPhrase); if (retry == maxRetry) { -throw new DorisRuntimeException("stream load error: " + reasonPhrase); +throw new DorisRuntimeException("commit transaction error: " + reasonPhrase); } hostPort = backendUtil.getAvailableBackend(); -} catch (IOException e) { -LOG.error("commit transaction failed: ", e); +} catch (Exception e) { +
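The diff is truncated above, but the retry shape it settles on is visible: attempt the transaction commit, and on failure fail over to another available backend until maxRetry is exhausted (with the new DEFAULT_MAX_RETRY_TIMES of 3, up to four attempts in total). A minimal sketch of that loop, with commitWithRetry/doCommit as hypothetical stand-ins for the committed code's HTTP PUT against the commit endpoint:

    // Sketch only, not the committed code.
    void commitWithRetry(DorisCommittable committable, int maxRetry) {
        String hostPort = backendUtil.getAvailableBackend();
        for (int retry = 0; retry <= maxRetry; retry++) {
            try {
                doCommit(hostPort, committable); // PUT to the txn commit URL, checks "status"
                return;                          // committed: stop retrying
            } catch (Exception e) {
                if (retry == maxRetry) {
                    throw new DorisRuntimeException("commit transaction error: " + e.getMessage());
                }
                hostPort = backendUtil.getAvailableBackend(); // fail over to another BE
            }
        }
    }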
[doris-shade] branch master updated: [fix](spark-load) remove conflict jar for spark_load of Doris v1.2 (#27)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-shade.git The following commit(s) were added to refs/heads/master by this push: new fa4c2b6 [fix](spark-load) remove conflict jar for spark_load of Doris v1.2 (#27) fa4c2b6 is described below commit fa4c2b664a10282159009d181e6df8d0ca1596e5 Author: Yulei-Yang AuthorDate: Wed Oct 25 09:47:29 2023 +0800 [fix](spark-load) remove conflict jar for spark_load of Doris v1.2 (#27) --- hive-shade-3/pom.xml | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hive-shade-3/pom.xml b/hive-shade-3/pom.xml index 7783dc1..5d8370b 100644 --- a/hive-shade-3/pom.xml +++ b/hive-shade-3/pom.xml @@ -98,6 +98,10 @@ under the License. org.apache.hadoop hadoop-yarn-server-common + +org.apache.hadoop +hadoop-yarn-api + @@ -337,4 +341,4 @@ under the License. - \ No newline at end of file + - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-spark-connector] branch master updated: [improve] Reduce the performance loss of additional buffer expansion. (#143)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 1c21bc4 [improve] Reduce the performance loss of additional buffer expansion. (#143) 1c21bc4 is described below commit 1c21bc46d971eaed7df24e0382329bbbae4ed4c3 Author: Chuang Li <64473732+codecooke...@users.noreply.github.com> AuthorDate: Wed Oct 25 15:02:38 2023 +0800 [improve] Reduce the performance loss of additional buffer expansion. (#143) --- .../doris/spark/load/RecordBatchInputStream.java | 88 +++--- 1 file changed, 29 insertions(+), 59 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java index f70809b..a361c39 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java @@ -40,8 +40,6 @@ public class RecordBatchInputStream extends InputStream { public static final Logger LOG = LoggerFactory.getLogger(RecordBatchInputStream.class); -private static final int DEFAULT_BUF_SIZE = 4096; - /** * Load record batch */ @@ -55,7 +53,12 @@ public class RecordBatchInputStream extends InputStream { /** * record buffer */ -private ByteBuffer buffer = ByteBuffer.allocate(0); + +private ByteBuffer lineBuf = ByteBuffer.allocate(0);; + +private ByteBuffer delimBuf = ByteBuffer.allocate(0); + +private final byte[] delim; /** * record count has been read @@ -70,31 +73,42 @@ public class RecordBatchInputStream extends InputStream { public RecordBatchInputStream(RecordBatch recordBatch, boolean passThrough) { this.recordBatch = recordBatch; this.passThrough = passThrough; +this.delim = recordBatch.getDelim(); } @Override public int read() throws IOException { try { -if (buffer.remaining() == 0 && endOfBatch()) { -return -1; // End of stream +if (lineBuf.remaining() == 0 && endOfBatch()) { +return -1; +} + +if (delimBuf != null && delimBuf.remaining() > 0) { +return delimBuf.get() & 0xff; } } catch (DorisException e) { throw new IOException(e); } -return buffer.get() & 0xFF; +return lineBuf.get() & 0xFF; } @Override public int read(byte[] b, int off, int len) throws IOException { try { -if (buffer.remaining() == 0 && endOfBatch()) { -return -1; // End of stream +if (lineBuf.remaining() == 0 && endOfBatch()) { +return -1; +} + +if (delimBuf != null && delimBuf.remaining() > 0) { +int bytesRead = Math.min(len, delimBuf.remaining()); +delimBuf.get(b, off, bytesRead); +return bytesRead; } } catch (DorisException e) { throw new IOException(e); } -int bytesRead = Math.min(len, buffer.remaining()); -buffer.get(b, off, bytesRead); +int bytesRead = Math.min(len, lineBuf.remaining()); +lineBuf.get(b, off, bytesRead); return bytesRead; } @@ -109,6 +123,7 @@ public class RecordBatchInputStream extends InputStream { public boolean endOfBatch() throws DorisException { Iterator iterator = recordBatch.getIterator(); if (readCount >= recordBatch.getBatchSize() || !iterator.hasNext()) { +delimBuf = null; return true; } readNext(iterator); @@ -125,62 +140,18 @@ public class RecordBatchInputStream extends InputStream { if (!iterator.hasNext()) { throw new ShouldNeverHappenException(); } -byte[] delim = recordBatch.getDelim(); byte[] rowBytes = rowToByte(iterator.next()); if 
(isFirst) { -ensureCapacity(rowBytes.length); -buffer.put(rowBytes); -buffer.flip(); +delimBuf = null; +lineBuf = ByteBuffer.wrap(rowBytes); isFirst = false; } else { -ensureCapacity(delim.length + rowBytes.length); -buffer.put(delim); -buffer.put(rowBytes); -buffer.flip(); +delimBuf = ByteBuffer.wrap(delim); +lineBuf = ByteBuffer.wrap(rowBytes); } readCount++; } -/** - * Check if the buffer has enough capacity. - * - * @param need required buffer space -
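Lining up the two versions of readNext makes the optimization concrete: the old path copied every delimiter and row into one growable buffer, while the new path wraps the existing byte arrays and lets read() drain the delimiter buffer before the line buffer. Both fragments below are lifted from the diff:

    // Before: each record is copied into a single growable buffer.
    ensureCapacity(delim.length + rowBytes.length);
    buffer.put(delim);
    buffer.put(rowBytes);
    buffer.flip();

    // After: zero-copy wraps; read() serves delimBuf first, then lineBuf.
    delimBuf = ByteBuffer.wrap(delim);
    lineBuf = ByteBuffer.wrap(rowBytes);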
[doris-spark-connector] branch master updated: [fix](load) fix npe when array is null (#147)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new f1e402a [fix](load) fix npe when array is null (#147) f1e402a is described below commit f1e402a7467dfe041e3d34425c586c8e129c48b9 Author: daikon <1059907...@qq.com> AuthorDate: Wed Oct 25 15:19:51 2023 +0800 [fix](load) fix npe when array is null (#147) --- .../scala/org/apache/doris/spark/sql/SchemaUtils.scala | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index 44baa95..e806059 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory import java.sql.Timestamp import java.time.{LocalDateTime, ZoneOffset} import scala.collection.JavaConversions._ -import scala.collection.mutable private[spark] object SchemaUtils { private val logger = LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$")) @@ -166,13 +165,14 @@ private[spark] object SchemaUtils { case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale) case at: ArrayType => val arrayData = row.getArray(ordinal) -var i = 0 -val buffer = mutable.Buffer[Any]() -while (i < arrayData.numElements()) { - if (arrayData.isNullAt(i)) buffer += null else buffer += rowColumnValue(arrayData, i, at.elementType) - i += 1 +if (arrayData == null) DataUtil.NULL_VALUE +else if(arrayData.numElements() == 0) "[]" +else { + (0 until arrayData.numElements()).map(i => { +if (arrayData.isNullAt(i)) null else rowColumnValue(arrayData, i, at.elementType) + }).mkString("[", ",", "]") } -s"[${buffer.mkString(",")}]" + case mt: MapType => val mapData = row.getMap(ordinal) val keys = mapData.keyArray() - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
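Read off the code above, an ArrayType column now serializes null-safely in all three cases (assuming DataUtil.NULL_VALUE is the usual Stream Load null marker \N):

    whole array is null   ->  \N
    empty array           ->  []
    Array(1, null, 3)     ->  [1,null,3]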
[doris-spark-connector] branch master updated: [improvement] batch load retry (#148)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 9dd57b0 [improvement] batch load retry (#148) 9dd57b0 is described below commit 9dd57b004afd91320d493c18ff53eb91bea3125d Author: gnehil AuthorDate: Wed Oct 25 15:45:20 2023 +0800 [improvement] batch load retry (#148) Co-authored-by: gnehil --- .../doris/spark/cfg/ConfigurationOptions.java | 2 +- .../apache/doris/spark/load/DorisStreamLoad.java | 4 - .../org/apache/doris/spark/load/RecordBatch.java | 21 + .../doris/spark/load/RecordBatchInputStream.java | 16 ++-- .../spark/listener/DorisTransactionListener.scala | 8 +- .../scala/org/apache/doris/spark/sql/Utils.scala | 27 --- .../apache/doris/spark/writer/DorisWriter.scala| 91 ++ 7 files changed, 107 insertions(+), 62 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index a6767f0..a144fb8 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -70,7 +70,7 @@ public interface ConfigurationOptions { int SINK_BATCH_SIZE_DEFAULT = 10; String DORIS_SINK_MAX_RETRIES = "doris.sink.max-retries"; -int SINK_MAX_RETRIES_DEFAULT = 1; +int SINK_MAX_RETRIES_DEFAULT = 0; String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio"; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index c524a4c..338ffbe 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -98,7 +98,6 @@ public class DorisStreamLoad implements Serializable { private String FIELD_DELIMITER; private final String LINE_DELIMITER; private boolean streamingPassthrough = false; -private final Integer batchSize; private final boolean enable2PC; private final Integer txnRetries; private final Integer txnIntervalMs; @@ -128,8 +127,6 @@ public class DorisStreamLoad implements Serializable { LINE_DELIMITER = escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n")); this.streamingPassthrough = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH, ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT); -this.batchSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, -ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT); this.enable2PC = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC, ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT); this.txnRetries = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES, @@ -200,7 +197,6 @@ public class DorisStreamLoad implements Serializable { this.loadUrlStr = loadUrlStr; HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC); RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows) -.batchSize(batchSize) .format(fileType) .sep(FIELD_DELIMITER) .delim(LINE_DELIMITER) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java index 4ce297f..e471d5b 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java @@ -36,11 +36,6 @@ public class RecordBatch { */ private final Iterator iterator; -/** - * batch size for single load - */ -private final int batchSize; - /** * stream load format */ @@ -63,10 +58,9 @@ public class RecordBatch { private final boolean addDoubleQuotes; -private RecordBatch(Iterator iterator, int batchSize, String format, String sep, byte[] delim, +private RecordBatch(Iterator iterator, String format, String sep, byte[] delim, StructType schema, boolean addDoubleQuotes) { this.iterator = iterator; -this.batchSize = batchSize; this.format = format; this.sep = sep; this.delim = delim; @@ -78,10 +72,6 @@ public clas
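The diff is cut off above, but the file list tells the story: retry logic moves out of DorisStreamLoad into DorisWriter (with Utils.scala carrying the retry helper), and the default for doris.sink.max-retries drops from 1 to 0. A generic sketch of the retry-with-interval shape — hand-written here, so the connector's actual helper signature may differ:

    // Sketch only: bounded retry with a fixed sleep between attempts.
    // Requires: import java.time.Duration; import java.util.concurrent.Callable;
    static <T> T retry(int maxRetries, Duration interval, Callable<T> task) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();                    // success ends the loop
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(interval.toMillis()); // back off before retrying
                }
            }
        }
        throw last; // every attempt failed
    }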
[doris-spark-connector] branch master updated: [feature] support overwrite save mode (#149)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 00ff399 [feature] support overwrite save mode (#149) 00ff399 is described below commit 00ff39986e52072ab840f8f0dd1bab915f58dede Author: gnehil AuthorDate: Wed Oct 25 18:13:53 2023 +0800 [feature] support overwrite save mode (#149) --- spark-doris-connector/pom.xml | 9 .../doris/spark/cfg/ConfigurationOptions.java | 1 + .../org/apache/doris/spark/jdbc/JdbcUtils.scala| 34 + .../org/apache/doris/spark/sql/DorisRelation.scala | 3 +- .../doris/spark/sql/DorisSourceProvider.scala | 59 +- 5 files changed, 104 insertions(+), 2 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 74a53cc..4148a66 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -184,6 +184,15 @@ jackson-core ${fasterxml.jackson.version} + + + +com.mysql +mysql-connector-j +8.0.33 +test + + diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index a144fb8..6498916 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -20,6 +20,7 @@ package org.apache.doris.spark.cfg; public interface ConfigurationOptions { // doris fe node address String DORIS_FENODES = "doris.fenodes"; +String DORIS_QUERY_PORT = "doris.query.port"; String DORIS_DEFAULT_CLUSTER = "default_cluster"; diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala new file mode 100644 index 000..aab1032 --- /dev/null +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/jdbc/JdbcUtils.scala @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark.jdbc + +import java.sql.{Connection, DriverManager} +import java.util.Properties + +object JdbcUtils { + + def getJdbcUrl(host: String, port: Int): String = s"jdbc:mysql://$host:$port/information_schema" + + def getConnection(url: String, props: Properties): Connection = { + +DriverManager.getConnection(url, props) + } + + def getTruncateQuery(table: String): String = s"TRUNCATE TABLE $table" + +} diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala index 049d5a2..fe7e63d 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala @@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -98,6 +98,7 @@ private[sql] class DorisRelation( } data.write.format(DorisSourceProvider.SHORT_NAME) .options(insertCfg) + .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append) .save() } } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index 94fab9e..ac04401 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Dor
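Put together, overwrite mode is truncate-then-append: DorisRelation forwards SaveMode.Overwrite, and the provider empties the target table over the FE's MySQL-protocol query port (doris.query.port) before running the usual append write. A condensed sketch — feHost, queryPort, table, and props are placeholders, and the real flow lives in DorisSourceProvider:

    // Condensed sketch of the overwrite path, matching JdbcUtils above.
    // Requires: import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement;
    String url = String.format("jdbc:mysql://%s:%d/information_schema", feHost, queryPort);
    try (Connection conn = DriverManager.getConnection(url, props);
         Statement stmt = conn.createStatement()) {
        stmt.execute("TRUNCATE TABLE " + table); // empty the target table first
    }
    // ...then perform the normal append load.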
[doris-shade] branch master updated: Update CHANGE-LOG.txt (#29)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-shade.git The following commit(s) were added to refs/heads/master by this push: new 66eea50 Update CHANGE-LOG.txt (#29) 66eea50 is described below commit 66eea50dbdf140fa5c1be3730889ff3bdc3f995f Author: Calvin Kirs AuthorDate: Thu Oct 26 14:26:40 2023 +0800 Update CHANGE-LOG.txt (#29) --- CHANGE-LOG.txt | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/CHANGE-LOG.txt b/CHANGE-LOG.txt index 818aa74..b0a94ae 100644 --- a/CHANGE-LOG.txt +++ b/CHANGE-LOG.txt @@ -1,7 +1,7 @@ ## Changes -- Add hadoop dependency shade -- Add thrift dependency shade -- Add hive catalog dependency shade -- Add kerby dependency shade -- Add Spring5 dependency shade +- [fix](spark-load) remove conflict jar for spark_load of Doris v1.2 (#27) + +- [fix](multi-catalog)fix dependencies for dlf (#26) + + - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-flink-connector] branch master updated: [Improve](schemaChange)schema change support rename column (#206)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 78ed055 [Improve](schemaChange)schema change support rename column (#206) 78ed055 is described below commit 78ed055fd2e7f7657dadfb1d3f4f4dd9e9caadf7 Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri Oct 27 16:02:32 2023 +0800 [Improve](schemaChange)schema change support rename column (#206) --- .../sink/writer/JsonDebeziumSchemaSerializer.java | 20 ++- .../flink/sink/writer/SchemaChangeHelper.java | 23 +++- .../flink/sink/writer/SchemaChangeHelperTest.java | 65 ++ .../writer/TestJsonDebeziumSchemaSerializer.java | 25 + 4 files changed, 130 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java index fd3c92a..bf7b81f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java @@ -82,6 +82,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer updateFiledSchema = new LinkedHashMap<>(); for (JsonNode column : columns) { buildFieldSchema(updateFiledSchema, column); } SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap); -// In order to avoid operations such as rename or change, which may lead to the accidental deletion of the doris column. +// In order to avoid other source table column change operations other than add/drop/rename, +// which may lead to the accidental deletion of the doris column. 
Matcher matcher = addDropDDLPattern.matcher(ddl); if (!matcher.find()) { return null; @@ -262,6 +276,10 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer originFieldSchemaMap) { +this.originFieldSchemaMap = originFieldSchemaMap; +} @VisibleForTesting public boolean schemaChange(JsonNode recordRoot) { boolean status = false; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java index dc8d83b..8e6307b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java @@ -32,8 +32,9 @@ public class SchemaChangeHelper { private static final List addFieldSchemas = Lists.newArrayList(); // Used to determine whether the doris table supports ddl private static final List ddlSchemas = Lists.newArrayList(); -public static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; -public static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; +private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; +private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; +private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s"; public static void compareSchema(Map updateFiledSchemaMap, Map originFieldSchemaMap) { @@ -57,6 +58,24 @@ public class SchemaChangeHelper { } } +public static List generateRenameDDLSql(String table, String oldColumnName, String newColumnName, +Map originFieldSchemaMap) { +ddlSchemas.clear(); +List ddlList = Lists.newArrayList(); +FieldSchema fieldSchema = null; +for (Entry originFieldSchema : originFieldSchemaMap.entrySet()) { +if (originFieldSchema.getKey().equals(oldColumnName)) { +fieldSchema = originFieldSchema.getValue(); +String renameSQL = String.format(RENAME_DDL, table, oldColumnName, newColumnName); +ddlList.add(renameSQL); +ddlSchemas.add(new DDLSchema(oldColumnName, false)); +} +} +originFieldSchemaMap.remove(oldColumnName); +originFieldSchemaMap.put(newColumnName, fieldSchema); +return ddlList; +} + public static List generateDDLSql(String table) { ddlSchemas.clear(); List ddlList = Lists.newArrayList(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper
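The diff is truncated before the tests, but the rename path is complete: a rename event becomes a single ALTER statement built from the new RENAME_DDL template, and originFieldSchemaMap is re-keyed from the old column name to the new one so later schema comparisons don't misread the rename as a drop-plus-add. A worked example of the template:

    // Worked example of RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s".
    String renameSql = String.format("ALTER TABLE %s RENAME COLUMN %s %s",
            "db.tbl", "old_name", "new_name");
    // -> ALTER TABLE db.tbl RENAME COLUMN old_name new_name

As the Doris doc change later in this digest notes, the target table must have been created with 'light_schema_change' = 'true' for a column rename to be accepted.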
(doris-flink-connector) branch master updated: [Improve] registerDriver improvement (#219)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new c9e2751 [Improve] registerDriver improvement (#219) c9e2751 is described below commit c9e2751b4767cf8837ca67b58f3395a2a294ecd9 Author: benjobs AuthorDate: Mon Oct 30 09:57:25 2023 +0800 [Improve] registerDriver improvement (#219) --- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 10 + .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 26 +- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 21 ++--- .../tools/cdc/postgres/PostgresDatabaseSync.java | 14 ++-- .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 18 +++ 5 files changed, 74 insertions(+), 15 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 8aef65d..fcd0f4c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -50,26 +50,36 @@ public abstract class DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; private static final String TABLE_NAME_OPTIONS = "table-name"; + protected Configuration config; + protected String database; + protected TableNameConverter converter; protected Pattern includingPattern; protected Pattern excludingPattern; protected Map tableConfig; protected Configuration sinkConfig; protected boolean ignoreDefaultValue; + public StreamExecutionEnvironment env; private boolean createTableOnly = false; private boolean newSchemaChange; protected String includingTables; protected String excludingTables; +public abstract void registerDriver() throws SQLException; + public abstract Connection getConnection() throws SQLException; public abstract List getSchemaList() throws Exception; public abstract DataStreamSource buildCdcSource(StreamExecutionEnvironment env); +public DatabaseSync() throws SQLException { +registerDriver(); +} + public void create(StreamExecutionEnvironment env, String database, Configuration config, String tablePrefix, String tableSuffix, String includingTables, String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 2235e0b..22e49aa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -54,10 +54,25 @@ import java.util.regex.Pattern; public class MysqlDatabaseSync extends DatabaseSync { private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class); -private static String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; -private static String PROPERTIES_PREFIX = "jdbc.properties."; +private static final String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true"; +private static final String PROPERTIES_PREFIX = "jdbc.properties."; -public MysqlDatabaseSync() { +public MysqlDatabaseSync() throws 
SQLException { +super(); +} + +@Override +public void registerDriver() throws SQLException { +try { +Class.forName("com.mysql.cj.jdbc.Driver"); +} catch (ClassNotFoundException ex) { +LOG.warn("can not found class com.mysql.cj.jdbc.Driver, use class com.mysql.jdbc.Driver"); +try { +Class.forName("com.mysql.jdbc.Driver"); +} catch (Exception e) { +throw new SQLException("No suitable driver found, can not found class com.mysql.cj.jdbc.Driver and com.mysql.jdbc.Driver"); +} +} } @Override @@ -86,7 +101,7 @@ public class MysqlDatabaseSync extends DatabaseSync { } SourceSchema sourceSchema = new MysqlSchema(metaData, databaseName, tableName, tableComment); -sourceSchema.setModel(sourceSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE); +
(doris-flink-connector) branch JNSimba-patch-1 created (now 689d3b2)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch JNSimba-patch-1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at 689d3b2 update version in pom.xml This branch includes the following new commits: new 689d3b2 update version in pom.xml The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) 01/01: update version in pom.xml
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch JNSimba-patch-1 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git commit 689d3b22786d5476c396a3e17919bbffdeeb40bf Author: wudi <676366...@qq.com> AuthorDate: Tue Oct 31 12:03:12 2023 +0800 update version in pom.xml --- flink-doris-connector/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index a28b7ac..75563c8 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -27,7 +27,7 @@ under the License. org.apache.doris flink-doris-connector-${flink.minor.version}_${scala.version} -1.0.0-SNAPSHOT +1.0.4-SNAPSHOT Flink Doris Connector https://doris.apache.org/ - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch branch-for-flink-before-1.13 updated: update version in pom.xml (#221)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch branch-for-flink-before-1.13 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push: new 7326a64 update version in pom.xml (#221) 7326a64 is described below commit 7326a64ae0bfa23c047eb630981ad4e80f059af1 Author: wudi <676366...@qq.com> AuthorDate: Tue Oct 31 14:09:19 2023 +0800 update version in pom.xml (#221) --- flink-doris-connector/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index a28b7ac..75563c8 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -27,7 +27,7 @@ under the License. org.apache.doris flink-doris-connector-${flink.minor.version}_${scala.version} -1.0.0-SNAPSHOT +1.0.4-SNAPSHOT Flink Doris Connector https://doris.apache.org/ - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch master updated: [improve](catalog) improve code (#222)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 115017b [improve](catalog) improve code (#222) 115017b is described below commit 115017b9977dfda4adb2415733de3546b0b5513f Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Wed Nov 1 10:21:03 2023 +0800 [improve](catalog) improve code (#222) --- .../java/org/apache/doris/flink/catalog/doris/DorisSystem.java | 9 - 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 77584d3..7aa314f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -24,7 +24,6 @@ import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider; import org.apache.doris.flink.exception.CreateTableException; import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.exception.DorisSystemException; -import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.flink.annotation.Public; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; @@ -33,7 +32,7 @@ import org.slf4j.LoggerFactory; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -47,9 +46,9 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ @Public public class DorisSystem { -private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); -private JdbcConnectionProvider jdbcConnectionProvider; -private static final List builtinDatabases = Arrays.asList("information_schema"); +private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class); +private final JdbcConnectionProvider jdbcConnectionProvider; +private static final List builtinDatabases = Collections.singletonList("information_schema"); public DorisSystem(DorisConnectionOptions options) { this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris) branch master updated: [doc](fix) update doc for rename column (#25832)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new b94e6d6c051 [doc](fix) update doc for rename column (#25832) b94e6d6c051 is described below commit b94e6d6c051d32d96cb757c89e87b170a2151e46 Author: wudi <676366...@qq.com> AuthorDate: Wed Nov 1 11:30:03 2023 +0800 [doc](fix) update doc for rename column (#25832) --- .../Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md | 2 +- .../Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md index 8884fa77aad..2e176339cbb 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md @@ -81,7 +81,7 @@ RENAME COLUMN old_column_name new_column_name; ``` Notice: -- Currently only tables of the unique model are supported, which are created with property 'light_schema_change=true'. +- When creating a table, you need to set 'light_schema_change=true' in the property. ### Example diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md index f58ade73d54..052679a98b7 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-RENAME.md @@ -81,7 +81,7 @@ RENAME COLUMN old_column_name new_column_name; ``` 注意: -- 目前仅支持unique模型的表,并且建表时需要在property中设置light_schema_change=true +- 建表时需要在property中设置light_schema_change=true ### Example - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) branch master updated: [fix] complex type npe problem (#151)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 4f2ef8d [fix] complex type npe problem (#151) 4f2ef8d is described below commit 4f2ef8df05b0860ab37acd6c2997d209ed4e3b3d Author: gnehil AuthorDate: Wed Nov 1 17:39:57 2023 +0800 [fix] complex type npe problem (#151) --- spark-doris-connector/pom.xml | 6 ++ .../apache/doris/spark/serialization/RowBatch.java | 4 +- .../org/apache/doris/spark/sql/SchemaUtils.scala | 120 ++--- .../doris/spark/sql/TestConnectorWriteDoris.scala | 60 ++- 4 files changed, 122 insertions(+), 68 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 4148a66..518a3e2 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -185,6 +185,12 @@ ${fasterxml.jackson.version} + +com.fasterxml.jackson.module +jackson-module-scala_${scala.version} +${fasterxml.jackson.version} + + com.mysql diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index b43b0a2..cb4d303 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -60,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; /** * row batch data container. @@ -357,7 +358,8 @@ public class RowBatch { reader.setPosition(rowIndex); Map value = new HashMap<>(); while (reader.next()) { - value.put(reader.key().readObject().toString(), reader.value().readObject().toString()); + value.put(Objects.toString(reader.key().readObject(), null), + Objects.toString(reader.value().readObject(), null)); } addValueToRow(rowIndex, JavaConverters.mapAsScalaMapConverter(value).asScala()); } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index e806059..982e580 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -17,6 +17,8 @@ package org.apache.doris.spark.sql +import com.fasterxml.jackson.databind.json.JsonMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.doris.sdk.thrift.TScanColumnDesc import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD} import org.apache.doris.spark.cfg.Settings @@ -32,9 +34,11 @@ import org.slf4j.LoggerFactory import java.sql.Timestamp import java.time.{LocalDateTime, ZoneOffset} import scala.collection.JavaConversions._ +import scala.collection.mutable private[spark] object SchemaUtils { private val logger = LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$")) + private val MAPPER = JsonMapper.builder().addModule(DefaultScalaModule).build() /** * discover Doris table schema from Doris FE. 
@@ -147,72 +151,60 @@ private[spark] object SchemaUtils { def rowColumnValue(row: SpecializedGetters, ordinal: Int, dataType: DataType): Any = { -dataType match { - case NullType => DataUtil.NULL_VALUE - case BooleanType => row.getBoolean(ordinal) - case ByteType => row.getByte(ordinal) - case ShortType => row.getShort(ordinal) - case IntegerType => row.getInt(ordinal) - case LongType => row.getLong(ordinal) - case FloatType => row.getFloat(ordinal) - case DoubleType => row.getDouble(ordinal) - case StringType => Option(row.getUTF8String(ordinal)).map(_.toString).getOrElse(DataUtil.NULL_VALUE) - case TimestampType => -LocalDateTime.ofEpochSecond(row.getLong(ordinal) / 10, (row.getLong(ordinal) % 1000).toInt, ZoneOffset.UTC) -new Timestamp(row.getLong(ordinal) / 1000).toString - case DateType => DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString - case BinaryType => row.getBinary(ordinal) - case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale) - case at: ArrayType => -val arrayD
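The map branch of this fix (the RowBatch hunk above) rests entirely on the two-argument form of java.util.Objects.toString, which passes nulls through instead of throwing:

    Objects.toString(null, null); // -> null, where null.toString() would throw an NPE
    Objects.toString(42, null);   // -> "42"

so null map keys and values now survive the Arrow-to-row conversion.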
(doris-spark-connector) branch master updated: [improvement] optimize log and exception message (#152)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new a364450 [improvement] optimize log and exception message (#152) a364450 is described below commit a364450da7bdfdd357f4323be35989003d0db15d Author: gnehil AuthorDate: Thu Nov 2 10:23:22 2023 +0800 [improvement] optimize log and exception message (#152) --- .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 3d5bf36..2f365a8 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -237,14 +237,16 @@ public class DorisStreamLoad implements Serializable { } if (loadResponse.status != HttpStatus.SC_OK) { -LOG.info("Stream load Response HTTP Status Error:{}", loadResponse); -throw new StreamLoadException("stream load error"); +LOG.error("Stream load http status is not OK, status: {}, response: {}", loadResponse.status, loadResponse); +throw new StreamLoadException( +String.format("stream load error, http status:%d, response:%s", loadResponse.status, loadResponse)); } else { try { RespContent respContent = MAPPER.readValue(loadResponse.respContent, RespContent.class); if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { -LOG.error("Stream load Response RES STATUS Error:{}", loadResponse); -throw new StreamLoadException("stream load error"); +LOG.error("Stream load status is not success, status:{}, response:{}", respContent.getStatus(), loadResponse); +throw new StreamLoadException( +String.format("stream load error, load status:%s, response:%s", respContent.getStatus(), loadResponse)); } LOG.info("Stream load Response:{}", loadResponse); return respContent.getTxnId(); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-flink-connector) branch branch-1.5.0-v1103 created (now 9e68239)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch branch-1.5.0-v1103 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git at 9e68239 [improve] commit message compatible (#220) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris] branch master updated: [Improve](Routineload)Set the maximum timeout for obtaining partition to 60s (#24173)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new b4020a13ef [Improve](Routineload)Set the maximum timeout for obtaining partition to 60s (#24173) b4020a13ef is described below commit b4020a13ef43db4a792e902c8353980e3aa61cb1 Author: Calvin Kirs AuthorDate: Mon Sep 11 14:15:06 2023 +0800 [Improve](Routineload)Set the maximum timeout for obtaining partition to 60s (#24173) --- fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java index 581a1ca48e..40041502ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java @@ -41,6 +41,7 @@ import java.util.stream.Collectors; public class KafkaUtil { private static final Logger LOG = LogManager.getLogger(KafkaUtil.class); +private static final int MAX_KAFKA_PARTITION_TIMEOUT_SECOND = 60; public static List getAllKafkaPartitions(String brokerList, String topic, Map convertedCustomProperties) throws UserException { @@ -70,7 +71,7 @@ public class KafkaUtil { // get info Future future = BackendServiceProxy.getInstance().getInfo(address, request); -InternalService.PProxyResult result = future.get(5, TimeUnit.SECONDS); +InternalService.PProxyResult result = future.get(MAX_KAFKA_PARTITION_TIMEOUT_SECOND, TimeUnit.SECONDS); TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { throw new UserException("failed to get kafka partition info: " + result.getStatus().getErrorMsgsList()); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris] branch master updated (f27f486e8d -> 8ae7e67623)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from f27f486e8d fix missing stats in physical plan (#24159) add 8ae7e67623 [Docs](Ldap)Add Jdbc connect docs (#24181) No new revisions were added by this update. Summary of changes: docs/en/docs/admin-manual/privilege-ldap/ldap.md | 26 + .../zh-CN/docs/admin-manual/privilege-ldap/ldap.md | 27 ++ 2 files changed, 53 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-spark-connector] branch master updated: [optimize][refactor] Optimizing memory usage for writing data (#140)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 0daf6c4 [optimize][refactor] Optimizing memory usage for writing data (#140) 0daf6c4 is described below commit 0daf6c4de9c2b1686cd6862e5ada3b1cba36640a Author: gnehil AuthorDate: Wed Sep 13 15:47:57 2023 +0800 [optimize][refactor] Optimizing memory usage for writing data (#140) 1.Optimize data transmission 2.Optimize partitioned data iteration --- .../apache/doris/spark/backend/BackendClient.java | 3 +- .../java/org/apache/doris/spark/cfg/Settings.java | 1 + .../apache/doris/spark/load/DorisStreamLoad.java | 238 ++--- .../org/apache/doris/spark/load/RecordBatch.java | 153 + .../doris/spark/load/RecordBatchInputStream.java | 221 +++ .../apache/doris/spark/serialization/RowBatch.java | 38 ++-- .../java/org/apache/doris/spark/util/DataUtil.java | 53 +++-- .../org/apache/doris/spark/util/ListUtils.java | 2 +- .../scala/org/apache/doris/spark/package.scala | 6 +- .../org/apache/doris/spark/sql/ScalaDorisRow.scala | 6 +- .../org/apache/doris/spark/sql/SchemaUtils.scala | 58 - .../apache/doris/spark/writer/DorisWriter.scala| 91 ++-- .../org/apache/doris/spark/util/DataUtilTest.java | 32 --- .../apache/doris/spark/sql/SchemaUtilsTest.scala | 54 + 14 files changed, 637 insertions(+), 319 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java index aaafe09..b10797b 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java @@ -29,9 +29,10 @@ import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.exception.ConnectedFailedException; import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.exception.DorisInternalException; -import org.apache.doris.spark.util.ErrorMessages; import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.serialization.Routing; +import org.apache.doris.spark.util.ErrorMessages; + import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java index 798ec8c..c941fdf 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.exception.IllegalArgumentException; import org.apache.doris.spark.util.ErrorMessages; import org.apache.doris.spark.util.IOUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index ac920cd..9ecfa40 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -22,8 +22,6 @@ import
org.apache.doris.spark.exception.StreamLoadException; import org.apache.doris.spark.rest.RestService; import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.RespContent; -import org.apache.doris.spark.util.DataUtil; -import org.apache.doris.spark.util.ListUtils; import org.apache.doris.spark.util.ResponseUtil; import com.fasterxml.jackson.core.JsonProcessingException; @@ -39,71 +37,72 @@ import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.entity.BufferedHttpEntity; -import org.apache.http.entity.StringEntity; +import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.sql.Timestamp; import java.util.ArrayList
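The core of this change is visible in the diff above: the StringEntity built from a fully materialized batch gives way to an InputStreamEntity fed by the new RecordBatchInputStream, so rows are serialized on demand while the HTTP client streams the load. Below is a minimal, self-contained sketch of that technique; the class name, the line delimiter, and the buffering are illustrative, not the connector's actual implementation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;

/**
 * Illustrative sketch: expose an iterator of serialized rows as an InputStream
 * so an HTTP entity can stream them without buffering the whole batch in memory.
 * Not the connector's RecordBatchInputStream, just the same idea.
 */
public class RowIteratorInputStream extends InputStream {
    private final Iterator<String> rows;
    private final byte[] delim;
    private byte[] current = new byte[0];
    private int pos = 0;

    public RowIteratorInputStream(Iterator<String> rows, String lineDelimiter) {
        this.rows = rows;
        this.delim = lineDelimiter.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public int read() throws IOException {
        while (pos >= current.length) {      // current row exhausted, fetch the next one
            if (!rows.hasNext()) {
                return -1;                   // end of batch
            }
            byte[] row = rows.next().getBytes(StandardCharsets.UTF_8);
            current = new byte[row.length + delim.length];
            System.arraycopy(row, 0, current, 0, row.length);
            System.arraycopy(delim, 0, current, row.length, delim.length);
            pos = 0;
        }
        return current[pos++] & 0xFF;
    }

    public static void main(String[] args) throws IOException {
        Iterator<String> it = Arrays.asList("1\ta", "2\tb").iterator();
        try (InputStream in = new RowIteratorInputStream(it, "\n")) {
            int b;
            while ((b = in.read()) != -1) {
                System.out.print((char) b);  // streams "1\ta\n2\tb\n" byte by byte
            }
        }
    }
}
```

Wrapped in an InputStreamEntity, a stream like this lets the HTTP client pull bytes as it sends them, which is what keeps the per-batch memory footprint flat.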
[doris] branch master updated (3827549aba -> 85e5b49d4c)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 3827549aba [Chore](checks) change SonarCloud Scan projectBaseDir to be to avoid include .java file (#24377) add 85e5b49d4c update structured streaming doc (#24016) No new revisions were added by this update. Summary of changes: docs/en/docs/ecosystem/spark-doris-connector.md| 40 docs/zh-CN/docs/ecosystem/spark-doris-connector.md | 44 +- 2 files changed, 68 insertions(+), 16 deletions(-)
[doris-spark-connector] branch master updated (0daf6c4 -> bcccb0d)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git from 0daf6c4 [optimize][refractor] Optimizing memory usage for writing data (#140) add bcccb0d [improvement] 2pc commit or abort retry and interval configurable. (#136) No new revisions were added by this update. Summary of changes: .../org/apache/doris/spark/cfg/ConfigurationOptions.java | 12 .../doris/spark/listener/DorisTransactionListener.scala | 6 +++--- .../scala/org/apache/doris/spark/writer/DorisWriter.scala| 10 -- 3 files changed, 23 insertions(+), 5 deletions(-)
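The summary shows the retry count and interval moving into ConfigurationOptions, so both knobs can now be passed like any other connector option. A sketch of what that could look like from the Spark side; the two doris.sink.txn.* keys below are assumed names for illustration only, so check ConfigurationOptions.java in #136 for the exact spellings.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TwoPhaseCommitRetryExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("doris-2pc-retry")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().json("/tmp/input.json"); // any source

        df.write()
                .format("doris")
                .option("doris.fenodes", "127.0.0.1:8030")
                .option("doris.table.identifier", "db.tbl")
                .option("user", "root")
                .option("password", "")
                .option("doris.sink.enable-2pc", "true")
                // Hypothetical keys illustrating the new knobs from #136:
                .option("doris.sink.txn.retries", "3")         // commit/abort retry count
                .option("doris.sink.txn.interval.ms", "5000")  // wait between retries
                .save();
    }
}
```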
[doris-spark-connector] branch master updated: [feature] add stream load config to add double quotes for field when csv format. (#119)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 32ec6c0 [feature] add stream load config to add double quotes for field when csv format. (#119) 32ec6c0 is described below commit 32ec6c09ee86bad708b93ea3c8d00c668f690585 Author: Chuang Li <64473732+codecooke...@users.noreply.github.com> AuthorDate: Thu Sep 14 18:31:28 2023 +0800 [feature] add stream load config to add double quotes for field when csv format. (#119) --- .../org/apache/doris/spark/load/DorisStreamLoad.java | 10 +- .../java/org/apache/doris/spark/load/RecordBatch.java | 18 -- .../doris/spark/load/RecordBatchInputStream.java | 6 +- .../java/org/apache/doris/spark/util/DataUtil.java | 16 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 9ecfa40..0b506b0 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -70,6 +70,7 @@ import java.util.concurrent.TimeUnit; * DorisStreamLoad **/ public class DorisStreamLoad implements Serializable { +private static final String NULL_VALUE = "\\N"; private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); @@ -88,6 +89,7 @@ public class DorisStreamLoad implements Serializable { private final String columns; private final String maxFilterRatio; private final Map streamLoadProp; +private boolean addDoubleQuotes; private static final long cacheExpireTimeout = 4 * 60; private final LoadingCache> cache; private final String fileType; @@ -111,6 +113,11 @@ public class DorisStreamLoad implements Serializable { fileType = streamLoadProp.getOrDefault("format", "csv"); if ("csv".equals(fileType)) { FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t")); +this.addDoubleQuotes = Boolean.parseBoolean(streamLoadProp.getOrDefault("add_double_quotes", "false")); +if (addDoubleQuotes) { +LOG.info("set add_double_quotes for csv mode, add trim_double_quotes to true for prop."); +streamLoadProp.put("trim_double_quotes", "true"); +} } else if ("json".equalsIgnoreCase(fileType)) { streamLoadProp.put("read_json_by_line", "true"); } @@ -189,7 +196,8 @@ public class DorisStreamLoad implements Serializable { .format(fileType) .sep(FIELD_DELIMITER) .delim(LINE_DELIMITER) -.schema(schema).build(), streamingPassthrough); +.schema(schema) +.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough); httpPut.setEntity(new InputStreamEntity(recodeBatchInputStream)); HttpResponse httpResponse = httpClient.execute(httpPut); loadResponse = new LoadResponse(httpResponse); diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java index 779c057..4ce297f 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatch.java @@ -61,14 +61,17 @@ public class RecordBatch { */ private final StructType schema; +private final boolean addDoubleQuotes; + private RecordBatch(Iterator iterator, int batchSize, String format, String sep, 
byte[] delim, -StructType schema) { +StructType schema, boolean addDoubleQuotes) { this.iterator = iterator; this.batchSize = batchSize; this.format = format; this.sep = sep; this.delim = delim; this.schema = schema; +this.addDoubleQuotes = addDoubleQuotes; } public Iterator getIterator() { @@ -94,6 +97,10 @@ public class RecordBatch { public StructType getSchema() { return schema; } + +public boolean getAddDoubleQuotes(){ +return addDoubleQuotes; +} public static Builder newBuilder(Iterator iterator) { return new Builder(iterator); } @@ -115,6 +122,8 @@ public class RecordBatch { private StructType schema; +private boolea
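The new flag travels with the other stream-load properties. Assuming the connector's usual doris.sink.properties.* prefix for stream-load options (the prefix itself is not shown in this excerpt), enabling quoting from a Spark job would look roughly like this:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AddDoubleQuotesExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("doris-csv-quotes")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().parquet("/tmp/input.parquet");

        df.write()
                .format("doris")
                .option("doris.fenodes", "127.0.0.1:8030")
                .option("doris.table.identifier", "db.tbl")
                .option("user", "root")
                .option("password", "")
                // Field values may contain the separator itself, so ask the
                // sink to wrap every CSV field in double quotes:
                .option("doris.sink.properties.format", "csv")
                .option("doris.sink.properties.add_double_quotes", "true")
                .save();
    }
}
```

Note from the DorisStreamLoad diff above that enabling add_double_quotes also force-sets trim_double_quotes to true, so Doris strips the added quotes again on ingest.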
[doris] branch master updated (08740b47cd -> ddc0eb3508)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 08740b47cd [FIX](decimalv3) fix decimalv3 value with leading zeros (#24416) add ddc0eb3508 [doc](flink-connector) add json string write (#24422) No new revisions were added by this update. Summary of changes: docs/en/docs/ecosystem/flink-doris-connector.md| 10 +- docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 15 --- 2 files changed, 21 insertions(+), 4 deletions(-)
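The doc change covers writing raw JSON strings through the Flink sink. A minimal sketch using the connector's builder API and its SimpleStringSerializer; the endpoints and table name are placeholders, and the stream-load properties follow the pattern seen elsewhere in this thread.

```java
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class JsonStringWriteExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each record is already a JSON string matching the Doris table's columns.
        DataStream<String> source = env.fromElements(
                "{\"id\":1,\"name\":\"doris\"}",
                "{\"id\":2,\"name\":\"flink\"}");

        Properties props = new Properties();
        props.setProperty("format", "json");            // stream load payload is JSON
        props.setProperty("read_json_by_line", "true"); // one JSON object per line

        DorisSink.Builder<String> builder = DorisSink.builder();
        builder.setDorisOptions(DorisOptions.builder()
                        .setFenodes("127.0.0.1:8030")
                        .setTableIdentifier("db.tbl")
                        .setUsername("root")
                        .setPassword("")
                        .build())
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(DorisExecutionOptions.builder()
                        .setLabelPrefix("json-write")
                        .setStreamLoadProp(props)
                        .build())
                .setSerializer(new SimpleStringSerializer()); // pass strings through untouched

        source.sinkTo(builder.build());
        env.execute("write json strings to doris");
    }
}
```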
[doris-spark-connector] branch master updated: [bug] NPE occurs for StringType when column value is null. (#141)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new d46d5df [bug] NPE occurs for StringType when column value is null. (#141) d46d5df is described below commit d46d5dfd6a5673dd71e9615f3872e41a8e4229a8 Author: Chuang Li <64473732+codecooke...@users.noreply.github.com> AuthorDate: Fri Sep 15 17:58:50 2023 +0800 [bug] NPE occurs for StringType when column value is null. (#141) --- .../src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index f5a6a15..86a403f 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -156,7 +156,7 @@ private[spark] object SchemaUtils { case LongType => row.getLong(ordinal) case FloatType => row.getFloat(ordinal) case DoubleType => row.getDouble(ordinal) - case StringType => row.getUTF8String(ordinal).toString + case StringType => Option(row.getUTF8String(ordinal)).map(_.toString).getOrElse(DataUtil.NULL_VALUE) case TimestampType => LocalDateTime.ofEpochSecond(row.getLong(ordinal) / 10, (row.getLong(ordinal) % 1000).toInt, ZoneOffset.UTC) new Timestamp(row.getLong(ordinal) / 1000).toString - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-spark-connector] branch master updated: [bug] byteBuffer calculate new capacity bug fix. (#142)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new ad5d62f [bug] byteBuffer calculate new capacity bug fix. (#142) ad5d62f is described below commit ad5d62ff9faf4e8f4694cb8832f9b51a3d4bf301 Author: Chuang Li <64473732+codecooke...@users.noreply.github.com> AuthorDate: Fri Sep 15 17:59:04 2023 +0800 [bug] byteBuffer calculate new capacity bug fix. (#142) --- .../org/apache/doris/spark/load/RecordBatchInputStream.java | 11 +-- 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java index d705501..f70809b 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/RecordBatchInputStream.java @@ -170,14 +170,13 @@ public class RecordBatchInputStream extends InputStream { * @return new capacity */ private int calculateNewCapacity(int capacity, int minCapacity) { -int newCapacity; +int newCapacity = 0; if (capacity == 0) { newCapacity = DEFAULT_BUF_SIZE; -while (newCapacity < minCapacity) { -newCapacity = newCapacity << 1; -} -} else { -newCapacity = capacity << 1; + +} +while (newCapacity < minCapacity) { +newCapacity = newCapacity << 1; } return newCapacity; } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
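For reference, the invariant this helper must maintain is: start from a non-zero seed, then keep doubling until the capacity covers the requested minimum. A standalone sketch of that growth rule (the seed value here is an assumption, not the connector's constant):

```java
public class BufferGrowth {
    private static final int DEFAULT_BUF_SIZE = 4096; // illustrative seed

    /**
     * Double the capacity until it can hold minCapacity. Seeding with a
     * non-zero value is what keeps the left-shift loop from spinning on zero.
     */
    static int calculateNewCapacity(int capacity, int minCapacity) {
        int newCapacity = (capacity == 0) ? DEFAULT_BUF_SIZE : capacity;
        while (newCapacity < minCapacity) {
            newCapacity <<= 1;
        }
        return newCapacity;
    }

    public static void main(String[] args) {
        System.out.println(calculateNewCapacity(0, 10_000));   // 16384
        System.out.println(calculateNewCapacity(4096, 5_000)); // 8192
    }
}
```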
[doris] branch master updated: [Fix](Job)Fix the window time is not updated when no job is registered (#23628)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new ca55bd88ad [Fix](Job)Fix the window time is not updated when no job is registered (#23628) ca55bd88ad is described below commit ca55bd88adf3a376404b1b8b5c67b4deeba3cb54 Author: Calvin Kirs AuthorDate: Wed Aug 30 09:48:21 2023 +0800 [Fix](Job)Fix the window time is not updated when no job is registered (#23628) Fix resume job grammar definition is inconsistent Show Job task Add execution results JOB allows to define update operations --- .../Data-Definition-Statements/Create/CREATE-JOB.md | 2 +- .../sql-reference/Show-Statements/SHOW-JOB-TASK.md| 1 + .../Data-Definition-Statements/Create/CREATE-JOB.md | 2 +- .../sql-reference/Show-Statements/SHOW-JOB-TASK.md| 1 + .../src/main/java/org/apache/doris/common/Config.java | 2 +- fe/fe-core/src/main/cup/sql_parser.cup| 2 +- .../java/org/apache/doris/analysis/CreateJobStmt.java | 3 ++- .../org/apache/doris/analysis/ShowJobTaskStmt.java| 1 + .../apache/doris/scheduler/disruptor/TaskHandler.java | 10 -- .../doris/scheduler/executor/SqlJobExecutor.java | 19 +++ .../java/org/apache/doris/scheduler/job/JobTask.java | 7 +++ .../doris/scheduler/manager/JobTaskManager.java | 2 +- .../doris/scheduler/manager/TimerJobManager.java | 6 ++ 13 files changed, 42 insertions(+), 16 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md index 839fe0e72c..d89e0aad0d 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md @@ -98,7 +98,7 @@ The SCHEDULER statement is used to define the execution time, frequency and dura Used to specify the end time of the job, if not specified, it means permanent execution. - DO - It is used to specify the operation that needs to be performed when the job is triggered. Currently, all ***INSERT*** operations are supported. We will support more operations in the future. + It is used to specify the operation that needs to be performed when the job is triggered. Currently, all ***INSERT, UPDATE*** operations are supported. We will support more operations in the future. 
### Example diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md index c7d1f6afb0..d816059510 100644 --- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md +++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md @@ -49,6 +49,7 @@ Result description: StartTime: start execution time EndTime: end time Status: status + Result: execution result ErrMsg: error message ``` diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md index b67e054714..c06d48a4d0 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md @@ -99,7 +99,7 @@ SCHEDULER 语句用于定义作业的执行时间,频率以及持续时间, 用于指定作业的结束时间,如果没有指定,则表示永久执行。 - DO - 用于指定作业触发时需要执行的操作,目前支持所有的 ***INSERT*** 操作。后续我们会支持更多的操作。 + 用于指定作业触发时需要执行的操作,目前支持所有的 ***INSERT,UPDATE*** 操作。后续我们会支持更多的操作。 ### Example diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md index 3ae1e24733..d1b458ea29 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB-TASK.md @@ -50,6 +50,7 @@ SHOW JOB TASKS FOR job_name; StartTime: 开始执行时间 EndTime: 结束时间 Status: 状态 + Result: 执行结果 ErrMsg: 错误信息 ``` diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 2815dfb0ec..a683b307da 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1554,7 +1554,7 @@ public class Config extends ConfigBase { public static boolean
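Since Doris speaks the MySQL protocol, the new Result column described in the SHOW-JOB-TASK doc can be read with any MySQL client. A small JDBC sketch; the connection details and job name are placeholders, and the column labels follow the doc excerpt above:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ShowJobTaskExample {
    public static void main(String[] args) throws Exception {
        // Doris FE exposes a MySQL-compatible query port (9030 by default).
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/demo", "root", "");
             Statement stmt = conn.createStatement();
             // Syntax from the SHOW-JOB-TASK doc; "my_job" is a placeholder.
             ResultSet rs = stmt.executeQuery("SHOW JOB TASKS FOR my_job")) {
            while (rs.next()) {
                System.out.printf("start=%s status=%s result=%s errMsg=%s%n",
                        rs.getString("StartTime"),
                        rs.getString("Status"),
                        rs.getString("Result"),   // new column added by #23628
                        rs.getString("ErrMsg"));
            }
        }
    }
}
```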
[doris] branch master updated (e680d42fe7 -> 72fef48f87)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from e680d42fe7 [feature](information_schema)add metadata_name_ids for quickly get catlogs,db,table and add profiling table in order to Compatible with mysql (#22702) add 72fef48f87 [Doc](flink-connector)Flink connector adds schema change related parameter documents (#23439) No new revisions were added by this update. Summary of changes: docs/en/docs/ecosystem/flink-doris-connector.md| 2 ++ docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 2 ++ 2 files changed, 4 insertions(+)
[doris] branch master updated: [Doc](tvf)Added tvf support for reading documents from avro files (#23436)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new b763bfa17d [Doc](tvf)Added tvf support for reading documents from avro files (#23436) b763bfa17d is described below commit b763bfa17db45ee62687c72f9d86944dc67fdcb0 Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu Aug 31 21:49:27 2023 +0800 [Doc](tvf)Added tvf support for reading documents from avro files (#23436) --- .../sql-manual/sql-functions/table-functions/hdfs.md | 2 +- .../sql-manual/sql-functions/table-functions/s3.md| 19 +++ .../sql-manual/sql-functions/table-functions/hdfs.md | 2 +- .../sql-manual/sql-functions/table-functions/s3.md| 18 ++ 4 files changed, 39 insertions(+), 2 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/hdfs.md b/docs/en/docs/sql-manual/sql-functions/table-functions/hdfs.md index 5969585ad5..7cbd21366a 100644 --- a/docs/en/docs/sql-manual/sql-functions/table-functions/hdfs.md +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/hdfs.md @@ -69,7 +69,7 @@ Related parameters for accessing HDFS in HA mode: File format parameters: -- `format`: (required) Currently support `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` +- `format`: (required) Currently support `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc/avro` - `column_separator`: (optional) default `,`. - `line_delimiter`: (optional) default `\n`. - `compress_type`: (optional) Currently support `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. Default value is `UNKNOWN`, it will automatically infer the type based on the suffix of `uri`. diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md b/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md index b42788583f..d089c98155 100644 --- a/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md +++ b/docs/en/docs/sql-manual/sql-functions/table-functions/s3.md @@ -424,6 +424,25 @@ MySQL [(none)]> select * from s3( +---+--++--+-++-+---+-+ ``` +**avro format** + +`avro` format: S3 tvf supports parsing the column names and column types of the table schema from the avro file. Example: + +```sql +select * from s3( + "uri" = "http://127.0.0.1:9312/test2/person.avro";, + "ACCESS_KEY" = "ak", + "SECRET_KEY" = "sk", + "FORMAT" = "avro"); +++--+-+-+ +| name | boolean_type | double_type | long_type | +++--+-+-+ +| Alyssa |1 | 10.0012 | 1221133 | +| Ben|0 |.999 | 400999 | +| lisi |0 | 5992225.999 | 909990 | +++--+-+-+ +``` + **uri contains wildcards** uri can use wildcards to read multiple files. Note: If wildcards are used, the format of each file must be consistent (especially csv/csv_with_names/csv_with_names_and_types count as different formats), S3 tvf uses the first file to parse out the table schema. 
For example: diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/hdfs.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/hdfs.md index 47e723a623..c7faaa7a86 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/hdfs.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/hdfs.md @@ -70,7 +70,7 @@ hdfs( - `dfs.client.failover.proxy.provider.your-nameservices`:(选填) 文件格式相关参数 -- `format`:(必填) 目前支持 `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` +- `format`:(必填) 目前支持 `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc/avro` - `column_separator`:(选填) 列分割符, 默认为`,`。 - `line_delimiter`:(选填) 行分割符,默认为`\n`。 - `compress_type`: (选填) 目前支持 `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`。 默认值为 `UNKNOWN`, 将会根据 `uri` 的后缀自动推断类型。 diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md index 1a64205643..081734985c 100644 --- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md +++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/s3.md @@ -428,6 +428,24 @@ MySQL [(none)]> select * from s3( | 5 | forest brown coral puff cream| Manufacturer#3 | Brand#32 | STANDARD POLISHED TIN | 15 | SM PKG | 905 | wake carefully | +---+--++--+-+
[doris-flink-connector] branch master updated: [Improvement] Move DorisSourceBuilder into DorisSource as a static inner builder class (#189)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 9831982 [Improvement] Move DorisSourceBuilder into DorisSource as a static inner builder class (#189) 9831982 is described below commit 983198218adfcd9ec11e1b389f106dafbfc51c8a Author: thehuldra AuthorDate: Mon Sep 4 17:37:33 2023 +0800 [Improvement] Move DorisSourceBuilder into DorisSource as a static inner builder class (#189) --- .../org/apache/doris/flink/source/DorisSource.java | 52 .../doris/flink/source/DorisSourceBuilder.java | 71 -- .../doris/flink/table/DorisDynamicTableSource.java | 3 +- .../doris/flink/source/DorisSourceExampleTest.java | 2 +- 4 files changed, 54 insertions(+), 74 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java index e2b6d41..8edf10b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -120,4 +120,56 @@ public class DorisSource implements Source getProducedType() { return deserializer.getProducedType(); } + +public static DorisSourceBuilder builder() { +return new DorisSourceBuilder(); +} + +/** + * build for DorisSource. + * @param record type. + */ + +public static class DorisSourceBuilder { + +private DorisOptions options; +private DorisReadOptions readOptions; + +// Boundedness +private Boundedness boundedness; +private DorisDeserializationSchema deserializer; + +DorisSourceBuilder() { +boundedness = Boundedness.BOUNDED; +} + + +public DorisSourceBuilder setDorisOptions(DorisOptions options) { +this.options = options; +return this; +} + +public DorisSourceBuilder setDorisReadOptions(DorisReadOptions readOptions) { +this.readOptions = readOptions; +return this; +} + +public DorisSourceBuilder setBoundedness(Boundedness boundedness) { +this.boundedness = boundedness; +return this; +} + +public DorisSourceBuilder setDeserializer(DorisDeserializationSchema deserializer) { +this.deserializer = deserializer; +return this; +} + +public DorisSource build() { +if(readOptions == null){ +readOptions = DorisReadOptions.builder().build(); +} +return new DorisSource<>(options, readOptions, boundedness, deserializer); +} +} + } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java deleted file mode 100644 index 94febb8..000 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package org.apache.doris.flink.source; - -import org.apache.doris.flink.cfg.DorisOptions; -import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.deserialization.DorisDeserializationSchema; -import org.apache.flink.api.connector.source.Boundedness; - -/** - * The builder class for {@link DorisSource} to make it easier for the users to construct a {@link - * DorisSource}. - **/ -public class DorisSourceBuilder { - -private DorisOptions options; -private DorisReadOptions readOptions; - -// Boundedness -private Boundedness boundedness; -private DorisDeserializationSchema deserializer; - -DorisSourceBuilder() { -boundedness = Boundedness.BOUNDED; -} - -public static DorisSourceBuilder builder() { -return new DorisSourceBuilder(); -} - -
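With the move, the entry point becomes a static factory on DorisSource itself rather than a separate top-level class. Usage after this change, following the setters visible in the diff; the deserializer choice and connection values are illustrative:

```java
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.doris.flink.source.DorisSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

public class DorisSourceBuilderExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DorisOptions options = DorisOptions.builder()
                .setFenodes("127.0.0.1:8030")
                .setTableIdentifier("db.tbl")
                .setUsername("root")
                .setPassword("")
                .build();

        // builder() is now a static factory on DorisSource itself; if
        // setDorisReadOptions is skipped, build() falls back to defaults
        // (see the null check in the diff above).
        DorisSource<List<?>> source = DorisSource.<List<?>>builder()
                .setDorisOptions(options)
                .setBoundedness(Boundedness.BOUNDED)
                .setDeserializer(new SimpleListDeserializationSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "doris-source")
                .print();
        env.execute("read from doris");
    }
}
```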
[doris] branch master updated: [Fix](Plan)StreamLoad cannot be parsed correctly when it contains complex where conditions (#23874)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 4dac2d3b94 [Fix](Plan)StreamLoad cannot be parsed correctly when it contains complex where conditions (#23874) 4dac2d3b94 is described below commit 4dac2d3b943241f2b818181d699da97c5e58b75f Author: Calvin Kirs AuthorDate: Tue Sep 5 11:26:59 2023 +0800 [Fix](Plan)StreamLoad cannot be parsed correctly when it contains complex where conditions (#23874) --- .../apache/doris/planner/StreamLoadPlanner.java| 11 +++- .../stream_load/test_include_where_expr.json | 1 + .../test_stream_load_include_where_expr.groovy | 60 ++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 5bf07f1f72..63c068e057 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -212,6 +212,11 @@ public class StreamLoadPlanner { } } +scanTupleDesc.setTable(destTable); +analyzer.registerTupleDescriptor(scanTupleDesc); +if (null != taskInfo.getWhereExpr()) { +taskInfo.getWhereExpr().analyze(analyzer); +} // create scan node FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), scanTupleDesc); // 1. create file group @@ -416,7 +421,11 @@ public class StreamLoadPlanner { throw new DdlException("Column is not SUM AggregateType. column:" + col.getName()); } } - +scanTupleDesc.setTable(destTable); +analyzer.registerTupleDescriptor(scanTupleDesc); +if (null != taskInfo.getWhereExpr()) { +taskInfo.getWhereExpr().analyze(analyzer); +} // create scan node FileLoadScanNode fileScanNode = new FileLoadScanNode(new PlanNodeId(0), scanTupleDesc); // 1. create file group diff --git a/regression-test/data/load_p0/stream_load/test_include_where_expr.json b/regression-test/data/load_p0/stream_load/test_include_where_expr.json new file mode 100644 index 00..b193dd1a6d --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_include_where_expr.json @@ -0,0 +1 @@ +{"a":"3","b":"2","c":"389","d":"doris"} \ No newline at end of file diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_include_where_expr.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_include_where_expr.groovy new file mode 100644 index 00..a94ecd05a1 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_include_where_expr.groovy @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_stream_load_include_where_expr", "p0") { +// define a sql table +def tableName = "tbl_test_stream_load_include_where_expr" + +sql """ DROP TABLE IF EXISTS ${tableName} """ +sql """ +CREATE TABLE IF NOT EXISTS ${tableName} +( +`a` INT COMMENT 'timestamp', +`b` INT COMMENT 'a int value', +`c` INT COMMENT 'b int value', +`d` varchar(100) +) +DUPLICATE KEY(`a`) +DISTRIBUTED BY HASH(a) BUCKETS AUTO +properties( +"replication_num" = "1" +); +""" + +streamLoad { +table "${tableName}" +set 'columns', 'a, b, c, d' +set 'format', 'json' +set 'where', 'd = \'doris\' or d = \'
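The regression test above exercises the fix through the stream load HTTP interface, where the filter is simply a where header on the PUT. A bare-bones sketch with Apache HttpClient; the endpoint and credentials are placeholders, and the headers and payload are taken from the test data:

```java
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class StreamLoadWhereExample {
    public static void main(String[] args) throws Exception {
        String auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));

        HttpPut put = new HttpPut("http://127.0.0.1:8030/api/demo/tbl/_stream_load");
        put.setHeader(HttpHeaders.AUTHORIZATION, "Basic " + auth);
        put.setHeader(HttpHeaders.EXPECT, "100-continue");
        put.setHeader("label", "where-expr-demo-1");
        put.setHeader("format", "json");
        put.setHeader("columns", "a, b, c, d");
        // Complex where conditions are what #23874 fixes in the planner.
        put.setHeader("where", "d = 'doris' or d = 'spark'");
        put.setEntity(new StringEntity(
                "{\"a\":\"3\",\"b\":\"2\",\"c\":\"389\",\"d\":\"doris\"}",
                StandardCharsets.UTF_8));

        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse resp = client.execute(put)) {
            System.out.println(EntityUtils.toString(resp.getEntity()));
        }
    }
}
```

In practice the FE answers a stream load with a 307 redirect to a backend, so a real client must either re-send the body to the redirect target or address a BE host directly.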
[doris-spark-connector] branch master updated: [fix] streaming write execution plan error (#135)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 11f4976 [fix] streaming write execution plan error (#135) 11f4976 is described below commit 11f4976be1aae4c041ea66e3e83487aa2614c947 Author: gnehil AuthorDate: Tue Sep 5 13:56:25 2023 +0800 [fix] streaming write execution plan error (#135) * fix streaming write error and add json data pass through option * handle stream pass through, force set read_json_by_line is true when format is json --- .../doris/spark/cfg/ConfigurationOptions.java | 6 ++ .../apache/doris/spark/load/DorisStreamLoad.java | 50 +++ .../doris/spark/sql/DorisStreamLoadSink.scala | 2 +- .../apache/doris/spark/writer/DorisWriter.scala| 97 +- 4 files changed, 135 insertions(+), 20 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index 2ab200d..09c0416 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -100,4 +100,10 @@ public interface ConfigurationOptions { String DORIS_SINK_ENABLE_2PC = "doris.sink.enable-2pc"; boolean DORIS_SINK_ENABLE_2PC_DEFAULT = false; +/** + * pass through json data when sink to doris in streaming mode + */ +String DORIS_SINK_STREAMING_PASSTHROUGH = "doris.sink.streaming.passthrough"; +boolean DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT = false; + } diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 4a7b1e0..ac920cd 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -96,6 +96,8 @@ public class DorisStreamLoad implements Serializable { private boolean readJsonByLine = false; +private boolean streamingPassthrough = false; + public DorisStreamLoad(SparkSettings settings) { String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\."); this.db = dbTable[0]; @@ -121,6 +123,8 @@ public class DorisStreamLoad implements Serializable { } } LINE_DELIMITER = escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n")); +this.streamingPassthrough = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH, +ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT); } public String getLoadUrlStr() { @@ -196,6 +200,38 @@ public class DorisStreamLoad implements Serializable { } +public List loadStream(List> rows, String[] dfColumns, Boolean enable2PC) +throws StreamLoadException, JsonProcessingException { + +List loadData; + +if (this.streamingPassthrough) { +handleStreamPassThrough(); +loadData = passthrough(rows); +} else { +loadData = parseLoadData(rows, dfColumns); +} + +List txnIds = new ArrayList<>(loadData.size()); + +try { +for (String data : loadData) { +txnIds.add(load(data, enable2PC)); +} +} catch (StreamLoadException e) { +if (enable2PC && !txnIds.isEmpty()) { +LOG.error("load batch failed, abort previously pre-committed transactions"); +for (Integer txnId : txnIds) { +abort(txnId); +} +} +throw e; +} + 
+return txnIds; + +} + public int load(String value, Boolean enable2PC) throws StreamLoadException { String label = generateLoadLabel(); @@ -442,4 +478,18 @@ public class DorisStreamLoad implements Serializable { return hexData; } +private void handleStreamPassThrough() { + +if ("json".equalsIgnoreCase(fileType)) { +LOG.info("handle stream pass through, force set read_json_by_line is true for json format"); +streamLoadProp.put("read_json_by_line", "true"); +streamLoadProp.remove("strip_outer_array"); +} + +} + +private List passthrough(List> values) { +return values.stream().map(list -> list.get(0).toString()).collect(Collectors.toList()); +} + } diff --git a/spark-doris-connecto
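The new doris.sink.streaming.passthrough option (its key is visible in the ConfigurationOptions diff above) skips row re-serialization when the incoming column already holds a JSON string. A sketch of a Structured Streaming job using it; paths and connection values are placeholders, and the doris.sink.properties.* prefix is assumed from the connector's conventions:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class StreamingPassthroughExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("doris-streaming-passthrough")
                .master("local[*]")
                .getOrCreate();

        // Each line under /tmp/json-lines is already a JSON object matching db.tbl.
        Dataset<Row> lines = spark.readStream().text("/tmp/json-lines");

        StreamingQuery query = lines.writeStream()
                .format("doris")
                .option("checkpointLocation", "/tmp/doris-ckpt")
                .option("doris.fenodes", "127.0.0.1:8030")
                .option("doris.table.identifier", "db.tbl")
                .option("user", "root")
                .option("password", "")
                // Ship the value column as-is instead of re-serializing the row;
                // per the commit message, the sink then forces read_json_by_line=true.
                .option("doris.sink.properties.format", "json")
                .option("doris.sink.streaming.passthrough", "true")
                .start();

        query.awaitTermination();
    }
}
```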
[doris-flink-connector] branch master updated: [Improve]Added direct access to BE through the intranet (#187)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 8c15c4f [Improve]Added direct access to BE through the intranet (#187) 8c15c4f is described below commit 8c15c4f0bf2d63507b8ac1fd0b8b6d00a37afb6d Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Tue Sep 5 15:26:20 2023 +0800 [Improve]Added direct access to BE through the intranet (#187) --- flink-doris-connector/pom.xml | 8 +- .../doris/flink/cfg/DorisConnectionOptions.java| 16 +++- .../org/apache/doris/flink/cfg/DorisOptions.java | 19 +++- .../org/apache/doris/flink/sink/BackendUtil.java | 32 ++- .../flink/sink/batch/DorisBatchStreamLoad.java | 6 +- .../doris/flink/sink/committer/DorisCommitter.java | 10 +- .../doris/flink/sink/writer/DorisWriter.java | 7 +- .../doris/flink/table/DorisConfigOptions.java | 1 + .../flink/table/DorisDynamicTableFactory.java | 5 + .../doris/flink/table/DorisDynamicTableSource.java | 1 + .../doris/flink/table/DorisRowDataInputFormat.java | 5 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 + .../flink/DorisIntranetAccessSinkExample.java | 105 + .../flink/sink/committer/TestDorisCommitter.java | 24 - .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 5 +- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 5 +- .../tools/cdc/CdcPostgresSyncDatabaseCase.java | 5 +- .../tools/cdc/CdcSqlServerSyncDatabaseCase.java| 5 +- 18 files changed, 236 insertions(+), 25 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 333a40b..e52a07a 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -224,7 +224,13 @@ under the License. 
org.mockito mockito-core -2.27.0 +4.2.0 +test + + +org.mockito +mockito-inline +4.2.0 test diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java index 00abd52..1382dde 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java @@ -31,6 +31,7 @@ public class DorisConnectionOptions implements Serializable { protected final String username; protected final String password; protected String jdbcUrl; +protected String benodes; public DorisConnectionOptions(String fenodes, String username, String password) { this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is empty"); @@ -38,8 +39,15 @@ public class DorisConnectionOptions implements Serializable { this.password = password; } -public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl){ -this(fenodes,username,password); +public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl) { +this(fenodes, username, password); +this.jdbcUrl = jdbcUrl; +} + +public DorisConnectionOptions(String fenodes, String benodes, String username, String password, +String jdbcUrl) { +this(fenodes, username, password); +this.benodes = benodes; this.jdbcUrl = jdbcUrl; } @@ -55,6 +63,10 @@ public class DorisConnectionOptions implements Serializable { return password; } +public String getBenodes() { +return benodes; +} + public String getJdbcUrl(){ return jdbcUrl; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index c9e36e3..cf7b932 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -42,6 +42,12 @@ public class DorisOptions extends DorisConnectionOptions { this.tableIdentifier = tableIdentifier; } +public DorisOptions(String fenodes, String beNodes, String username, String password, +String tableIdentifier, String jdbcUrl) { +super(fenodes, beNodes, username, password, jdbcUrl); +this.tableIdentifier = tableIdentifier; +} + public String getTableIdentifier() { return tableIdentifier; } @@ -60,7 +66,7 @@ public class DorisOptions extends DorisConnectionOptions { */ pub
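The diff adds a benodes field alongside fenodes, letting a job on the same intranet stream-load straight to the listed BEs instead of resolving (possibly public) backend addresses through the FE. A sketch of the sink setup; the setBenodes builder method is assumed from the new field in DorisOptions, and all hosts are placeholders:

```java
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class IntranetAccessExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("format", "csv");

        // setBenodes is an assumed name based on the new benodes field; listing
        // BE hosts directly bypasses FE-side backend discovery for intranet jobs.
        DorisOptions options = DorisOptions.builder()
                .setFenodes("10.0.0.1:8030")
                .setBenodes("10.0.0.2:8040,10.0.0.3:8040")
                .setTableIdentifier("db.tbl")
                .setUsername("root")
                .setPassword("")
                .build();

        DorisSink.Builder<String> builder = DorisSink.builder();
        builder.setDorisOptions(options)
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(DorisExecutionOptions.builder()
                        .setLabelPrefix("intranet-demo")
                        .setStreamLoadProp(props)
                        .build())
                .setSerializer(new SimpleStringSerializer());

        env.fromElements("1,doris", "2,flink").sinkTo(builder.build());
        env.execute("doris intranet sink");
    }
}
```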
(doris-flink-connector) branch master updated: [feature] multiple tables to one for DatabaseSync (#208)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new a4b4bdf [feature] multiple tables to one for DatabaseSync (#208) a4b4bdf is described below commit a4b4bdfc92bb8fecefffb6f4f81b0a8f577d142e Author: Antg <57290855+codea...@users.noreply.github.com> AuthorDate: Mon Nov 6 16:43:46 2023 +0800 [feature] multiple tables to one for DatabaseSync (#208) --- .../org/apache/doris/flink/tools/cdc/CdcTools.java | 4 +- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 70 -- .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 10 +++- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 6 +- .../tools/cdc/CdcPostgresSyncDatabaseCase.java | 6 +- .../tools/cdc/CdcSqlServerSyncDatabaseCase.java| 10 ++-- .../doris/flink/tools/cdc/DatabaseSyncTest.java| 40 + 7 files changed, 128 insertions(+), 18 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 6a390ea..8a8b3db 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -103,6 +103,8 @@ public class CdcTools { String tableSuffix = params.get("table-suffix"); String includingTables = params.get("including-tables"); String excludingTables = params.get("excluding-tables"); +String multiToOneOrigin = params.get("multi-to-one-origin"); +String multiToOneTarget = params.get("multi-to-one-target"); boolean createTableOnly = params.has("create-table-only"); boolean ignoreDefaultValue = params.has("ignore-default-value"); boolean useNewSchemaChange = params.has("use-new-schema-change"); @@ -112,7 +114,7 @@ public class CdcTools { Configuration sinkConfig = Configuration.fromMap(sinkMap); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange); +databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables,multiToOneOrigin,multiToOneTarget, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange); databaseSync.build(); if(StringUtils.isNullOrWhitespaceOnly(jobName)){ jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db")); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index fcd0f4c..99c45eb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,7 @@ public abstract class DatabaseSync { protected TableNameConverter converter; protected Pattern 
includingPattern; protected Pattern excludingPattern; +protected Map multiToOneRulesPattern; protected Map tableConfig; protected Configuration sinkConfig; protected boolean ignoreDefaultValue; @@ -67,6 +69,8 @@ public abstract class DatabaseSync { private boolean newSchemaChange; protected String includingTables; protected String excludingTables; +protected String multiToOneOrigin; +protected String multiToOneTarget; public abstract void registerDriver() throws SQLException; @@ -82,16 +86,19 @@ public abstract class DatabaseSync { public void create(StreamExecutionEnvironment env, String database, Configuration config, String tablePrefix, String tableSuffix, String includingTables, - String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig, + String excludingTables,String multiToOneOrigin,String multiToOneTarget, boolean ignoreDefaultValue, Configuration sinkConfig, Map tableCo
(doris-spark-connector) branch master updated: [improve] remove table path from 2pc url (#154)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new e118070 [improve] remove table path from 2pc url (#154) e118070 is described below commit e118070af5f0232a616b2c79df5cadef05d32e1a Author: gnehil AuthorDate: Mon Nov 6 18:22:45 2023 +0800 [improve] remove table path from 2pc url (#154) --- .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java| 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 2f365a8..69287d8 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -82,9 +82,9 @@ public class DorisStreamLoad implements Serializable { private final static List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout")); -private static final String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";; +private static final String loadUrlPattern = "http://%s/api/%s/%s/_stream_load";; -private static final String abortUrlPattern = "http://%s/api/%s/%s/_stream_load_2pc?";; +private static final String abortUrlPattern = "http://%s/api/%s/_stream_load_2pc";; private String loadUrlStr; private final String db; @@ -270,7 +270,7 @@ public class DorisStreamLoad implements Serializable { try (CloseableHttpClient client = getHttpClient()) { String backend = getBackend(); -String abortUrl = String.format(abortUrlPattern, backend, db, tbl); +String abortUrl = String.format(abortUrlPattern, backend, db); HttpPut httpPut = new HttpPut(abortUrl); addCommonHeader(httpPut); httpPut.setHeader("txn_operation", "commit"); @@ -358,7 +358,7 @@ public class DorisStreamLoad implements Serializable { private void doAbort(Consumer putConsumer) throws StreamLoadException { try (CloseableHttpClient client = getHttpClient()) { -String abortUrl = String.format(abortUrlPattern, getBackend(), db, tbl); +String abortUrl = String.format(abortUrlPattern, getBackend(), db); HttpPut httpPut = new HttpPut(abortUrl); addCommonHeader(httpPut); httpPut.setHeader("txn_operation", "abort"); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) branch master updated: [Improve] DataFrame CSV Stream Load optimization (#137)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new a2e682b [Improve] DataFrame CSV Stream Load optimization (#137) a2e682b is described below commit a2e682bddbb6ef81592d66ab8cb9eff692bc0014 Author: huanccwang AuthorDate: Tue Nov 7 10:54:40 2023 +0800 [Improve] DataFrame CSV Stream Load optimization (#137) --- .../main/java/org/apache/doris/spark/load/DorisStreamLoad.java | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 69287d8..f473636 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -69,6 +69,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; +import java.util.stream.Collectors; /** @@ -162,12 +163,17 @@ public class DorisStreamLoad implements Serializable { return httpClientBuilder.build(); } -private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC) { +private HttpPut getHttpPut(String label, String loadUrlStr, Boolean enable2PC, StructType schema) { HttpPut httpPut = new HttpPut(loadUrlStr); addCommonHeader(httpPut); httpPut.setHeader("label", label); if (StringUtils.isNotBlank(columns)) { httpPut.setHeader("columns", columns); +} else { +if (schema != null && !schema.isEmpty()) { +String dfColumns = Arrays.stream(schema.fieldNames()).collect(Collectors.joining(",")); +httpPut.setHeader("columns", dfColumns); +} } if (StringUtils.isNotBlank(maxFilterRatio)) { httpPut.setHeader("max_filter_ratio", maxFilterRatio); @@ -210,7 +216,7 @@ public class DorisStreamLoad implements Serializable { try (CloseableHttpClient httpClient = getHttpClient()) { String loadUrlStr = String.format(loadUrlPattern, getBackend(), db, tbl); this.loadUrlStr = loadUrlStr; -HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC); +HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema); RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows) .format(fileType) .sep(FIELD_DELIMITER) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) branch master updated: [fix] The result of serialization of decimal type does not meet the expected problem (#155)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 38c2718 [fix] The result of serialization of decimal type does not meet the expected problem (#155) 38c2718 is described below commit 38c2718f44af9d13e41b59622b5ccac8a03f413e Author: gnehil AuthorDate: Tue Nov 7 10:54:56 2023 +0800 [fix] The result of serialization of decimal type does not meet the expected problem (#155) --- .../src/main/java/org/apache/doris/spark/util/DataUtil.java| 10 +- .../main/scala/org/apache/doris/spark/sql/SchemaUtils.scala| 2 +- .../main/scala/org/apache/doris/spark/writer/DorisWriter.scala | 4 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java index 3f53d45..530657e 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java @@ -17,10 +17,11 @@ package org.apache.doris.spark.util; -import org.apache.doris.spark.sql.SchemaUtils; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.module.scala.DefaultScalaModule; +import org.apache.doris.spark.sql.SchemaUtils; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -31,7 +32,7 @@ import java.util.Map; public class DataUtil { -private static final ObjectMapper MAPPER = new ObjectMapper(); +private static final ObjectMapper MAPPER = JsonMapper.builder().addModule(new DefaultScalaModule()).build(); public static final String NULL_VALUE = "\\N"; @@ -67,8 +68,7 @@ public class DataUtil { return builder.toString().getBytes(StandardCharsets.UTF_8); } -public static byte[] rowToJsonBytes(InternalRow row, StructType schema) -throws JsonProcessingException { +public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException { StructField[] fields = schema.fields(); Map rowMap = new HashMap<>(row.numFields()); for (int i = 0; i < fields.length; i++) { diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index 982e580..1f0e942 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -168,7 +168,7 @@ private[spark] object SchemaUtils { new Timestamp(row.getLong(ordinal) / 1000).toString case DateType => DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString case BinaryType => row.getBinary(ordinal) -case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale) +case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal case at: ArrayType => val arrayData = row.getArray(ordinal) if (arrayData == null) DataUtil.NULL_VALUE diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala index f90bcc6..55f4d73 100644 --- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala @@ -79,6 +79,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable { * @param dataFrame source dataframe */ def writeStream(dataFrame: DataFrame): Unit = { +if (enable2PC) { + val errMsg = "two phrase commit is not supported in stream mode, please set doris.sink.enable-2pc to false." + throw new UnsupportedOperationException(errMsg) +} doWrite(dataFrame, dorisStreamLoader.loadStream) } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
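The decimal half of this fix is about what Jackson sees: Spark's Decimal wrapper does not serialize as a numeric literal, while java.math.BigDecimal does. A small demonstration of the difference; values are illustrative, and the exact wrapper output depends on the Jackson setup:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.spark.sql.types.Decimal;

import java.math.BigDecimal;

public class DecimalSerializationExample {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
        Decimal sparkDecimal = Decimal.apply(new BigDecimal("12.34"), 10, 2);

        // The wrapper exposes no usable bean properties to plain Jackson,
        // so the result is not the numeric literal the load expects:
        System.out.println(mapper.writeValueAsString(sparkDecimal));
        // e.g. {} (or an error without the setting above)

        // Converting first, as the SchemaUtils fix does, yields the literal:
        System.out.println(mapper.writeValueAsString(sparkDecimal.toJavaBigDecimal()));
        // 12.34
    }
}
```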
(doris-spark-connector) branch master updated: [improvement] support two phases commit in structured streaming (#156)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new df4f107 [improvement] support two phases commit in structured streaming (#156) df4f107 is described below commit df4f107f6cf9ee4f75bd33c74aad7e43dde5058f Author: gnehil AuthorDate: Tue Nov 7 11:13:04 2023 +0800 [improvement] support two phases commit in structured streaming (#156) --- .../spark/listener/DorisTransactionListener.scala | 83 - .../doris/spark/sql/DorisSourceProvider.scala | 4 +- .../doris/spark/sql/DorisStreamLoadSink.scala | 11 ++- .../doris/spark/txn/TransactionHandler.scala | 100 + .../txn/listener/DorisTransactionListener.scala| 66 ++ .../listener/DorisTxnStreamingQueryListener.scala | 69 ++ .../apache/doris/spark/writer/DorisWriter.scala| 42 - 7 files changed, 262 insertions(+), 113 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala deleted file mode 100644 index b1e9d84..000 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala +++ /dev/null @@ -1,83 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.spark.listener - -import org.apache.doris.spark.load.DorisStreamLoad -import org.apache.doris.spark.sql.Utils -import org.apache.spark.scheduler._ -import org.apache.spark.util.CollectionAccumulator -import org.slf4j.{Logger, LoggerFactory} - -import java.time.Duration -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.util.{Failure, Success} - -class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Long], dorisStreamLoad: DorisStreamLoad, sinkTnxIntervalMs: Int, sinkTxnRetries: Int) - extends SparkListener { - - val logger: Logger = LoggerFactory.getLogger(classOf[DorisTransactionListener]) - - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { -val txnIds: mutable.Buffer[Long] = preCommittedTxnAcc.value.asScala -val failedTxnIds = mutable.Buffer[Long]() -jobEnd.jobResult match { - // if job succeed, commit all transactions - case JobSucceeded => -if (txnIds.isEmpty) { - logger.warn("job run succeed, but there is no pre-committed txn ids") - return -} -logger.info("job run succeed, start committing transactions") -txnIds.foreach(txnId => - Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) { -dorisStreamLoad.commit(txnId) - } () match { -case Success(_) => // do nothing -case Failure(_) => failedTxnIds += txnId - } -) - -if (failedTxnIds.nonEmpty) { - logger.error("uncommitted txn ids: {}", failedTxnIds.mkString(",")) -} else { - logger.info("commit transaction success") -} - // if job failed, abort all pre committed transactions - case _ => -if (txnIds.isEmpty) { - logger.warn("job run failed, but there is no pre-committed txn ids") - return -} -logger.info("job run failed, start aborting transactions") -txnIds.foreach(txnId => - Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) { -dorisStreamLoad.abortById(txnId) - } () match { -case Success(_) => // do nothing -case Failure(_) => failedTxnIds += txnId - }) -if (failedTxnIds.nonEmpty) { - logger.error("not aborted txn ids: {}", failedTxnIds.mkString(",")) -} else { - logger.info("abort transaction success") -} -} - } - -} diff --git
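With the commit/abort logic moved into a TransactionHandler and a StreamingQueryListener, two-phase commit now has a hook at the end of each micro-batch, lifting the earlier writeStream restriction from #155. A sketch of a Structured Streaming job with 2PC enabled; paths and hosts are placeholders, and the option keys match those seen earlier in this thread:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class Streaming2pcExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("doris-streaming-2pc")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> lines = spark.readStream().text("/tmp/json-lines");

        StreamingQuery query = lines.writeStream()
                .format("doris")
                .option("checkpointLocation", "/tmp/doris-2pc-ckpt")
                .option("doris.fenodes", "127.0.0.1:8030")
                .option("doris.table.identifier", "db.tbl")
                .option("user", "root")
                .option("password", "")
                // Pre-commit each micro-batch; the query listener added in #156
                // commits or aborts the transactions on progress/termination.
                .option("doris.sink.enable-2pc", "true")
                .option("doris.sink.streaming.passthrough", "true")
                .option("doris.sink.properties.format", "json")
                .start();

        query.awaitTermination();
    }
}
```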
(doris) branch master updated (5d80e7dc2fe -> 3faf3b41187)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 5d80e7dc2fe [Improvement](pipelineX) Improve local exchange on pipelineX engine (#26464) add 3faf3b41187 [chore] Print FE version even if it has been started (#26427) No new revisions were added by this update. Summary of changes: bin/start_fe.sh | 5 - .../src/main/java/org/apache/doris/DorisFE.java | 19 ++- 2 files changed, 18 insertions(+), 6 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) branch master updated: [improve] update build script (#157)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 9cbce93 [improve] update build script (#157) 9cbce93 is described below commit 9cbce935b43a98a3ce608dc67c15aeb048bdb42d Author: gnehil AuthorDate: Wed Nov 8 18:39:28 2023 +0800 [improve] update build script (#157) Co-authored-by: gnehil --- spark-doris-connector/build.sh | 34 ++ 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/spark-doris-connector/build.sh b/spark-doris-connector/build.sh index d62aee1..ab2f770 100755 --- a/spark-doris-connector/build.sh +++ b/spark-doris-connector/build.sh @@ -142,24 +142,30 @@ selectScala() { selectSpark() { echo 'Spark-Doris-Connector supports multiple versions of spark. Which version do you need ?' - select spark in "2.3.x" "3.1.x" "3.2.x" "3.3.x" "other" + select spark in "2.3.x" "2.4.x" "3.1.x" "3.2.x" "3.3.x" "3.4.x" "other" do case $spark in "2.3.x") return 1 ;; - "3.1.x") + "2.4.x") return 2 ;; - "3.2.x") + "3.1.x") return 3 ;; - "3.3.x") + "3.2.x") return 4 ;; - "other") + "3.3.x") return 5 ;; + "3.4.x") +return 6 +;; + "other") +return 7 +;; esac done } @@ -174,18 +180,30 @@ SparkVer=$? if [ ${SparkVer} -eq 1 ]; then SPARK_VERSION="2.3.4" elif [ ${SparkVer} -eq 2 ]; then -SPARK_VERSION="3.1.2" +SPARK_VERSION="2.4.8" elif [ ${SparkVer} -eq 3 ]; then -SPARK_VERSION="3.2.0" +SPARK_VERSION="3.1.3" elif [ ${SparkVer} -eq 4 ]; then -SPARK_VERSION="3.3.2" +SPARK_VERSION="3.2.4" elif [ ${SparkVer} -eq 5 ]; then +SPARK_VERSION="3.3.3" +elif [ ${SparkVer} -eq 6 ]; then +SPARK_VERSION="3.4.1" +elif [ ${SparkVer} -eq 7 ]; then # shellcheck disable=SC2162 read -p 'Which spark version do you need? please input :' ver SPARK_VERSION=$ver fi +if [[ $SPARK_VERSION =~ ^2.3 && $SCALA_VERSION == "2.12" ]]; then + echo_r "Spark 2.3 is not compatible with scala 2.12, will exit." + exit 1 +elif [[ $SPARK_VERSION =~ ^3.* && $SCALA_VERSION == "2.11" ]]; then + echo_r "Spark 3.x is not compatible with scala 2.11, will exit." + exit 1 +fi + # extract major version: # eg: 3.1.2 -> 3.1 SPARK_MAJOR_VERSION=0 - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
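The new guard at the end of the script rejects impossible Spark/Scala pairings before the build starts. The same rule, expressed as a small standalone Java check (illustrative only; the real check lives in build.sh):

    // Sketch (not part of build.sh): the Spark/Scala pairing rule the script
    // now enforces — Spark 2.3 only builds against Scala 2.11, and Spark 3.x
    // only against Scala 2.12.
    public final class BuildGuard {
        static void checkCompatibility(String sparkVersion, String scalaVersion) {
            if (sparkVersion.startsWith("2.3") && "2.12".equals(scalaVersion)) {
                throw new IllegalArgumentException("Spark 2.3 is not compatible with scala 2.12");
            }
            if (sparkVersion.startsWith("3.") && "2.11".equals(scalaVersion)) {
                throw new IllegalArgumentException("Spark 3.x is not compatible with scala 2.11");
            }
        }
    }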
(doris-spark-connector) branch release-1.3.0 created (now 9cbce93)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git at 9cbce93 [improve] update build script (#157) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) branch release-1.3.0 deleted (was 9cbce93)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git was 9cbce93 [improve] update build script (#157) The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) branch master updated: [Fix] quote field name with grave accent (#158)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new de60f63 [Fix] quote field name with grave accent (#158) de60f63 is described below commit de60f635cee7f30512d11126022402226641fc74 Author: gnehil AuthorDate: Thu Nov 9 21:23:11 2023 +0800 [Fix] quote field name with grave accent (#158) Co-authored-by: gnehil --- .../src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala index fe7e63d..db546ce 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala @@ -76,7 +76,7 @@ private[sql] class DorisRelation( requiredColumns.map(Utils.quote).mkString(",")) } else { paramWithScan += (ConfigurationOptions.DORIS_READ_FIELD -> - lazySchema.fields.map(f => f.name).mkString(",")) + lazySchema.fields.map(f => Utils.quote(f.name)).mkString(",")) } if (filters != null && filters.length > 0) { - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
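The one-line fix routes every schema field name through Utils.quote before joining them into the doris.read.field property, so reserved words such as key or select no longer break pushed-down reads. A rough Java equivalent of that quoting step (a sketch; Utils.quote is the connector's own Scala helper and may also handle escaping):

    import java.util.List;
    import java.util.stream.Collectors;

    // Sketch of backtick-quoting field names before joining them into
    // the doris.read.field property, as the fix above does via Utils.quote.
    public final class FieldQuoter {
        public static String quote(String name) {
            return "`" + name + "`";
        }

        public static String joinQuoted(List<String> fieldNames) {
            return fieldNames.stream().map(FieldQuoter::quote).collect(Collectors.joining(","));
        }
    }

For the fields (key, name) this produces `key`,`name`, which Doris parses safely even when a field name collides with a keyword.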
(doris-spark-connector) branch release-1.3.0 created (now de60f63)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git at de60f63 [Fix] quote field name with grave accent (#158) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
svn commit: r65130 - in /dev/doris/spark-connector/1.3.0: ./ apache-doris-spark-connector-1.3.0-src.tar.gz apache-doris-spark-connector-1.3.0-src.tar.gz.asc apache-doris-spark-connector-1.3.0-src.tar.gz.sha512
Author: diwu Date: Thu Nov 9 13:58:35 2023 New Revision: 65130 Log: add spark connector 1.3.0 Added: dev/doris/spark-connector/1.3.0/ dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz (with props) dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz.asc (with props) dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz.sha512 Added: dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz == Binary file - no diff available. Propchange: dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz -- svn:mime-type = application/x-gzip Added: dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz.asc == Binary file - no diff available. Propchange: dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz.asc -- svn:mime-type = application/pgp-signature Added: dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz.sha512 == --- dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz.sha512 (added) +++ dev/doris/spark-connector/1.3.0/apache-doris-spark-connector-1.3.0-src.tar.gz.sha512 Thu Nov 9 13:58:35 2023 @@ -0,0 +1 @@ +71771acaca47b63dbf792fea56d634a27e609a7443609457736f807ab56325bb1972f1ce8ea20ee1de256dfc1d3d8a3d38cf2fd2c3c5f8740b29be73cfbb8622 apache-doris-spark-connector-1.3.0-src.tar.gz - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
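For anyone verifying this staging artifact: the published .sha512 file holds the hex digest of the source tarball, and it can be recomputed like this (an illustrative sketch, not an official verification tool; the .asc signature should still be checked separately with GPG):

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.MessageDigest;

    // Sketch: compute the SHA-512 of a downloaded source tarball so it can be
    // compared with the published apache-doris-spark-connector-*-src.tar.gz.sha512.
    public final class Sha512Check {
        public static String sha512Hex(Path file) throws Exception {
            MessageDigest md = MessageDigest.getInstance("SHA-512");
            try (InputStream in = Files.newInputStream(file)) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) > 0) {
                    md.update(buf, 0, n);
                }
            }
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest()) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        }
    }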
(doris-spark-connector) tag 1.3.0 created (now de60f63)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to tag 1.3.0 in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git at de60f63 (commit) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) tag 1.3.0 deleted (was de60f63)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to tag 1.3.0 in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git *** WARNING: tag 1.3.0 was deleted! *** was de60f63 [Fix] quote field name with grave accent (#158) The revisions that were on this tag are still contained in other references; therefore, this change does not discard any commits from the repository. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
(doris-spark-connector) tag 1.3.0 created (now de60f63)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to tag 1.3.0 in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git at de60f63 (commit) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris] branch master updated (a8f312794e5 -> 6b64b7fec78)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from a8f312794e5 [feature](nereids)support stats estimation for is-null predicate (#24764) add 6b64b7fec78 [typo](doc)Add flink to read the doris table and use doris.filter.query to configure the display (#24736) No new revisions were added by this update. Summary of changes: docs/en/docs/ecosystem/flink-doris-connector.md| 4 docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 4 2 files changed, 8 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-flink-connector] branch master updated: [fix] If there are single quotes in ddl statements, table creation fails (#200)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 85bfb93 [fix] If there are single quotes in ddl statements, table creation fails (#200) 85bfb93 is described below commit 85bfb93b5fdd0619efaf1da163bd608dbece3b96 Author: Antg <57290855+codea...@users.noreply.github.com> AuthorDate: Wed Sep 27 18:26:45 2023 +0800 [fix] If there are single quotes in ddl statements, table creation fails (#200) --- .../src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index b635a96..77584d3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -180,7 +180,7 @@ public class DorisSystem { //append table comment if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){ sb.append(" COMMENT '") -.append(schema.getTableComment()) +.append(quoteComment(schema.getTableComment())) .append("' "); } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
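The fix wraps the user-supplied table comment in quoteComment before it is embedded in the generated COMMENT '...' clause, so a comment containing a single quote no longer produces invalid DDL. The escaping it needs can be sketched as below (assuming backslash-escaping of the quote; the connector's actual quoteComment may use a different convention, such as doubling):

    // Sketch: escape single quotes inside a table comment so the generated
    // "COMMENT '...'" clause stays valid DDL.
    public final class DdlEscaper {
        static String quoteComment(String comment) {
            return comment == null ? "" : comment.replace("'", "\\'");
        }
    }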
[doris] branch master updated (b6babf3af4 -> 377554ee1c)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from b6babf3af4 [pipelineX](sink) support jdbc table sink (#24970) add 377554ee1c [Fix](Job)Job Task does not display error message (#24897) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java| 2 +- .../main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-shade] branch master updated: [paimon-catalog] support paimon 0.5.0 (#25)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-shade.git The following commit(s) were added to refs/heads/master by this push: new 897d2ec [paimon-catalog] support paimon 0.5.0 (#25) 897d2ec is described below commit 897d2ec3dd62e1c9fbf460a6779554ba432f382c Author: zhangdong <493738...@qq.com> AuthorDate: Thu Sep 28 14:51:27 2023 +0800 [paimon-catalog] support paimon 0.5.0 (#25) --- hadoop-aws-shade-2/pom.xml | 2 +- hadoop-cloud-shade-2/pom.xml | 2 +- hadoop-shade-2/pom.xml | 2 +- hive-catalog-shade/pom.xml | 8 hive-shade-3/pom.xml | 2 +- hudi-mr-shade-0.13/pom.xml | 2 +- kerby-shade-1.0/pom.xml | 2 +- pom.xml | 2 +- spring-shade-5/pom.xml | 2 +- thrift-shade-0.13/pom.xml| 4 ++-- 10 files changed, 14 insertions(+), 14 deletions(-) diff --git a/hadoop-aws-shade-2/pom.xml b/hadoop-aws-shade-2/pom.xml index ef75270..00121be 100644 --- a/hadoop-aws-shade-2/pom.xml +++ b/hadoop-aws-shade-2/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT hadoop-aws-shade-2 diff --git a/hadoop-cloud-shade-2/pom.xml b/hadoop-cloud-shade-2/pom.xml index 8ec2798..6fe44a4 100644 --- a/hadoop-cloud-shade-2/pom.xml +++ b/hadoop-cloud-shade-2/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT hadoop-cloud-shade-2 diff --git a/hadoop-shade-2/pom.xml b/hadoop-shade-2/pom.xml index f09d6d2..e39b9f2 100644 --- a/hadoop-shade-2/pom.xml +++ b/hadoop-shade-2/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT hadoop-shade-2 diff --git a/hive-catalog-shade/pom.xml b/hive-catalog-shade/pom.xml index 11b2d4d..33d1e11 100644 --- a/hive-catalog-shade/pom.xml +++ b/hive-catalog-shade/pom.xml @@ -20,7 +20,7 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT hive-catalog-shade @@ -28,10 +28,10 @@ under the License. 3.1.3 -3.3.4 +3.3.6 2.8.1 1.1.0 - 0.4.0-incubating + 0.5.0-incubating @@ -449,7 +449,7 @@ under the License. org.apache.httpcomponents - shade.doris.hive.org.apache.httpcomponents + shade.doris.hive.org.apache.httpcomponents diff --git a/hive-shade-3/pom.xml b/hive-shade-3/pom.xml index 78492fb..7783dc1 100644 --- a/hive-shade-3/pom.xml +++ b/hive-shade-3/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT hive-shade-3 diff --git a/hudi-mr-shade-0.13/pom.xml b/hudi-mr-shade-0.13/pom.xml index 258715e..b20917c 100644 --- a/hudi-mr-shade-0.13/pom.xml +++ b/hudi-mr-shade-0.13/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT hudi-mr-shade-0.13 diff --git a/kerby-shade-1.0/pom.xml b/kerby-shade-1.0/pom.xml index 8f84c4e..70bc032 100644 --- a/kerby-shade-1.0/pom.xml +++ b/kerby-shade-1.0/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT kerby-shade-1.0 diff --git a/pom.xml b/pom.xml index 17f0ee7..fb4c723 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT pom Archetype - doris-shade https://doris.apache.org diff --git a/spring-shade-5/pom.xml b/spring-shade-5/pom.xml index 3a745a1..6910a05 100644 --- a/spring-shade-5/pom.xml +++ b/spring-shade-5/pom.xml @@ -22,7 +22,7 @@ under the License. 
org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT spring-shade-5 diff --git a/thrift-shade-0.13/pom.xml b/thrift-shade-0.13/pom.xml index 31ac858..3beb8c2 100644 --- a/thrift-shade-0.13/pom.xml +++ b/thrift-shade-0.13/pom.xml @@ -21,10 +21,10 @@ under the License. org.apache.doris doris-shade -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT thrift-shade-0.13 -1.0.1-SNAPSHOT +1.0.2-SNAPSHOT 0.13.0 - To unsubscribe, e-mail: commits-unsubscr...@doris.apac
[doris-shade] annotated tag doris-shade-1.0.2 updated (41abb36 -> 01dc9d7)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to annotated tag doris-shade-1.0.2 in repository https://gitbox.apache.org/repos/asf/doris-shade.git *** WARNING: tag doris-shade-1.0.2 was modified! *** from 41abb36 (commit) to 01dc9d7 (tag) tagging 41abb36b67901925414ab964ae4fdb86e5cdfc0c (commit) by wudi on Thu Sep 28 14:55:20 2023 +0800 - Log - [maven-release-plugin] copy for tag doris-shade-1.0.2 --- No new revisions were added by this update. Summary of changes: - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-shade] annotated tag doris-shade-1.0.2 deleted (was 01dc9d7)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to annotated tag doris-shade-1.0.2 in repository https://gitbox.apache.org/repos/asf/doris-shade.git *** WARNING: tag doris-shade-1.0.2 was deleted! *** tag was 01dc9d7 This change permanently discards the following revisions: discard 41abb36 [maven-release-plugin] prepare release doris-shade-1.0.2 - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-shade] branch 1.0.2-release created (now 897d2ec)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch 1.0.2-release in repository https://gitbox.apache.org/repos/asf/doris-shade.git at 897d2ec [paimon-catalog] support paimon 0.5.0 (#25) No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-shade] annotated tag doris-shade-1.0.2 updated (593bde4 -> 7372fc6)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to annotated tag doris-shade-1.0.2 in repository https://gitbox.apache.org/repos/asf/doris-shade.git *** WARNING: tag doris-shade-1.0.2 was modified! *** from 593bde4 (commit) to 7372fc6 (tag) tagging 593bde40c77d7e34b100836bc4b4624493956033 (commit) by wudi on Thu Sep 28 14:59:24 2023 +0800 - Log - [maven-release-plugin] copy for tag doris-shade-1.0.2 --- No new revisions were added by this update. Summary of changes: - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
svn commit: r64235 - in /dev/doris/doris-shade/1.0.2: ./ apache-doris-shade-1.0.2-src.tar.gz apache-doris-shade-1.0.2-src.tar.gz.asc apache-doris-shade-1.0.2-src.tar.gz.sha512
Author: diwu Date: Thu Sep 28 07:15:12 2023 New Revision: 64235 Log: add doris shade 1.0.2 Added: dev/doris/doris-shade/1.0.2/ dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz (with props) dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz.asc (with props) dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz.sha512 Added: dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz == Binary file - no diff available. Propchange: dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz -- svn:mime-type = application/x-gzip Added: dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz.asc == Binary file - no diff available. Propchange: dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz.asc -- svn:mime-type = application/pgp-signature Added: dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz.sha512 == --- dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz.sha512 (added) +++ dev/doris/doris-shade/1.0.2/apache-doris-shade-1.0.2-src.tar.gz.sha512 Thu Sep 28 07:15:12 2023 @@ -0,0 +1 @@ +ba657f8a7f40c81c86fb12d295b0ebb0cc0bee24bbd29b4d77ad6b2bbbdf53fa62588a91cd207eae7f228f9dd375c7de80991a106d29bf83e681a8ba7da049c7 apache-doris-shade-1.0.2-src.tar.gz - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris] branch master updated (fddef8b473 -> 961ca76bd3)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from fddef8b473 [fix](es-catalog)fix error when querying the index ,elasticsearch version 8.9.1 (#24839) add 961ca76bd3 [doc](fix)fix doc misspell (#25072) No new revisions were added by this update. Summary of changes: .../sql-reference/Data-Manipulation-Statements/Manipulation/INSERT.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-spark-connector] branch master updated: Fix data loss due to internal retries (#145)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 5410651 Fix data loss due to internal retries (#145) 5410651 is described below commit 5410651e3fdcdc03ce14c09ae1b11a75f4a773ad Author: gnehil AuthorDate: Sun Oct 8 18:25:03 2023 +0800 Fix data loss due to internal retries (#145) --- .../apache/doris/spark/load/DorisStreamLoad.java | 96 +++--- .../spark/listener/DorisTransactionListener.scala | 2 +- .../apache/doris/spark/writer/DorisWriter.scala| 32 +--- 3 files changed, 106 insertions(+), 24 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 0b506b0..c524a4c 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -64,13 +65,14 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Consumer; /** * DorisStreamLoad **/ public class DorisStreamLoad implements Serializable { -private static final String NULL_VALUE = "\\N"; private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); @@ -97,7 +99,9 @@ public class DorisStreamLoad implements Serializable { private final String LINE_DELIMITER; private boolean streamingPassthrough = false; private final Integer batchSize; -private boolean enable2PC; +private final boolean enable2PC; +private final Integer txnRetries; +private final Integer txnIntervalMs; public DorisStreamLoad(SparkSettings settings) { String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\."); @@ -128,6 +132,10 @@ public class DorisStreamLoad implements Serializable { ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT); this.enable2PC = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC, ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT); +this.txnRetries = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES, +ConfigurationOptions.DORIS_SINK_TXN_RETRIES_DEFAULT); +this.txnIntervalMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS, +ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS_DEFAULT); } public String getLoadUrlStr() { @@ -202,7 +210,19 @@ public class DorisStreamLoad implements Serializable { HttpResponse httpResponse = httpClient.execute(httpPut); loadResponse = new LoadResponse(httpResponse); } catch (IOException e) { -throw new RuntimeException(e); +if (enable2PC) { +int retries = txnRetries; +while (retries > 0) { +try { +abortByLabel(label); +retries = 0; +} catch (StreamLoadException ex) { + LockSupport.parkNanos(Duration.ofMillis(txnIntervalMs).toNanos()); +retries--; +} +} +} +throw new StreamLoadException("load execute failed", e); } if (loadResponse.status != HttpStatus.SC_OK) { @@ -274,22 +294,68 @@ public class DorisStreamLoad implements 
Serializable { } -public void abort(int txnId) throws StreamLoadException { +/** + * abort transaction by id + * + * @param txnId transaction id + * @throws StreamLoadException + */ +public void abortById(int txnId) throws StreamLoadException { LOG.info("start abort transaction {}.", txnId); +try { +doAbort(httpPut -> httpPut.setHeader("txn_id", String.valueOf(txnId))); +} catch (StreamLoadException e) { +LOG.error("abort transaction by id: {} failed.", txnId); +throw e; +} + +LOG.info("abort transaction {} succeed.", txnId); + +} + +/** + * abort transaction by label + * + * @param label label + * @throws StreamLoadException + */ +public void abortByLabel
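The key behavioral change: when the stream-load HTTP call fails with 2PC enabled, the writer now tries to abort the just-attempted label before rethrowing, so an orphaned pre-committed transaction cannot later be committed and duplicate data. A compact Java sketch of that retry loop (the Aborter interface is a stand-in for DorisStreamLoad's own abort call):

    import java.time.Duration;
    import java.util.concurrent.locks.LockSupport;

    // Sketch of the abort-on-failure loop added to DorisStreamLoad.load():
    // on an IOException with two-phase commit enabled, try to abort the label,
    // parking between attempts, before surfacing the load failure.
    public final class AbortOnFailure {
        interface Aborter {
            void abortByLabel(String label) throws Exception; // stand-in for the connector call
        }

        static void abortWithRetry(Aborter aborter, String label, int txnRetries, long txnIntervalMs) {
            int retries = txnRetries;
            while (retries > 0) {
                try {
                    aborter.abortByLabel(label);
                    return; // abort succeeded, stop retrying
                } catch (Exception ex) {
                    LockSupport.parkNanos(Duration.ofMillis(txnIntervalMs).toNanos());
                    retries--;
                }
            }
        }
    }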
[doris-spark-connector] branch master updated: map and struct type write format (#146)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 155a6a3 map and struct type write format (#146) 155a6a3 is described below commit 155a6a3f94f8b843c85431c3462a69a9452e3f46 Author: gnehil AuthorDate: Sun Oct 8 18:30:37 2023 +0800 map and struct type write format (#146) --- .../org/apache/doris/spark/sql/SchemaUtils.scala | 33 +++--- .../apache/doris/spark/sql/SchemaUtilsTest.scala | 21 +- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index 86a403f..677cc2e 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -176,17 +176,42 @@ private[spark] object SchemaUtils { val mapData = row.getMap(ordinal) val keys = mapData.keyArray() val values = mapData.valueArray() +val sb = StringBuilder.newBuilder +sb.append("{") var i = 0 -val map = mutable.Map[Any, Any]() while (i < keys.numElements()) { - map += rowColumnValue(keys, i, mt.keyType) -> rowColumnValue(values, i, mt.valueType) + rowColumnValue(keys, i, mt.keyType) -> rowColumnValue(values, i, mt.valueType) + sb.append(quoteData(rowColumnValue(keys, i, mt.keyType), mt.keyType)) +.append(":").append(quoteData(rowColumnValue(values, i, mt.valueType), mt.valueType)) +.append(",") i += 1 } -map.toMap.asJava - case st: StructType => row.getStruct(ordinal, st.length) +if (i > 0) sb.dropRight(1) +sb.append("}").toString + case st: StructType => +val structData = row.getStruct(ordinal, st.length) +val sb = StringBuilder.newBuilder +sb.append("{") +var i = 0 +while (i < structData.numFields) { + val field = st.get(i) + sb.append(s""""${field.name}":""") +.append(quoteData(rowColumnValue(structData, i, field.dataType), field.dataType)) +.append(",") + i += 1 +} +if (i > 0) sb.dropRight(1) +sb.append("}").toString case _ => throw new DorisException(s"Unsupported spark type: ${dataType.typeName}") } } + private def quoteData(value: Any, dataType: DataType): Any = { +dataType match { + case StringType | TimestampType | DateType => s""""$value"""" + case _ => value +} + } + } diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala index e3868cb..7e6e5f5 100644 --- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala +++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala @@ -17,11 +17,11 @@ package org.apache.doris.spark.sql -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{Row, SparkSession} import org.junit.{Assert, Ignore, Test} import java.sql.{Date, Timestamp} -import scala.collection.JavaConverters._ @Ignore class SchemaUtilsTest { @@ -31,9 +31,16 @@ class SchemaUtilsTest { val spark = SparkSession.builder().master("local").getOrCreate() -val df = spark.createDataFrame(Seq( - (1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3), Map[String, String]("a" -> "1")) -)).toDF("c1", "c2", "c3", "c4", "c5") +val rdd = spark.sparkContext.parallelize(Seq( + 
Row(1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3), +Map[String, String]("a" -> "1"), Row("a", 1)) +)) +val df = spark.createDataFrame(rdd, new StructType().add("c1", IntegerType) + .add("c2", DateType) + .add("c3", TimestampType) + .add("c4", ArrayType.apply(IntegerType)) + .add("c5", MapType.apply(StringType, StringType)) + .add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType) val schema = df.schema @@ -44,8 +51,8 @@ class SchemaUtils
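The serializer change renders MapType and StructType columns as brace-delimited strings, quoting string- and date-like elements so the load can parse them. The same formatting rule over a plain java.util.Map (a sketch; the real code walks Spark's internal MapData/InternalRow):

    import java.util.Map;

    // Sketch of the "{key:value,...}" rendering the connector now emits for
    // map columns, quoting values the way the Scala quoteData helper does for
    // string-like types (String, Date, Timestamp).
    public final class MapFormatter {
        static String quote(Object v, boolean stringLike) {
            return stringLike ? "\"" + v + "\"" : String.valueOf(v);
        }

        static String format(Map<String, Object> map) {
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<String, Object> e : map.entrySet()) {
                if (!first) {
                    sb.append(",");
                }
                sb.append(quote(e.getKey(), true)).append(":")
                  .append(quote(e.getValue(), e.getValue() instanceof String));
                first = false;
            }
            return sb.append("}").toString();
        }
    }

For a map {a=1, b=2} this yields {"a":"1","b":"2"}; a LinkedHashMap keeps the key order stable across runs.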
[doris] branch master updated (6ca0f3fa5f -> d702bc3c13)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 6ca0f3fa5f [Bug](writer) Fix ub in async writer (#25218) add d702bc3c13 [typo](doc) hot and cold stratification increases FAQ (#24974) No new revisions were added by this update. Summary of changes: docs/en/docs/advanced/cold-hot-separation.md| 26 - docs/zh-CN/docs/advanced/cold-hot-separation.md | 24 +++ 2 files changed, 49 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris] branch master updated (b7ac95a9703 -> d4673ce28a8)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from b7ac95a9703 [enhancement](regression-test) open routine load regression test by default and add data check (#25122) add d4673ce28a8 [Feature](Job)Jobs in the Finish state will be automatically deleted after three days. (#25170) No new revisions were added by this update. Summary of changes: .../main/java/org/apache/doris/common/Config.java | 8 .../java/org/apache/doris/scheduler/job/Job.java | 4 .../doris/scheduler/manager/TimerJobManager.java | 23 +- 3 files changed, 34 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
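The cleanup rule itself is simple: a job that reached the Finish state more than the configured retention window (three days by default) ago becomes eligible for deletion. Sketched in Java (hypothetical names; the actual sweep lives in TimerJobManager and is driven by a Config value):

    import java.time.Duration;
    import java.time.Instant;

    // Sketch of the finished-job cleanup rule: a job whose finish time is
    // older than the retention window is eligible for deletion.
    public final class FinishedJobSweeper {
        static boolean expired(Instant finishTime, Instant now, Duration retention) {
            return finishTime.plus(retention).isBefore(now);
        }

        public static void main(String[] args) {
            Instant now = Instant.now();
            System.out.println(expired(now.minus(Duration.ofDays(4)), now, Duration.ofDays(3))); // true
        }
    }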
[doris-flink-connector] branch master updated: [Improve](schemaChange)schema change type adapts to other connectors (#205)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 222ce60 [Improve](schemaChange)schema change type adapts to other connectors (#205) 222ce60 is described below commit 222ce60833ae114f54497e2404dfee8d93993357 Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Wed Oct 11 10:05:03 2023 +0800 [Improve](schemaChange)schema change type adapts to other connectors (#205) --- .../sink/writer/JsonDebeziumSchemaSerializer.java | 156 + .../flink/sink/writer/SchemaChangeHelper.java | 2 + .../doris/flink/tools/cdc/SourceConnector.java | 36 + .../writer/TestJsonDebeziumSchemaSerializer.java | 138 ++ 4 files changed, 250 insertions(+), 82 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java index b2d88c6..fd3c92a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.NullNode; import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; + import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; @@ -33,7 +34,12 @@ import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.HttpGetWithEntity; import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema; +import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.mysql.MysqlType; +import org.apache.doris.flink.tools.cdc.oracle.OracleType; +import org.apache.doris.flink.tools.cdc.postgres.PostgresType; +import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType; + import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.StringUtils; import org.apache.http.HttpHeaders; @@ -73,14 +79,15 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer beforeRow = extractBeforeRow(recordRoot); addDeleteSign(beforeRow, true); updateRow.append(objectMapper.writeValueAsString(beforeRow)) .append(this.lineDelimiter); } -//convert insert +// convert insert Map afterRow = extractAfterRow(recordRoot); addDeleteSign(afterRow, false); updateRow.append(objectMapper.writeValueAsString(afterRow)); @@ -207,14 +217,15 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer param = buildRequestParam(ddlSchema); +String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, +RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table); +Map param = buildRequestParam(ddlSchema); HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); boolean success = handleResponse(httpGet); if (!success) { -LOG.warn("schema change can not do table {}.{}",database,table); +LOG.warn("schema change 
can not do table {}.{}", database, table); } return success; } @@ -224,18 +235,18 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer(); } LOG.debug("received debezium ddl :{}", ddl); JsonNode tableChange = tableChanges.get(0); -Matcher matcher = addDropDDLPattern.matcher(ddl); -if (Objects.isNull(tableChange)|| !tableChange.get("type").asText().equals("ALTER") || !matcher.find()) { +if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) { return null; } JsonNode columns = tableChange.get("table").get("columns"); if (firstSchemaChange) { +sourceConnector = SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase()); fillOriginSchema(columns); } Map updateFiledSchema = new LinkedHashMap<>(); @@ -243,25 +254,30 @@ public class JsonDebeziumSchemaSerializer implement
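With the serializer now recording which connector produced the Debezium record (read from the record's source.connector field on the first schema change), schema-change DDL can map upstream column types through the matching converter. A simplified sketch of that dispatch (the via* methods are stand-ins for MysqlType/OracleType/PostgresType/SqlServerType.toDorisType):

    // Sketch of per-connector type dispatch for schema change, as
    // JsonDebeziumSchemaSerializer now does via the SourceConnector enum.
    final class TypeDispatch {
        enum SourceConnector { MYSQL, ORACLE, POSTGRES, SQLSERVER }

        // Stand-ins for the real per-connector converters.
        static String viaMysql(String t)     { return "mysql->" + t; }
        static String viaOracle(String t)    { return "oracle->" + t; }
        static String viaPostgres(String t)  { return "postgres->" + t; }
        static String viaSqlServer(String t) { return "sqlserver->" + t; }

        static String toDorisType(String connectorName, String rawType) {
            // connectorName comes from the Debezium record's source.connector field
            switch (SourceConnector.valueOf(connectorName.toUpperCase())) {
                case MYSQL:     return viaMysql(rawType);
                case ORACLE:    return viaOracle(rawType);
                case POSTGRES:  return viaPostgres(rawType);
                case SQLSERVER: return viaSqlServer(rawType);
                default:        throw new IllegalArgumentException(connectorName);
            }
        }
    }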
[doris-flink-connector] branch branch-for-flink-before-1.13 updated: fix setting error (#192)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch branch-for-flink-before-1.13 in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push: new db20f65 fix setting error (#192) db20f65 is described below commit db20f65c7811a7d3e4bf8a2fe9691a2cc60221fe Author: wanshicheng AuthorDate: Mon Sep 18 09:54:11 2023 +0800 fix setting error (#192) --- .../java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 3f51df6..065eb18 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -177,7 +177,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat { } private boolean enableBatchDelete() { -return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(keysType); +return executionOptions.getEnableDelete() && UNIQUE_KEYS_TYPE.equals(keysType); } @Override - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
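The one-character fix matters: with ||, batch delete was switched on for any unique-key table even when the user had disabled it; with &&, it requires both the enable-delete option and the unique-key model. The corrected predicate in isolation (the keys-type literal is assumed for illustration):

    // Sketch of the fixed predicate: batch delete is applied only when the
    // sink option is on AND the target table uses the unique-key model.
    final class DeleteGate {
        static boolean enableBatchDelete(boolean enableDeleteOption, String keysType) {
            return enableDeleteOption && "UNIQUE_KEYS".equals(keysType);
        }
    }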
[doris] branch master updated (a718f1f6cd -> 016dd2a45f)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from a718f1f6cd [fix](lock): do not use instance lock to protect static data (#24611) add 016dd2a45f [Chore](ci)Temporarily cancel the mandatory restrictions of shellCheck (#24765) No new revisions were added by this update. Summary of changes: .asf.yaml | 1 - 1 file changed, 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-flink-connector] branch master updated: [Feature](CacheWriter) doris sink support cache record buffer (#193)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 17b13a0 [Feature](CacheWriter) doris sink support cache record buffer (#193) 17b13a0 is described below commit 17b13a09e7efbb16a9415d68cdb0936b42a69d4d Author: GoGoWen <82132356+gogo...@users.noreply.github.com> AuthorDate: Tue Sep 26 15:32:10 2023 +0800 [Feature](CacheWriter) doris sink support cache record buffer (#193) --- .../doris/flink/catalog/DorisCatalogFactory.java | 2 + .../doris/flink/cfg/DorisExecutionOptions.java | 16 ++- .../doris/flink/sink/writer/CacheRecordBuffer.java | 118 + .../doris/flink/sink/writer/DorisStreamLoad.java | 11 +- .../doris/flink/sink/writer/DorisWriter.java | 56 ++ .../doris/flink/sink/writer/RecordBuffer.java | 4 +- .../doris/flink/sink/writer/RecordStream.java | 14 ++- .../doris/flink/table/DorisConfigOptions.java | 6 ++ .../flink/table/DorisDynamicTableFactory.java | 5 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 + .../flink/sink/writer/TestCacheRecordBuffer.java | 77 ++ .../flink/sink/writer/TestDorisStreamLoad.java | 6 +- .../doris/flink/sink/writer/TestDorisWriter.java | 2 +- 13 files changed, 285 insertions(+), 34 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java index 958ce3d..c00e5f1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalogFactory.java @@ -50,6 +50,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE; import static org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API; import static org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER; @@ -105,6 +106,7 @@ public class DorisCatalogFactory implements CatalogFactory { options.add(SINK_BUFFER_SIZE); options.add(SINK_BUFFER_COUNT); options.add(SINK_PARALLELISM); +options.add(SINK_USE_CACHE); options.add(SOURCE_USE_OLD_API); return options; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index ccd23f9..2422df8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -42,6 +42,8 @@ public class DorisExecutionOptions implements Serializable { private final int bufferSize; private final int bufferCount; private final String labelPrefix; +private final boolean useCache; + /** * Properties for the StreamLoad. 
*/ @@ -62,6 +64,7 @@ public class DorisExecutionOptions implements Serializable { int bufferSize, int bufferCount, String labelPrefix, + boolean useCache, Properties streamLoadProp, Boolean enableDelete, Boolean enable2PC, @@ -77,6 +80,7 @@ public class DorisExecutionOptions implements Serializable { this.bufferSize = bufferSize; this.bufferCount = bufferCount; this.labelPrefix = labelPrefix; +this.useCache = useCache; this.streamLoadProp = streamLoadProp; this.enableDelete = enableDelete; this.enable2PC = enable2PC; @@ -132,6 +136,10 @@ public class DorisExecutionOptions implements Serializable { return labelPrefix; } +public boolean isUseCache () { +return useCache; +} + public Properties getStreamLoadProp() { return streamLoadProp; } @@ -177,6 +185,7 @@ public class DorisExecutionOptions implements Serializable { private int bufferSize = DEFAULT_BUFFER_SIZE; private int bufferCount = DEFAULT_BUFFER_COUNT; private String labelPrefix = ""; +private boolean useCache = false;
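The idea behind the cache writer: keep recently written buffers in memory until the load is known to have succeeded, so a failed stream load can be replayed without re-reading upstream data. A heavily simplified sketch of such a cache (hypothetical shape; the real CacheRecordBuffer plugs into the connector's RecordBuffer/RecordStream pipeline):

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Simplified sketch of a record cache for sink replay: written buffers are
    // retained until the load is known-good, and handed back for a retry.
    final class SimpleRecordCache {
        private final Deque<ByteBuffer> cached = new ArrayDeque<>();

        void write(ByteBuffer buf) {
            cached.addLast(buf.duplicate()); // retain for possible replay
        }

        Deque<ByteBuffer> replay() {
            // hand back everything written since the last clear, oldest first
            return new ArrayDeque<>(cached);
        }

        void clear() {
            cached.clear(); // called once the load/checkpoint has succeeded
        }
    }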
[doris-flink-connector] branch master updated: [fix] fix batch sink flush lost batch (#180)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 2e7d791 [fix] fix batch sink flush lost batch (#180) 2e7d791 is described below commit 2e7d7914393acf7a69d7596120f1be10694adef9 Author: bingquanzhao AuthorDate: Mon Aug 21 10:30:58 2023 +0800 [fix] fix batch sink flush lost batch (#180) --- .../flink/sink/batch/DorisBatchStreamLoad.java | 15 +++-- .../apache/doris/flink/DorisSinkBatchExample.java | 64 -- 2 files changed, 66 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 1fe217d..a43220c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -139,17 +139,16 @@ public class DorisBatchStreamLoad implements Serializable { public synchronized void flush(boolean waitUtilDone) throws InterruptedException { checkFlushException(); -if (buffer == null) { -LOG.debug("buffer is empty, skip flush."); -return; +if (buffer != null && !buffer.isEmpty()) { +buffer.setLabelName(labelGenerator.generateBatchLabel()); +BatchRecordBuffer tmpBuff = buffer; +readQueue.put(tmpBuff); +this.buffer = null; } -buffer.setLabelName(labelGenerator.generateBatchLabel()); -BatchRecordBuffer tmpBuff = buffer; -readQueue.put(tmpBuff); -if(waitUtilDone){ + +if (waitUtilDone) { waitAsyncLoadFinish(); } -this.buffer = null; } private void putRecordToWriteQueue(BatchRecordBuffer buffer){ diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java index a6835c6..4a04d78 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java @@ -21,15 +21,17 @@ import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.batch.DorisBatchSink; import org.apache.doris.flink.sink.writer.SimpleStringSerializer; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import java.util.Arrays; import java.util.Properties; import java.util.UUID; public class DorisSinkBatchExample { -public static void main(String[] args) throws Exception{ +public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -56,11 +58,11 @@ public class DorisSinkBatchExample { .setTableIdentifier("test.test_flink") .setUsername("root") .setPassword(""); -DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); +DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder.setLabelPrefix("label") .setStreamLoadProp(properties) .setDeletable(false) -.setBufferFlushMaxBytes(8*1024) +.setBufferFlushMaxBytes(8 * 
1024) .setBufferFlushMaxRows(900) .setBufferFlushIntervalMs(1000 * 10); @@ -71,15 +73,17 @@ public class DorisSinkBatchExample { env.addSource(new SourceFunction() { private Long id = 0L; + @Override public void run(SourceContext out) throws Exception { -while(true){ -id=id+1; +while (true) { +id = id + 1; String record = id + "," + UUID.randomUUID() + "," + id + ""; out.collect(record); Thread.sleep(500); } } + @Override public void cancel() { @@ -88,4 +92,54 @@ public class DorisSinkBatchExample { env.execute("doris batch test"); } + +public void testBatchFlush() throws Excepti
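The bug: the old flush() returned early whenever the current buffer was null, so a flush(waitUtilDone = true) issued right after a buffer hand-off never waited for in-flight batches, and the last batch could be lost. The fixed control flow, sketched with a StringBuilder standing in for BatchRecordBuffer:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch of the fixed flush(): enqueue the current buffer only if it holds
    // data, but always honor waitUtilDone so in-flight batches are drained.
    final class BatchFlusher {
        private final BlockingQueue<StringBuilder> readQueue = new LinkedBlockingQueue<>();
        private StringBuilder buffer = new StringBuilder();

        synchronized void flush(boolean waitUtilDone) throws InterruptedException {
            if (buffer != null && buffer.length() > 0) {
                readQueue.put(buffer); // hand the batch to the async loader
                buffer = null;
            }
            if (waitUtilDone) {
                waitAsyncLoadFinish(); // previously skipped whenever buffer was null
            }
        }

        private void waitAsyncLoadFinish() {
            // placeholder: block until the async load thread drains readQueue
        }
    }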
[doris] branch master updated (37b49f60b7 -> b9127fa847)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/doris.git from 37b49f60b7 [refactor](conf) add be conf for partition topn partitions threshold (#23220) add b9127fa847 [Fix](View)Use the element_at function to replace %element_extract% to prevent parsing errors (#23093) No new revisions were added by this update. Summary of changes: fe/fe-core/src/main/cup/sql_parser.cup| 6 +-- regression-test/data/view_p0/view_p0.out | 4 ++ regression-test/suites/view_p0/view_p0.groovy | 75 +++ 3 files changed, 82 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[doris-spark-connector] branch master updated: [fix] write date type error with json format (#130)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new b5f9095 [fix] write date type error with json format (#130) b5f9095 is described below commit b5f9095360ca0b6a7c7ef82732d09717be4bb941 Author: gnehil AuthorDate: Mon Aug 21 15:15:34 2023 +0800 [fix] write date type error with json format (#130) --- spark-doris-connector/pom.xml | 2 +- .../src/main/java/org/apache/doris/spark/util/DataUtil.java| 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index a5a7e7a..74a53cc 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -67,7 +67,7 @@ -1.2.0-SNAPSHOT +1.3.0-SNAPSHOT 3.1.2 3.1 2.12 diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java index ed9af50..5877447 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java @@ -20,6 +20,7 @@ package org.apache.doris.spark.util; import scala.collection.JavaConversions; import scala.collection.mutable.WrappedArray; +import java.sql.Date; import java.sql.Timestamp; import java.util.Arrays; @@ -33,7 +34,7 @@ public class DataUtil { return NULL_VALUE; } -if (value instanceof Timestamp) { +if (value instanceof Date || value instanceof Timestamp) { return value.toString(); } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
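The fix extends the existing Timestamp special case to java.sql.Date, so both reach Doris as their toString() literals instead of whatever the generic JSON conversion produced. A self-contained sketch of the corrected conversion (with the connector's \N null marker shown for context):

    import java.sql.Date;
    import java.sql.Timestamp;

    // Sketch of the fixed conversion: java.sql.Date and java.sql.Timestamp both
    // serialize via toString(), producing "2023-08-21" and
    // "2023-08-21 15:15:34.0" style literals that Doris can ingest.
    final class DateToStringDemo {
        static Object handleColumnValue(Object value) {
            if (value == null) {
                return "\\N"; // the connector's null marker
            }
            if (value instanceof Date || value instanceof Timestamp) {
                return value.toString();
            }
            return value;
        }

        public static void main(String[] args) {
            System.out.println(handleColumnValue(Date.valueOf("2023-08-21")));               // 2023-08-21
            System.out.println(handleColumnValue(Timestamp.valueOf("2023-08-21 15:15:34"))); // 2023-08-21 15:15:34.0
        }
    }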
[doris-flink-connector] branch master updated: add sqlserver cdc (#181)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 7b4b9ec add sqlserver cdc (#181) 7b4b9ec is described below commit 7b4b9ec7e3869bc2800ce83a6ee9d4a51b536ec8 Author: wudi <676366...@qq.com> AuthorDate: Thu Aug 24 14:46:24 2023 +0800 add sqlserver cdc (#181) --- flink-doris-connector/pom.xml | 6 + .../org/apache/doris/flink/tools/cdc/CdcTools.java | 13 ++ .../flink/tools/cdc/postgres/PostgresType.java | 2 +- .../tools/cdc/sqlserver/SqlServerDatabaseSync.java | 178 + .../cdc/sqlserver/SqlServerDateConverter.java | 97 +++ .../flink/tools/cdc/sqlserver/SqlServerSchema.java | 33 .../flink/tools/cdc/sqlserver/SqlServerType.java | 94 +++ .../tools/cdc/CdcSqlServerSyncDatabaseCase.java| 82 ++ 8 files changed, 504 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 73a748c..333a40b 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -260,6 +260,12 @@ under the License. 2.4.1 provided + +com.ververica +flink-sql-connector-sqlserver-cdc +2.4.1 +provided + org.apache.flink flink-runtime-web diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 754dbce..6a390ea 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync; import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync; +import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerDatabaseSync; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -37,6 +38,7 @@ public class CdcTools { private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database"; private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database"; private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database"; +private static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database"; private static final List EMPTY_KEYS = Arrays.asList("password"); public static void main(String[] args) throws Exception { @@ -53,6 +55,9 @@ public class CdcTools { case POSTGRES_SYNC_DATABASE: createPostgresSyncDatabase(opArgs); break; +case SQLSERVER_SYNC_DATABASE: +createSqlServerSyncDatabase(opArgs); +break; default: System.out.println("Unknown operation " + operation); System.exit(1); @@ -83,6 +88,14 @@ public class CdcTools { syncDatabase(params, databaseSync, postgresConfig, "Postgres"); } +private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception { +MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); +Map postgresMap = getConfigMap(params, "sqlserver-conf"); +Configuration postgresConfig = Configuration.fromMap(postgresMap); +DatabaseSync databaseSync = new SqlServerDatabaseSync(); +syncDatabase(params, databaseSync, postgresConfig, "SqlServer"); +} + private static void syncDatabase(MultipleParameterTool params, DatabaseSync 
databaseSync, Configuration config, String type) throws Exception { String jobName = params.get("job-name"); String database = params.get("database"); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java index 87cbde2..8886c09 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java @@ -78,7 +78,7 @@ public class PostgresType { case BIGSERIAL: return DorisType.BIGINT; case NUMERIC: -return precision != null && precision
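The CdcTools entry point now recognizes a fourth operation: the first program argument picks which DatabaseSync implementation drives the whole-database sync job. A trimmed sketch of that dispatch (println calls stand in for constructing and running the sync):

    // Sketch of CdcTools-style dispatch: the first program argument selects
    // the DatabaseSync implementation for the whole-database sync job.
    final class SyncDispatch {
        static void dispatch(String operation) {
            switch (operation) {
                case "mysql-sync-database":     System.out.println("run MysqlDatabaseSync");     break;
                case "oracle-sync-database":    System.out.println("run OracleDatabaseSync");    break;
                case "postgres-sync-database":  System.out.println("run PostgresDatabaseSync");  break;
                case "sqlserver-sync-database": System.out.println("run SqlServerDatabaseSync"); break;
                default:
                    System.out.println("Unknown operation " + operation);
            }
        }
    }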
[doris-flink-connector] branch master updated: [Fix]fix type mapping with mysql and pg (#183)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 57ea05d [Fix]fix type mapping with mysql and pg (#183) 57ea05d is described below commit 57ea05d9a8d2d25d123c983727d3c6d258285d06 Author: wudi <676366...@qq.com> AuthorDate: Thu Aug 24 15:56:50 2023 +0800 [Fix]fix type mapping with mysql and pg (#183) --- .../main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java | 2 -- .../org/apache/doris/flink/tools/cdc/postgres/PostgresType.java | 6 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java index 92325ac..143ea52 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java @@ -140,8 +140,6 @@ public class MysqlType { case TIMESTAMP: return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(length == null ? 0 : length, 6)); case CHAR: -Preconditions.checkNotNull(length); -return String.format("%s(%s)", DorisType.CHAR, length); case VARCHAR: Preconditions.checkNotNull(length); return length * 3 > 65533 ? DorisType.STRING : String.format("%s(%s)", DorisType.VARCHAR, length * 3); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java index 8886c09..5c2feff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresType.java @@ -67,6 +67,9 @@ public class PostgresType { public static String toDorisType(String postgresType, Integer precision, Integer scale) { postgresType = postgresType.toLowerCase(); +if(postgresType.startsWith("_")){ + return DorisType.STRING; +} switch (postgresType){ case INT2: case SMALLSERIAL: @@ -121,6 +124,8 @@ public class PostgresType { case JSON: case JSONB: return DorisType.JSONB; +/* Compatible with doris1.2 array type can only be used in dup table, + and then converted to array in the next version case _BOOL: return String.format("%s<%s>", DorisType.ARRAY, DorisType.BOOLEAN); case _INT2: @@ -139,6 +144,7 @@ public class PostgresType { return String.format("%s<%s>", DorisType.ARRAY, DorisType.DATE_V2); case _TIMESTAMP: return String.format("%s<%s>", DorisType.ARRAY, DorisType.DATETIME_V2); +**/ default: throw new UnsupportedOperationException("Unsupported Postgres Type: " + postgresType); } - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
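Why length * 3: MySQL CHAR/VARCHAR lengths count characters while Doris VARCHAR lengths count bytes, so the mapping reserves three bytes per character (the UTF-8 worst case for BMP text) and falls back to STRING past the 65533-byte cap; after this fix CHAR takes the same path as VARCHAR. The rule in isolation:

    // Sketch of the CHAR/VARCHAR width mapping after the fix: MySQL counts
    // characters, Doris VARCHAR counts bytes, so lengths are multiplied by 3
    // and anything past the 65533-byte limit becomes STRING.
    final class CharWidthMapper {
        static String mapCharType(int mysqlLength) {
            int dorisBytes = mysqlLength * 3;
            return dorisBytes > 65533 ? "STRING" : String.format("VARCHAR(%d)", dorisBytes);
        }
    }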
[doris] branch master updated: [doc](flink-connector) add pg and sqlserver database sync (#23427)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new 3692c10654 [doc](flink-connector) add pg and sqlserver database sync (#23427) 3692c10654 is described below commit 3692c10654394b8c461e911180d3b1965c2520e9 Author: wudi <676366...@qq.com> AuthorDate: Fri Aug 25 15:30:59 2023 +0800 [doc](flink-connector) add pg and sqlserver database sync (#23427) --- docs/en/docs/ecosystem/flink-doris-connector.md| 54 +- docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 54 +- 2 files changed, 106 insertions(+), 2 deletions(-) diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md b/docs/en/docs/ecosystem/flink-doris-connector.md index b8f385469d..5b7620c659 100644 --- a/docs/en/docs/ecosystem/flink-doris-connector.md +++ b/docs/en/docs/ecosystem/flink-doris-connector.md @@ -416,7 +416,7 @@ insert into doris_sink select id,name from cdc_mysql_source; /bin/flink run \ -c org.apache.doris.flink.tools.cdc.CdcTools \ lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar\ - \ + \ --database \ [--job-name ] \ [--table-prefix ] \ @@ -490,6 +490,58 @@ insert into doris_sink select id,name from cdc_mysql_source; --table-conf replication_num=1 ``` +### PostgreSQL synchronization example + +```shell +/bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1\ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \ + postgres-sync-database \ + --database db1\ + --postgres-conf hostname=127.0.0.1 \ + --postgres-conf port=5432 \ + --postgres-conf username=postgres \ + --postgres-conf password="123456" \ + --postgres-conf database-name=postgres \ + --postgres-conf schema-name=public \ + --postgres-conf slot.name=test \ + --postgres-conf decoding.plugin.name=pgoutput \ + --including-tables "tbl1|tbl2" \ + --sink-conf fenodes=127.0.0.1:8030 \ + --sink-conf username=root \ + --sink-conf password=\ + --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \ + --sink-conf sink.label-prefix=label \ + --table-conf replication_num=1 +``` + +### SQLServer synchronization example + +```shell +/bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1 \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \ + sqlserver-sync-database \ + --database db1\ + --sqlserver-conf hostname=127.0.0.1 \ + --sqlserver-conf port=1433 \ + --sqlserver-conf username=sa \ + --sqlserver-conf password="123456" \ + --sqlserver-conf database-name=CDC_DB \ + --sqlserver-conf schema-name=dbo \ + --including-tables "tbl1|tbl2" \ + --sink-conf fenodes=127.0.0.1:8030 \ + --sink-conf username=root \ + --sink-conf password=\ + --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \ + --sink-conf sink.label-prefix=label \ + --table-conf replication_num=1 +``` + ## Use FlinkCDC to update Key column Generally, in a business database, the number is used as the primary key of the table, such as the Student table, the number (id) is used as the primary key, but with the development of the business, the number corresponding to the data may change. 
diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md index ad1947aaee..179234b606 100644 --- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md +++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md @@ -418,7 +418,7 @@ insert into doris_sink select id,name from cdc_mysql_source; /bin/flink run \ -c org.apache.doris.flink.tools.cdc.CdcTools \ lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \ - \ + \ --database \ [--job-name ] \ [--table-prefix ] \ @@ -491,6 +491,58 @@ insert into doris_sink select id,name from cdc_mysql_source; --table-conf replication_num=1 ``` +### PostgreSQL同步示例 + +```shell +/bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1\ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \ + postgres-sync-database \ + --database db1\ + --postgres-conf hostname=127.0.0.1 \ + --postgres-conf port=5432 \ + --postgres-conf username=postgres \ + --postgres-conf password="123456" \ + --postgres-conf database-name=postgres \ + --postgres-conf schema-name=public \ + --postgres-conf slot.name=test \ + --postgres-conf decoding.plugin.name=pgoutput \ + --including-tables "tbl1|tbl2" \ + --sink-conf fenod
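For readers who prefer to embed the sync in their own launcher rather than invoke the flink CLI, a hypothetical programmatic equivalent of the PostgreSQL example is sketched below. It assumes CdcTools exposes the standard main(String[]) entry point named by -c above, and every host, port, and credential value is a placeholder:

```java
// Hypothetical programmatic equivalent of the documented `flink run` invocation;
// the argument list mirrors the CLI flags one-to-one.
import org.apache.doris.flink.tools.cdc.CdcTools;

public class PostgresSyncLauncher {
    public static void main(String[] args) throws Exception {
        CdcTools.main(new String[]{
                "postgres-sync-database",
                "--database", "db1",
                "--postgres-conf", "hostname=127.0.0.1",
                "--postgres-conf", "port=5432",
                "--postgres-conf", "username=postgres",
                "--postgres-conf", "password=123456",
                "--postgres-conf", "database-name=postgres",
                "--postgres-conf", "schema-name=public",
                "--postgres-conf", "slot.name=test",
                "--postgres-conf", "decoding.plugin.name=pgoutput",
                "--including-tables", "tbl1|tbl2",
                "--sink-conf", "fenodes=127.0.0.1:8030",
                "--sink-conf", "username=root",
                "--sink-conf", "password=",
                "--sink-conf", "jdbc-url=jdbc:mysql://127.0.0.1:9030",
                "--sink-conf", "sink.label-prefix=label",
                "--table-conf", "replication_num=1"
        });
    }
}
```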
[doris-spark-connector] branch master updated: fix error when reading with not in clause (#134)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 71af841 fix error when reading with not in clause (#134) 71af841 is described below commit 71af841c7536d843e9d7babaf69e4b95298a9502 Author: gnehil AuthorDate: Fri Aug 25 16:55:06 2023 +0800 fix error when reading with not in clause (#134) --- .../scala/org/apache/doris/spark/sql/DorisRelation.scala | 12 +--- .../src/main/scala/org/apache/doris/spark/sql/Utils.scala| 6 ++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala index 4c9d348..049d5a2 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala @@ -17,19 +17,18 @@ package org.apache.doris.spark.sql -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.math.min - import org.apache.doris.spark.cfg.ConfigurationOptions._ import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} - import org.apache.spark.rdd.RDD import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.math.min + private[sql] class DorisRelation( val sqlContext: SQLContext, parameters: Map[String, String]) @@ -81,8 +80,7 @@ private[sql] class DorisRelation( } if (filters != null && filters.length > 0) { - val dorisFilterQuery = cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY, "1=1") - paramWithScan += (ConfigurationOptions.DORIS_FILTER_QUERY -> (dorisFilterQuery + " and " + filterWhereClause)) + paramWithScan += (ConfigurationOptions.DORIS_FILTER_QUERY -> filterWhereClause) } new ScalaDorisRowRDD(sqlContext.sparkContext, paramWithScan.toMap, lazySchema) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala index 2b9c3c1..54976a7 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala @@ -59,6 +59,12 @@ private[spark] object Utils { } else { s"${quote(attribute)} in (${compileValue(values)})" } + case Not(In(attribute, values)) => +if (values.isEmpty || values.length >= inValueLengthLimit) { + null +} else { + s"${quote(attribute)} not in (${compileValue(values)})" +} case IsNull(attribute) => s"${quote(attribute)} is null" case IsNotNull(attribute) => s"${quote(attribute)} is not null" case And(left, right) => - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
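The fix does two things: it pushes Not(In(...)) down to Doris as a `not in (...)` clause, bailing out with null when the value list is empty or exceeds the push-down limit, and it stops prefixing the pushed-down filter with the stale "1=1" default. A rough Java transliteration of the new rule, with simplified stand-ins for the connector's quote and compileValue helpers:

```java
// Java transliteration of the Scala Not(In(...)) case added above; quote() and
// compileValues() are simplified stand-ins, and the limit value is an assumption.
import java.util.Arrays;
import java.util.stream.Collectors;

public class NotInFilterSketch {
    static final int IN_VALUE_LENGTH_LIMIT = 1024; // stand-in for inValueLengthLimit

    static String quote(String attribute) {
        return "`" + attribute + "`";
    }

    static String compileValues(Object[] values) {
        return Arrays.stream(values)
                .map(v -> v instanceof String ? "'" + v + "'" : String.valueOf(v))
                .collect(Collectors.joining(","));
    }

    // Returns null when the filter cannot be pushed down, matching the Scala code.
    static String compileNotIn(String attribute, Object[] values) {
        if (values.length == 0 || values.length >= IN_VALUE_LENGTH_LIMIT) {
            return null;
        }
        return String.format("%s not in (%s)", quote(attribute), compileValues(values));
    }

    public static void main(String[] args) {
        System.out.println(compileNotIn("id", new Object[]{1, 2, 3})); // `id` not in (1,2,3)
        System.out.println(compileNotIn("id", new Object[]{}));        // null -> not pushed down
    }
}
```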
[doris] branch master updated: [Feature](Job)Allow Job to perform all insert operations, and limit permissions to allow Admin operations (#23492)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new da21b1cb24 [Feature](Job)Allow Job to perform all insert operations, and limit permissions to allow Admin operations (#23492) da21b1cb24 is described below commit da21b1cb244fb24bd48ad9b53bc4a48377862f44 Author: Calvin Kirs AuthorDate: Fri Aug 25 21:58:53 2023 +0800 [Feature](Job)Allow Job to perform all insert operations, and limit permissions to allow Admin operations (#23492) --- .../org/apache/doris/analysis/CreateJobStmt.java | 35 -- .../org/apache/doris/analysis/PauseJobStmt.java| 11 +-- .../org/apache/doris/analysis/ResumeJobStmt.java | 13 ++-- .../org/apache/doris/analysis/ShowJobTaskStmt.java | 10 +-- .../org/apache/doris/analysis/StopJobStmt.java | 11 +-- 5 files changed, 31 insertions(+), 49 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index b6736b2730..a1f9b6bd82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -19,8 +19,11 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.scheduler.common.IntervalUnit; import org.apache.doris.scheduler.constants.JobCategory; @@ -33,6 +36,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import java.util.HashSet; + /** * syntax: * CREATE @@ -79,8 +84,10 @@ public class CreateJobStmt extends DdlStmt { private String timezone = TimeUtils.DEFAULT_TIME_ZONE; -private static final ImmutableSet supportStmtClassName = new ImmutableSet.Builder() -.add(NativeInsertStmt.class.getName()).build(); +private static final ImmutableSet> supportStmtSuperClass += new ImmutableSet.Builder>().add(InsertStmt.class).build(); + +private static HashSet supportStmtClassNamesCache = new HashSet<>(16); public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, Boolean isStreamingJob, Long interval, String intervalTimeUnit, @@ -134,17 +141,27 @@ public class CreateJobStmt extends DdlStmt { analyzerSqlStmt(); } -private void checkAuth() throws AnalysisException { -UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); -if (!userIdentity.isRootUser()) { -throw new AnalysisException("only root user can create job"); +protected static void checkAuth() throws AnalysisException { +if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); } } -private void analyzerSqlStmt() throws UserException { -if (!supportStmtClassName.contains(stmt.getClass().getName())) { -throw new AnalysisException("Not support stmt type"); +private void checkStmtSupport() throws AnalysisException { +if (supportStmtClassNamesCache.contains(stmt.getClass().getSimpleName())) { +return; +} +for (Class clazz : supportStmtSuperClass) { +if 
(clazz.isAssignableFrom(stmt.getClass())) { + supportStmtClassNamesCache.add(stmt.getClass().getSimpleName()); +return; +} } +throw new AnalysisException("Not support this stmt type"); +} + +private void analyzerSqlStmt() throws UserException { +checkStmtSupport(); stmt.analyze(analyzer); String originStmt = getOrigStmt().originStmt; String executeSql = parseExecuteSql(originStmt); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java index dd23e61b9a..c399fc37cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java @@ -18,11 +18,9 @@ package org.apache.doris.analysis; import org.apache.doris.cluster.ClusterNamespace; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode;
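The reworked whitelist accepts any statement whose class is assignable to one of the supported superclasses (here InsertStmt), caching simple class names after the first successful check so later statements of the same type skip the reflection walk. A self-contained sketch of that pattern, with toy statement classes standing in for Doris's analysis tree:

```java
// Sketch of the superclass-based whitelist with a name cache; the statement
// classes are toys, and IllegalArgumentException stands in for AnalysisException.
import java.util.HashSet;
import java.util.Set;

public class StmtSupportCheckSketch {
    static class InsertStmt {}
    static class NativeInsertStmt extends InsertStmt {}
    static class ShowStmt {}

    private static final Set<Class<?>> SUPPORT_SUPER_CLASSES = Set.<Class<?>>of(InsertStmt.class);
    private static final HashSet<String> SUPPORT_NAME_CACHE = new HashSet<>(16);

    static void checkStmtSupport(Object stmt) {
        if (SUPPORT_NAME_CACHE.contains(stmt.getClass().getSimpleName())) {
            return; // fast path: this statement class was already verified
        }
        for (Class<?> clazz : SUPPORT_SUPER_CLASSES) {
            if (clazz.isAssignableFrom(stmt.getClass())) {
                SUPPORT_NAME_CACHE.add(stmt.getClass().getSimpleName());
                return;
            }
        }
        throw new IllegalArgumentException("Not support this stmt type");
    }

    public static void main(String[] args) {
        checkStmtSupport(new NativeInsertStmt()); // subclass of InsertStmt -> allowed
        try {
            checkStmtSupport(new ShowStmt());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Not support this stmt type
        }
    }
}
```

One caveat of caching by getSimpleName() is that two classes with the same simple name in different packages would share a cache entry; the committed code relies on statement class names being unique within the analysis package.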
[doris] branch master updated: [Improve](Job)Allows modify the configuration of the Job queue and the number of consumer threads (#23547)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/master by this push: new ef2fc44e5c [Improve](Job)Allows modify the configuration of the Job queue and the number of consumer threads (#23547) ef2fc44e5c is described below commit ef2fc44e5c07312011e33820c71679fc13643235 Author: Calvin Kirs AuthorDate: Mon Aug 28 12:01:49 2023 +0800 [Improve](Job)Allows modify the configuration of the Job queue and the number of consumer threads (#23547) --- .../src/main/java/org/apache/doris/common/Config.java| 14 ++ .../apache/doris/scheduler/disruptor/TaskDisruptor.java | 16 ++-- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 9da22b20da..44b7a27007 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1556,6 +1556,20 @@ public class Config extends ConfigBase { @ConfField public static int scheduler_job_task_max_num = 10; +/** + * The number of async tasks that can be queued. @See TaskDisruptor + * if consumer is slow, the queue will be full, and the producer will be blocked. + */ +@ConfField +public static int async_task_queen_size = 1024; + +/** + * The number of threads used to consume async tasks. @See TaskDisruptor + * if we have a lot of async tasks, we need more threads to consume them. Sure, it's depends on the cpu cores. + */ +@ConfField +public static int async_task_consumer_thread_num = 10; + // enable_workload_group should be immutable and temporarily set to mutable during the development test phase @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) public static boolean enable_workload_group = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index 889355f2cd..3b59a5187e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -17,6 +17,7 @@ package org.apache.doris.scheduler.disruptor; +import org.apache.doris.common.Config; import org.apache.doris.scheduler.constants.TaskType; import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -48,20 +49,15 @@ import java.util.concurrent.TimeUnit; public class TaskDisruptor implements Closeable { private final Disruptor disruptor; -private static final int DEFAULT_RING_BUFFER_SIZE = 1024; +private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; + +private static int consumerThreadCount = Config.async_task_consumer_thread_num; /** * The default timeout for {@link #close()} in seconds. */ private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5; -/** - * The default number of consumers to create for each {@link Disruptor} instance. - */ -private static final int DEFAULT_CONSUMER_COUNT = System.getProperty("event.task.disruptor.consumer.count") -== null ? Runtime.getRuntime().availableProcessors() -: Integer.parseInt(System.getProperty("event.task.disruptor.consumer.count")); - /** * Whether this disruptor has been closed. * if true, then we can't publish any more events. 
@@ -82,8 +78,8 @@ public class TaskDisruptor implements Closeable { ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE; disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); -WorkHandler[] workers = new TaskHandler[DEFAULT_CONSUMER_COUNT]; -for (int i = 0; i < DEFAULT_CONSUMER_COUNT; i++) { +WorkHandler[] workers = new TaskHandler[consumerThreadCount]; +for (int i = 0; i < consumerThreadCount; i++) { workers[i] = new TaskHandler(timerJobManager, transientTaskManager); } disruptor.handleEventsWithWorkerPool(workers); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
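Below is a minimal worker-pool setup in the same shape as TaskDisruptor, with plain variables standing in for the two new Config options (async_task_queen_size and async_task_consumer_thread_num). This is a sketch against the LMAX Disruptor 3.x DSL, not the connector's actual class; note that Disruptor ring sizes must be a power of two, which the 1024 default satisfies.

```java
// Minimal Disruptor worker pool mirroring the pattern in TaskDisruptor.
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;

public class DisruptorConfigSketch {
    static class TaskEvent {
        long taskId;
    }

    public static void main(String[] args) {
        int ringBufferSize = 1024;    // stand-in for Config.async_task_queen_size
        int consumerThreadCount = 10; // stand-in for Config.async_task_consumer_thread_num

        Disruptor<TaskEvent> disruptor = new Disruptor<>(TaskEvent::new, ringBufferSize,
                DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

        // One WorkHandler per consumer thread; each event is handled by exactly one worker.
        @SuppressWarnings("unchecked")
        WorkHandler<TaskEvent>[] workers = new WorkHandler[consumerThreadCount];
        for (int i = 0; i < consumerThreadCount; i++) {
            workers[i] = event -> System.out.println("handling task " + event.taskId);
        }
        disruptor.handleEventsWithWorkerPool(workers);
        disruptor.start();

        // Producer side: claim a slot, fill it, publish. Blocks when the ring is full,
        // which is the back-pressure behavior the Config comment describes.
        disruptor.getRingBuffer().publishEvent((event, sequence) -> event.taskId = sequence);
        disruptor.shutdown();
    }
}
```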
[doris] branch branch-1.2-lts updated: [typo](docs) modify the documentation for the bitmap function in 1.2-lts (#21724)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git The following commit(s) were added to refs/heads/branch-1.2-lts by this push: new eef6a4ab6b [typo](docs) modify the documentation for the bitmap function in 1.2-lts (#21724) eef6a4ab6b is described below commit eef6a4ab6b6d86dad33ae81e54f880a203b42d0c Author: Euporia <31833513+vinle...@users.noreply.github.com> AuthorDate: Fri Jul 14 18:48:20 2023 +0800 [typo](docs) modify the documentation for the bitmap function in 1.2-lts (#21724) --- .../sql-functions/bitmap-functions/bitmap_and.md | 15 + .../bitmap-functions/bitmap_and_not.md | 21 + .../sql-functions/bitmap-functions/bitmap_empty.md | 7 ++ .../bitmap-functions/bitmap_has_all.md | 26 +++--- .../bitmap-functions/bitmap_has_any.md | 26 +++--- .../bitmap-functions/bitmap_hash64.md | 12 +- .../sql-functions/bitmap-functions/bitmap_not.md | 14 ++-- .../sql-functions/bitmap-functions/bitmap_or.md| 25 + .../sql-functions/bitmap-functions/bitmap_and.md | 15 + .../bitmap-functions/bitmap_and_not.md | 21 + .../sql-functions/bitmap-functions/bitmap_empty.md | 7 ++ .../bitmap-functions/bitmap_has_all.md | 26 +++--- .../bitmap-functions/bitmap_has_any.md | 26 +++--- .../bitmap-functions/bitmap_hash64.md | 12 +- .../sql-functions/bitmap-functions/bitmap_not.md | 20 - .../sql-functions/bitmap-functions/bitmap_or.md| 25 + 16 files changed, 191 insertions(+), 107 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-functions/bitmap-functions/bitmap_and.md b/docs/en/docs/sql-manual/sql-functions/bitmap-functions/bitmap_and.md index a64a52b570..7cfdcec0d0 100644 --- a/docs/en/docs/sql-manual/sql-functions/bitmap-functions/bitmap_and.md +++ b/docs/en/docs/sql-manual/sql-functions/bitmap-functions/bitmap_and.md @@ -42,6 +42,13 @@ mysql> select bitmap_count(bitmap_and(to_bitmap(1), to_bitmap(2))) cnt; |0 | +--+ +mysql> select bitmap_to_string(bitmap_and(to_bitmap(1), to_bitmap(2))); ++--+ +| bitmap_to_string(bitmap_and(to_bitmap(1), to_bitmap(2))) | ++--+ +| | ++--+ + mysql> select bitmap_count(bitmap_and(to_bitmap(1), to_bitmap(1))) cnt; +--+ | cnt | @@ -49,28 +56,28 @@ mysql> select bitmap_count(bitmap_and(to_bitmap(1), to_bitmap(1))) cnt; |1 | +--+ -MySQL> select bitmap_to_string(bitmap_and(to_bitmap(1), to_bitmap(1))); +mysql> select bitmap_to_string(bitmap_and(to_bitmap(1), to_bitmap(1))); +--+ | bitmap_to_string(bitmap_and(to_bitmap(1), to_bitmap(1))) | +--+ | 1| +--+ -MySQL> select bitmap_to_string(bitmap_and(bitmap_from_string('1,2,3'), bitmap_from_string('1,2'), bitmap_from_string('1,2,3,4,5'))); +mysql> select bitmap_to_string(bitmap_and(bitmap_from_string('1,2,3'), bitmap_from_string('1,2'), bitmap_from_string('1,2,3,4,5'))); +---+ | bitmap_to_string(bitmap_and(bitmap_from_string('1,2,3'), bitmap_from_string('1,2'), bitmap_from_string('1,2,3,4,5'))) | +---+ | 1,2 | +---+ -MySQL> select bitmap_to_string(bitmap_and(bitmap_from_string('1,2,3'), bitmap_from_string('1,2'), bitmap_from_string('1,2,3,4,5'),bitmap_empty())); +mysql> select bitmap_to_string(bitmap_and(bitmap_from_string('1,2,3'), bitmap_from_string('1,2'), bitmap_from_string('1,2,3,4,5'),bitmap_empty())); +
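Since Doris serves queries over the MySQL protocol, the documented bitmap examples can also be exercised from plain JDBC rather than the mysql CLI. A hedged sketch, with placeholder host, port, and credentials, assuming a MySQL driver on the classpath:

```java
// Runs one of the documented bitmap examples over JDBC against a Doris FE.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BitmapAndExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://127.0.0.1:9030", "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "select bitmap_to_string(bitmap_and(to_bitmap(1), to_bitmap(1)))")) {
            while (rs.next()) {
                System.out.println(rs.getString(1)); // expected: 1
            }
        }
    }
}
```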
[doris-spark-connector] branch master updated: [improvement] Escaping special characters (#118)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git The following commit(s) were added to refs/heads/master by this push: new 9a4464b [improvement] Escaping special characters (#118) 9a4464b is described below commit 9a4464bf63ca1af5ca82d95e0ab6d2e136de49f6 Author: gnehil AuthorDate: Tue Jul 18 14:33:37 2023 +0800 [improvement] Escaping special characters (#118) --- .../apache/doris/spark/load/DorisStreamLoad.java | 6 ++-- .../org/apache/doris/spark/util/EscapeHandler.java | 40 ++ .../apache/doris/spark/util/EscapeHandlerTest.java | 36 +++ 3 files changed, 80 insertions(+), 2 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 61379e3..07e6624 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + package org.apache.doris.spark.load; import org.apache.doris.spark.cfg.ConfigurationOptions; @@ -22,6 +23,7 @@ import org.apache.doris.spark.exception.StreamLoadException; import org.apache.doris.spark.rest.RestService; import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.RespContent; +import org.apache.doris.spark.util.EscapeHandler; import org.apache.doris.spark.util.ListUtils; import com.fasterxml.jackson.core.JsonProcessingException; @@ -102,9 +104,9 @@ public class DorisStreamLoad implements Serializable { .build(new BackendCacheLoader(settings)); fileType = streamLoadProp.getOrDefault("format", "csv"); if ("csv".equals(fileType)){ -FIELD_DELIMITER = streamLoadProp.getOrDefault("column_separator", "\t"); -LINE_DELIMITER = streamLoadProp.getOrDefault("line_delimiter", "\n"); +FIELD_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("column_separator", "\t")); } +LINE_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n")); } public String getLoadUrlStr() { diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java new file mode 100644 index 000..87a3989 --- /dev/null +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark.util; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class EscapeHandler { +public static final String ESCAPE_DELIMITERS_FLAGS = "\\x"; +public static final Pattern ESCAPE_PATTERN = Pattern.compile("x([0-9|a-f|A-F]{2})"); + +public static String escapeString(String source) { +if (source.contains(ESCAPE_DELIMITERS_FLAGS)) { +Matcher m = ESCAPE_PATTERN.matcher(source); +StringBuffer buf = new StringBuffer(); +while (m.find()) { +m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1), 16))); +} +m.appendTail(buf); +return buf.toString(); +} +return source; +} + +} \ No newline at end of file diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java new file mode 100644 index 000..d8fb270 --- /dev/null +++ b/spark-doris-connector/src/test/java/org/apache/doris/sp
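EscapeHandler lets users write separators such as \x01 in stream-load properties instead of embedding raw control characters. A standalone sketch of the same decoding follows; the exact regex in the committed class may differ from what the mail rendering shows (backslashes are easily lost in transit), so the pattern below, matching a literal "\x" followed by two hex digits, is an assumption:

```java
// Decodes "\xNN" hex escapes in delimiter strings into the corresponding characters.
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EscapeSketch {
    private static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9a-fA-F]{2})");

    static String escapeString(String source) {
        Matcher m = ESCAPE_PATTERN.matcher(source);
        StringBuffer buf = new StringBuffer();
        while (m.find()) {
            // Replace "\xNN" with the character whose code point is 0xNN.
            m.appendReplacement(buf, Matcher.quoteReplacement(
                    String.valueOf((char) Integer.parseInt(m.group(1), 16))));
        }
        m.appendTail(buf);
        return buf.toString();
    }

    public static void main(String[] args) {
        String columnSeparator = escapeString("\\x01"); // common Hive-style separator
        System.out.println((int) columnSeparator.charAt(0)); // 1
    }
}
```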
[doris-flink-connector] branch master updated: [Improve]Optimize method for constructing builder object based on Builder pattern
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 093b755 [Improve]Optimize method for constructing builder object based on Builder pattern 093b755 is described below commit 093b755f81ae91f7db3511a4c4330ea59d0cc9ca Author: huangkai66 <87286391+huangka...@users.noreply.github.com> AuthorDate: Wed Jul 19 13:56:20 2023 +0800 [Improve]Optimize method for constructing builder object based on Builder pattern --- .../java/org/apache/doris/flink/cfg/DorisExecutionOptions.java | 7 +++ 1 file changed, 7 insertions(+) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 722f6ef..03adf19 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -70,6 +70,13 @@ public class DorisExecutionOptions implements Serializable { return new Builder(); } +public static Builder builderDefaults() { +Properties properties = new Properties(); +properties.setProperty("format", "json"); +properties.setProperty("read_json_by_line", "true"); +return new Builder().setStreamLoadProp(properties); +} + public static DorisExecutionOptions defaults() { Properties properties = new Properties(); properties.setProperty("format", "json"); - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
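A hypothetical usage of the new entry point; it assumes the Builder exposes a build() method (not shown in this diff). Unlike defaults(), which returns a finished DorisExecutionOptions, builderDefaults() returns a Builder preloaded with the same JSON stream-load properties so callers can keep customizing before building:

```java
// Hypothetical usage of DorisExecutionOptions.builderDefaults(); build() is assumed.
import org.apache.doris.flink.cfg.DorisExecutionOptions;

public class ExecutionOptionsExample {
    public static void main(String[] args) {
        DorisExecutionOptions options = DorisExecutionOptions.builderDefaults().build();
        // Expected to carry the same defaults as defaults():
        // {format=json, read_json_by_line=true}
        System.out.println(options.getStreamLoadProp());
    }
}
```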
[doris-flink-connector] branch master updated: [Improve](mysqlSync)Add the configuration of whether to synchronize the default value (#152)
This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git The following commit(s) were added to refs/heads/master by this push: new 4038842 [Improve](mysqlSync)Add the configuration of whether to synchronize the default value (#152) 4038842 is described below commit 403884239c9e7de58deef82bc26599d07f68e0d2 Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri Jul 21 15:07:51 2023 +0800 [Improve](mysqlSync)Add the configuration of whether to synchronize the default value (#152) --- .../DorisJsonDebeziumDeserializationSchema.java| 185 + .../sink/writer/JsonDebeziumSchemaSerializer.java | 2 +- .../org/apache/doris/flink/tools/cdc/CdcTools.java | 3 +- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +- .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 16 +- .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 3 +- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 3 +- 7 files changed, 207 insertions(+), 10 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java new file mode 100644 index 000..9c54ade --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.deserialization; + +import org.apache.doris.flink.exception.DorisException; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Map; + +/** + * Currently just use for synchronous mysql non-default. + */ +public class DorisJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema { + +private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.withExactBigDecimals(true); +private final ObjectMapper objectMapper; + +public DorisJsonDebeziumDeserializationSchema() { +objectMapper = new ObjectMapper(); +} + +@Override +public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception { +Schema schema = sourceRecord.valueSchema(); +Object value = sourceRecord.value(); +JsonNode jsonValue = convertToJson(schema, value); +byte[] bytes = objectMapper.writeValueAsString(jsonValue).getBytes(StandardCharsets.UTF_8); +collector.collect(new String(bytes)); +} + +private JsonNode convertToJson(Schema schema, Object value) throws DorisException { +if (value == null) { +if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema +{ +return null; +} +if (schema.isOptional()) { +return JSON_NODE_FACTORY.nullNode(); +} +throw new DorisException( +&q