[GitHub] [incubator-doris] HappenLee commented on a change in pull request #8051: eliminate branch prediction
HappenLee commented on a change in pull request #8051:
URL: https://github.com/apache/incubator-doris/pull/8051#discussion_r806548118

## File path: be/src/vec/exec/join/vhash_join_node.cpp

## @@ -168,89 +177,136 @@ struct ProcessHashTableProbe {
     // the output block struct is same with mutable block. we can do more opt on it and simplify
     // the logic of probe
     // TODO: opt the visited here to reduce the size of hash table
+    template
     Status do_process(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map,
                       MutableBlock& mutable_block, Block* output_block) {
         using KeyGetter = typename HashTableContext::State;
         using Mapped = typename HashTableContext::Mapped;
         KeyGetter key_getter(_probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
-
+        std::vector items_counts(_probe_rows);
         auto& mcol = mutable_block.mutable_columns();
-
-        int right_col_idx = _join_node->_is_right_semi_anti ? 0 : _left_table_data_types.size();
-        int right_col_len = _right_table_data_types.size();
         int current_offset = 0;
         for (; _probe_index < _probe_rows;) {
-            // ignore null rows
             if constexpr (ignore_null) {
                 if ((*null_map)[_probe_index]) {
                     items_counts[_probe_index++] = 0;
                     continue;
                 }
             }
-
             int repeat_count = 0;
-            auto find_result =
-                    (*null_map)[_probe_index]
+            if constexpr (JoinOpType::value == TJoinOp::INNER_JOIN) {
+                if (!(*null_map)[_probe_index]) {
+                    auto find_result = key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena);
+
+                    if (find_result.is_found()) {
+                        auto& mapped = find_result.get_mapped();
+
+                        // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
+                        // We should rethink whether to use this iterator mode in the future. Now just opt the one row case
+                        if (mapped.get_row_count() == 1) {
+                            mapped.visited = true;
+                            // right semi/anti join should dispose the data in hash table
+                            // after probe data eof
+                            ++repeat_count;
+                            for (size_t j = 0; j < _right_col_len; ++j) {
+                                auto& column = *mapped.block->get_by_position(j).column;
+                                mcol[j + _right_col_idx]->insert_from(column, mapped.row_num);
+                            }
+                        } else {
+                            if (_probe_index + 2 < _probe_rows)
+                                key_getter.prefetch(hash_table_ctx.hash_table, _probe_index + 2, _arena);
+                            for (auto it = mapped.begin(); it.ok(); ++it) {
+                                // right semi/anti join should dispose the data in hash table
+                                // after probe data eof
+                                ++repeat_count;
+                                for (size_t j = 0; j < _right_col_len; ++j) {
+                                    auto& column = *it->block->get_by_position(j).column;
+                                    // TODO: interface insert from cause serious performance problems
+                                    // when column is nullable. Try to make more effective way
+                                    mcol[j + _right_col_idx]->insert_from(column, it->row_num);
+                                }
+                                it->visited = true;

Review comment: an inner join does not need to set visited
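For readers following the thread, here is a compilable toy example of the branch-elimination technique this PR applies: the join type becomes a template parameter, so `if constexpr` resolves the per-row join-type branch at compile time instead of leaving it to the branch predictor. All names below are illustrative stand-ins, not the actual Doris code.

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative join-type tags (stand-ins for TJoinOp values).
enum class JoinOp { INNER_JOIN, LEFT_ANTI_JOIN };

// The join type is a compile-time parameter, so the compiler emits one
// specialized probe loop per join type and the per-row branch disappears.
template <JoinOp join_op>
size_t probe(const std::vector<int>& probe_keys, const std::vector<int>& build_keys) {
    size_t emitted = 0;
    for (int key : probe_keys) {
        bool found = false;
        for (int bk : build_keys) {  // stand-in for a hash table lookup
            if (bk == key) { found = true; break; }
        }
        if constexpr (join_op == JoinOp::INNER_JOIN) {
            emitted += found;   // inner join emits matching rows
        } else {
            emitted += !found;  // anti join emits non-matching rows
        }
    }
    return emitted;
}

int main() {
    std::vector<int> build = {1, 2, 3};
    std::vector<int> probe_rows = {2, 3, 4, 5};
    std::cout << probe<JoinOp::INNER_JOIN>(probe_rows, build) << "\n";     // 2
    std::cout << probe<JoinOp::LEFT_ANTI_JOIN>(probe_rows, build) << "\n"; // 2
    return 0;
}
```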
[GitHub] [incubator-doris] HappenLee commented on a change in pull request #8051: eliminate branch prediction
HappenLee commented on a change in pull request #8051:
URL: https://github.com/apache/incubator-doris/pull/8051#discussion_r806556854

## File path: be/src/vec/exec/join/vhash_join_node.cpp

## @@ -168,89 +177,136 @@ struct ProcessHashTableProbe {
     // the output block struct is same with mutable block. we can do more opt on it and simplify
     // the logic of probe
     // TODO: opt the visited here to reduce the size of hash table
+    template
     Status do_process(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map,
                       MutableBlock& mutable_block, Block* output_block) {
         using KeyGetter = typename HashTableContext::State;
         using Mapped = typename HashTableContext::Mapped;
         KeyGetter key_getter(_probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);
-
+        std::vector items_counts(_probe_rows);
         auto& mcol = mutable_block.mutable_columns();
-
-        int right_col_idx = _join_node->_is_right_semi_anti ? 0 : _left_table_data_types.size();
-        int right_col_len = _right_table_data_types.size();
         int current_offset = 0;
         for (; _probe_index < _probe_rows;) {
-            // ignore null rows
             if constexpr (ignore_null) {
                 if ((*null_map)[_probe_index]) {
                     items_counts[_probe_index++] = 0;
                     continue;
                 }
             }
-
             int repeat_count = 0;
-            auto find_result =
-                    (*null_map)[_probe_index]
+            if constexpr (JoinOpType::value == TJoinOp::INNER_JOIN) {
+                if (!(*null_map)[_probe_index]) {
+                    auto find_result = key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena);
+
+                    if (find_result.is_found()) {
+                        auto& mapped = find_result.get_mapped();
+
+                        // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance.
+                        // We should rethink whether to use this iterator mode in the future. Now just opt the one row case
+                        if (mapped.get_row_count() == 1) {
+                            mapped.visited = true;

Review comment: no need to set visited here
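The rationale behind both review comments, sketched with hypothetical names: the `visited` flag is only read by join types that walk the hash table after the probe phase (right semi/anti and right/full outer joins) to emit the build rows that never matched, so the inner join path can drop the store entirely. A minimal, self-contained sketch:

```cpp
#include <iostream>

// Minimal stand-in for a build-side row reference.
struct RowRef {
    bool visited = false;
};

// Hypothetical compile-time guard; names are illustrative, not the
// actual vhash_join_node.cpp code.
template <bool need_visited>
void on_match(RowRef& mapped) {
    if constexpr (need_visited) {
        // Right semi/anti and right/full outer joins read this flag later,
        // when emitting the build-side rows that never matched.
        mapped.visited = true;
    }
    // Inner join instantiates on_match<false>: no store, no extra write
    // traffic on the hash table during the probe loop.
}

int main() {
    RowRef r;
    on_match<false>(r);              // inner join path
    std::cout << r.visited << "\n";  // 0
    on_match<true>(r);               // right anti join path
    std::cout << r.visited << "\n";  // 1
}
```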
[GitHub] [incubator-doris] zhangstar333 opened a new issue #8065
zhangstar333 opened a new issue #8065:
URL: https://github.com/apache/incubator-doris/issues/8065

### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues.

### Description

_No response_

### Use case

_No response_

### Related issues

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[GitHub] [incubator-doris] zhangstar333 opened a new pull request #8066
zhangstar333 opened a new pull request #8066:
URL: https://github.com/apache/incubator-doris/pull/8066

# Proposed changes

Issue Number: close #8065

## Problem Summary:

Describe the overview of changes.

## Checklist(Required)

1. Does it affect the original behavior: (Yes/No/I Don't know)
2. Has unit tests been added: (Yes/No/No Need)
3. Has document been added or modified: (Yes/No/No Need)
4. Does it need to update dependencies: (Yes/No)
5. Are there any changes that cannot be rolled back: (Yes/No)

## Further comments

If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
[GitHub] [incubator-doris] HappenLee opened a new issue #8067: [Feature][Vectorized] Support year/month/week/hour/minute/day/second floor/ceil function
HappenLee opened a new issue #8067:
URL: https://github.com/apache/incubator-doris/issues/8067

### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues.

### Description

Support year/month/week/hour/minute/day/second floor/ceil function

### Use case

Same as in the non-vectorized exec engine

### Related issues

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[GitHub] [incubator-doris] HappenLee opened a new pull request #8068: [Vectorized][Function] Support year/month/week/hour/minute/day/second floor/ceil function
HappenLee opened a new pull request #8068:
URL: https://github.com/apache/incubator-doris/pull/8068

# Proposed changes

Issue Number: close #8067

## Problem Summary:

Support year/month/week/hour/minute/day/second floor/ceil function.

## Checklist(Required)

1. Does it affect the original behavior: (No)
2. Has unit tests been added: (No Need)
3. Has document been added or modified: (No)
4. Does it need to update dependencies: (No)
5. Are there any changes that cannot be rolled back: (Yes)

## Further comments

If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
[GitHub] [incubator-doris] tianhui5 commented on issue #8025: [Feature] Support load binlog from MySQL directly instead of Canal
tianhui5 commented on issue #8025:
URL: https://github.com/apache/incubator-doris/issues/8025#issuecomment-1040026201

> fault tolerance design

Debezium periodically flushes those offsets to persistent storage, so Doris will receive each source record exactly once during normal operation (including a restart after a graceful shutdown), but it does need to tolerate receiving duplicate events immediately following a restart after a crash or improper shutdown.
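In other words, delivery is at-least-once across crashes, so the apply path has to be idempotent. Below is a minimal sketch of one common approach, offset-based deduplication on the consumer side; the record shape and class names are hypothetical, not an actual Doris or Debezium interface.

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical record shape: a binlog event plus its source offset.
struct SourceRecord {
    uint64_t offset;  // position in the MySQL binlog, assumed monotonic
    std::string payload;
};

class BinlogApplier {
public:
    // Drop any record at or below the last durably applied offset, so
    // duplicates replayed after a crash restart become harmless no-ops.
    bool apply(const SourceRecord& rec) {
        if (rec.offset <= last_applied_offset_) {
            return false;  // duplicate from the post-crash replay window
        }
        // ... apply rec.payload to the table, then persist the offset
        //     atomically with the data (e.g. in the same transaction) ...
        last_applied_offset_ = rec.offset;
        return true;
    }

private:
    uint64_t last_applied_offset_ = 0;  // recovered from storage on start
};

int main() {
    BinlogApplier applier;
    std::cout << applier.apply({1, "insert a"}) << "\n";  // 1: applied
    std::cout << applier.apply({1, "insert a"}) << "\n";  // 0: duplicate dropped
    std::cout << applier.apply({2, "insert b"}) << "\n";  // 1: applied
}
```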
[GitHub] [incubator-doris] yangzhg opened a new pull request #8069: [refactor] fix warnings when compiling with clang
yangzhg opened a new pull request #8069:
URL: https://github.com/apache/incubator-doris/pull/8069

# Proposed changes

Issue Number: close #xxx

## Problem Summary:

Fix warnings when compiling with clang.

## Checklist(Required)

1. Does it affect the original behavior: (No)
2. Has unit tests been added: (No)
3. Has document been added or modified: (No Need)
4. Does it need to update dependencies: (No)
5. Are there any changes that cannot be rolled back: (Yes)

## Further comments

If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
[GitHub] [incubator-doris] tianhui5 commented on issue #8025: [Feature] Support load binlog from MySQL directly instead of Canal
tianhui5 commented on issue #8025:
URL: https://github.com/apache/incubator-doris/issues/8025#issuecomment-1040056960

> interface design

Reuse the current binlog load syntax and add a new kind of `binlog_desc` with the prefix `mysql.`.
[GitHub] [incubator-doris] tianhui5 commented on issue #8025: [Feature] Support load binlog from MySQL directly instead of Canal
tianhui5 commented on issue #8025:
URL: https://github.com/apache/incubator-doris/issues/8025#issuecomment-1040069762

I think the main problem is that the current Canal design can only run the load job on the Master; it would be better to distribute binlog load jobs across all FEs. I prefer to support Debezium first, then make both the Canal client and Debezium able to run on all FEs.
[GitHub] [incubator-doris] stgztsw opened a new issue #8070: [Enhancement] It is better to ignore case in column name, when we do the stream load in json format
stgztsw opened a new issue #8070:
URL: https://github.com/apache/incubator-doris/issues/8070

### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues.

### Description

When we have a table whose columns are named "Name" and "Sex", we fail to load the data below into the table via stream load in JSON format, because column names are case sensitive:

{"name": "tom", "sex": "male"}

But we can use SQL such as "select name, sex from table" to get the right result. So column names are case insensitive in SQL but case sensitive in stream load; it would be better to make this consistent.

### Solution

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
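One straightforward way to implement the requested behavior is to normalize both the schema's column names and the incoming JSON keys to a single case before matching. A minimal, self-contained sketch of the idea, not the actual stream load code:

```cpp
#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>
#include <unordered_map>

static std::string to_lower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return std::tolower(c); });
    return s;
}

int main() {
    // Map lower-cased column name -> real column name from the schema.
    std::unordered_map<std::string, std::string> columns = {
        {to_lower("Name"), "Name"},
        {to_lower("Sex"), "Sex"},
    };

    // Incoming JSON keys in a different case still resolve to the schema.
    for (const std::string key : {"name", "SEX", "age"}) {
        auto it = columns.find(to_lower(key));
        if (it != columns.end()) {
            std::cout << key << " -> column " << it->second << "\n";
        } else {
            std::cout << key << " -> no such column\n";
        }
    }
}
```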
[GitHub] [incubator-doris] liteng2430 opened a new pull request #8071: Correct the wrong words in the prompt information
liteng2430 opened a new pull request #8071:
URL: https://github.com/apache/incubator-doris/pull/8071

# Proposed changes

Correct the wrong words in the prompt information.

## Problem Summary:

Describe the overview of changes.

## Checklist(Required)

1. Does it affect the original behavior: (Yes/No/I Don't know)
2. Has unit tests been added: (Yes/No/No Need)
3. Has document been added or modified: (Yes/No/No Need)
4. Does it need to update dependencies: (Yes/No)
5. Are there any changes that cannot be rolled back: (Yes/No)

## Further comments

If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
[GitHub] [incubator-doris] HappenLee commented on a change in pull request #8064: (#8063) refactor some interfaces of storage layer column
HappenLee commented on a change in pull request #8064:
URL: https://github.com/apache/incubator-doris/pull/8064#discussion_r806688824

## File path: be/src/vec/columns/column.h

## @@ -171,6 +171,19 @@ class IColumn : public COW {
     /// All data will be inserted as single element
     virtual void insert_data(const char* pos, size_t length) = 0;
+
+    virtual void insert_many_fix_len_data(const char* pos, size_t num) {
+        LOG(FATAL) << "Method insert_many_fix_len_data is not supported for " << get_name();
+    }
+
+    virtual void insert_many_dict_data(const int32_t* data_array, size_t start_index, const uint32_t* start_offset_array,
+                                       const uint32_t* len_array, char* dict_data, size_t num) {
+        LOG(FATAL) << "Method insert_many_dict_data is not supported for " << get_name();
+    }
+
+    virtual void insert_many_binary_data(size_t num, char* data_array, uint32_t* len_array, uint32_t* start_offset_array) {

Review comment: put ```size_t num``` last, like the other interfaces

## File path: be/src/olap/rowset/segment_v2/binary_plain_page.h

## @@ -238,48 +238,15 @@ class BinaryPlainPageDecoder : public PageDecoder {
     }
     const size_t max_fetch = std::min(*n, static_cast(_num_elems - _cur_idx));
-    auto* dst_col_ptr = dst.get();
-    if (dst->is_nullable()) {
-        auto nullable_column = assert_cast(dst.get());
-        dst_col_ptr = nullable_column->get_nested_column_ptr().get();
-        // fill null bitmap here, not null;
-        for (int i = 0; i < max_fetch; i++) {
-            nullable_column->get_null_map_data().push_back(0);
-        }
-    }
-
-    if (dst_col_ptr->is_bitmap()) {
-        auto& bitmap_column = reinterpret_cast(*dst_col_ptr);
-        for (size_t i = 0; i < max_fetch; i++, _cur_idx++) {
-            const uint32_t start_offset = offset(_cur_idx);
-            uint32_t len = offset(_cur_idx + 1) - start_offset;
-
-            bitmap_column.insert_default();
-            BitmapValue* pvalue = &bitmap_column.get_element(bitmap_column.size() - 1);
-            if (len != 0) {
-                BitmapValue value;
-                value.deserialize(&_data[start_offset]);
-                *pvalue = std::move(value);
-            } else {
-                *pvalue = std::move(*reinterpret_cast(const_cast(&_data[start_offset])));
-            }
-        }
-    } else if (dst_col_ptr->is_predicate_column()) {
-        // todo(wb) padding sv here for better comparison performance
-        for (size_t i = 0; i < max_fetch; i++, _cur_idx++) {
-            const uint32_t start_offset = offset(_cur_idx);
-            uint32_t len = offset(_cur_idx + 1) - start_offset;
-            StringValue sv(const_cast(&_data[start_offset]), len);
-            dst_col_ptr->insert_data(reinterpret_cast(&sv), 0);
-        }
-    } else {
-        for (size_t i = 0; i < max_fetch; i++, _cur_idx++) {
-            // todo(wb) need more test case and then improve here
-            const uint32_t start_offset = offset(_cur_idx);
-            uint32_t len = offset(_cur_idx + 1) - start_offset;
-            dst_col_ptr->insert_data(&_data[start_offset], len);
-        }
+    uint32_t len_array[max_fetch];

Review comment: format

## File path: be/src/vec/columns/column_vector.h

## @@ -212,11 +272,11 @@ class ColumnVector final : public COWHelper>
     void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override;
-    void insert_elements(void* elements, size_t num) {
+    void batch_set_null_bitmap(uint8_t val, size_t num) {

Review comment: looks the same as insert_elements? Why not use `insert_elements`?

## File path: be/src/olap/rowset/segment_v2/segment_iterator.cpp

## @@ -943,9 +939,9 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
     _output_non_pred_columns(block, is_mem_reuse);
     // 4.2 output short circuit predicate column
-    _output_column_by_sel_idx(block, _short_cir_pred_column_ids, sel_rowid_idx, selected_size, is_mem_reuse);
+    return _output_column_by_sel_idx(block, _short_cir_pred_column_ids, sel_rowid_idx, selected_size, is_mem_reuse);

Review comment: returning here means the vec_pred_column step below will not execute
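The last review point flags a real control-flow hazard: `return _output_column_by_sel_idx(...)` exits `next_batch` even on success, so the following output step never runs. A self-contained sketch of the conventional fix, using simplified stand-ins for the codebase's Status type and RETURN_IF_ERROR pattern (all names here are illustrative):

```cpp
#include <iostream>

// Minimal stand-in for doris::Status.
struct Status {
    bool ok_flag = true;
    static Status OK() { return {true}; }
    bool ok() const { return ok_flag; }
};

// Propagate failures but keep executing on success.
#define RETURN_IF_ERROR(stmt)       \
    do {                            \
        Status _s = (stmt);         \
        if (!_s.ok()) return _s;    \
    } while (false)

Status output_short_circuit_columns() { return Status::OK(); }
Status output_vec_pred_columns() {
    std::cout << "vec pred columns still emitted\n";
    return Status::OK();
}

Status next_batch() {
    // Wrong: `return output_short_circuit_columns();` would skip the
    // next step even when the call succeeds.
    RETURN_IF_ERROR(output_short_circuit_columns());  // step 4.2
    RETURN_IF_ERROR(output_vec_pred_columns());       // step 4.3 still runs
    return Status::OK();
}

int main() { return next_batch().ok() ? 0 : 1; }
```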
[GitHub] [incubator-doris] HappenLee commented on a change in pull request #7828: [Vectorized][Feature] add ColumnHLL to support hll type
HappenLee commented on a change in pull request #7828:
URL: https://github.com/apache/incubator-doris/pull/7828#discussion_r806712392

## File path: be/src/vec/data_types/data_type_hll.cpp

## @@ -76,10 +79,12 @@ int64_t DataTypeHLL::get_uncompressed_serialized_bytes(const IColumn& column) co
     auto allocate_content_size = 0;
     for (size_t i = 0; i < column.size(); ++i) {
         auto& hll = const_cast(data_column.get_element(i));
-        allocate_content_size += hll.max_serialized_size();
+        std::string result(hll.max_serialized_size(), '0');
+        size_t actual_size = hll.serialize((uint8_t*)result.c_str());

Review comment: should not serialize twice
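The comment points out that serializing each value once just to learn its byte count, and presumably again for the actual write, doubles the work. A generic sketch of the usual fix: serialize once into a scratch buffer and reuse both the bytes and the actual size. The `Sketch` type below is a hypothetical stand-in for the HLL object, not the Doris class:

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for an HLL-style object whose exact serialized
// size is only known after serializing.
struct Sketch {
    std::string state = "example-state";
    size_t max_serialized_size() const { return state.size() + 16; }
    size_t serialize(uint8_t* dst) const {  // returns the actual size
        std::memcpy(dst, state.data(), state.size());
        return state.size();
    }
};

int main() {
    Sketch hll;

    // Serialize once into a reusable buffer and keep the actual size
    // around instead of calling serialize() a second time later.
    std::vector<uint8_t> buf(hll.max_serialized_size());
    size_t actual_size = hll.serialize(buf.data());
    buf.resize(actual_size);

    // Both the size accounting and the write now reuse the same bytes.
    std::cout << "serialized " << buf.size() << " bytes\n";
}
```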
[GitHub] [incubator-doris] BiteThet commented on a change in pull request #8066
BiteThet commented on a change in pull request #8066:
URL: https://github.com/apache/incubator-doris/pull/8066#discussion_r806768158

## File path: be/src/vec/aggregate_functions/aggregate_function_percentile_approx.h

## @@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "util/counts.h"
+#include "util/tdigest.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+struct PercentileApproxState {
+    static constexpr double INIT_QUANTILE = -1.0;
+    PercentileApproxState() {}
+    ~PercentileApproxState() {}
+
+    void init(double compression = 1) {
+        if (!init_flag) {
+            digest.reset(new TDigest(compression));
+            init_flag = true;
+        }
+    }
+
+    void write(BufferWritable& buf) const {
+        write_binary(init_flag, buf);
+        write_binary(targetQuantile, buf);
+
+        uint32_t serialize_size = digest->serialized_size();
+        std::string result(serialize_size, '0');
+        DCHECK(digest.get() != nullptr);
+        digest->serialize((uint8_t*)result.c_str());
+
+        write_binary(result, buf);
+    }
+
+    void read(BufferReadable& buf) {
+        read_binary(init_flag, buf);
+        read_binary(targetQuantile, buf);
+
+        std::string str;
+        read_binary(str, buf);
+        digest.reset(new TDigest());
+        digest->unserialize((uint8_t*)str.c_str());
+    }
+
+    double get() const { return digest->quantile(targetQuantile); }
+
+    void merge(const PercentileApproxState& rhs) {
+        if (init_flag) {
+            DCHECK(digest.get() != nullptr);
+            digest->merge(rhs.digest.get());
+        } else {
+            digest.reset(new TDigest());

Review comment: init_flag should be updated here
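For clarity, a reduced sketch of the corrected control flow for the merge bug flagged above: the empty-state branch must both allocate the digest and set `init_flag`, otherwise later calls still treat the state as uninitialized. TDigest is replaced by a placeholder here; this is not the actual Doris code:

```cpp
#include <iostream>
#include <memory>

struct TDigestStub {  // placeholder for util/tdigest.h
    void merge(const TDigestStub*) {}
};

struct PercentileApproxState {
    bool init_flag = false;
    std::unique_ptr<TDigestStub> digest;

    void merge(const PercentileApproxState& rhs) {
        if (!init_flag) {
            // First merge into an empty state: allocate AND mark
            // initialized, which is the assignment the review says
            // is missing from the else branch.
            digest = std::make_unique<TDigestStub>();
            init_flag = true;
        }
        digest->merge(rhs.digest.get());
    }
};

int main() {
    PercentileApproxState a, b;
    b.digest = std::make_unique<TDigestStub>();
    b.init_flag = true;
    a.merge(b);
    std::cout << a.init_flag << "\n";  // 1: state now consistent
}
```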
[GitHub] [incubator-doris] awsl-dbq commented on issue #7582: [Roadmap] Enhanced system observability
awsl-dbq commented on issue #7582:
URL: https://github.com/apache/incubator-doris/issues/7582#issuecomment-1040203670

Some links for opentelemetry

## Website
- HomePage https://opentelemetry.io/
- Github https://github.com/open-telemetry

## SDK
- Cpp https://github.com/open-telemetry/opentelemetry-cpp
- Java https://github.com/open-telemetry/opentelemetry-java

## Examples
- Cpp https://github.com/open-telemetry/opentelemetry-cpp/blob/main/examples/multi_processor/main.cc
- Java https://github.com/open-telemetry/opentelemetry-java-docs/blob/main/grpc/src/main/java/io/opentelemetry/example/grpc/HelloWorldServer.java
[GitHub] [incubator-doris] morningman opened a new pull request #8072: [fix](load)(compaction) Fix NodeChannel coredump bug and modify some compaction logic
morningman opened a new pull request #8072:
URL: https://github.com/apache/incubator-doris/pull/8072

# Proposed changes

Issue Number: close #8058

## Problem Summary:

1. Fix the BE crash caused by the destruction sequence. (close #8058)
2. Add a new BE config `compaction_task_num_per_fast_disk`. This config specifies the max concurrent compaction task num on a fast disk (typically SSD), so that on high-speed disks we can execute more compaction tasks at the same time and compact the data as soon as possible.
3. Avoid frequent selection of unqualified tablets to perform compaction.
4. Lower some log levels to reduce the log size of BE.

## Checklist(Required)

1. Does it affect the original behavior: (No)
2. Has unit tests been added: (No Need)
3. Has document been added or modified: (Yes)
4. Does it need to update dependencies: (No)
5. Are there any changes that cannot be rolled back: (No)

## Further comments

If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
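For reference, enabling the new knob would look roughly like this in `be.conf`. The parameter name comes from the PR description; the value shown is only an illustrative choice, not a recommended default:

```
# Max concurrent compaction tasks on a fast disk (typically SSD).
# 8 is an example value, not a recommended default.
compaction_task_num_per_fast_disk = 8
```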
[GitHub] [incubator-doris] github-actions[bot] commented on pull request #7473: [Feature][Transaction] Support two phase commit (2PC) for stream load
github-actions[bot] commented on pull request #7473: URL: https://github.com/apache/incubator-doris/pull/7473#issuecomment-1040343251 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[GitHub] [incubator-doris] morningman commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
morningman commented on a change in pull request #8035:
URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806896577

## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java

## @@ -37,6 +37,7 @@
 * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001}
 * {"status":"FAILED","msg":"err info..."}
 */
+@Deprecated

Review comment: Why deprecate this?
[GitHub] [incubator-doris] caiconghui commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
caiconghui commented on a change in pull request #8035:
URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806904837

## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java

## @@ -37,6 +37,7 @@
 * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001}
 * {"status":"FAILED","msg":"err info..."}
 */
+@Deprecated

Review comment:
> Why deprecate this?

Getting heartbeat info via RPC is a more stable way. Once heartbeats go through thrift, BootstrapFinishAction is not needed anymore; we only keep it for compatibility during FE upgrades.
[GitHub] [incubator-doris] morningman commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
morningman commented on a change in pull request #8035:
URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806925650

## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java

## @@ -37,6 +37,7 @@
 * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001}
 * {"status":"FAILED","msg":"err info..."}
 */
+@Deprecated

Review comment: I thought `BootstrapFinishAction` has nothing to do with the heartbeat? It is used for checking whether the FE started successfully.
[GitHub] [incubator-doris] caiconghui commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
caiconghui commented on a change in pull request #8035:
URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806934702

## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java

## @@ -37,6 +37,7 @@
 * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001}
 * {"status":"FAILED","msg":"err info..."}
 */
+@Deprecated

Review comment:
> I thought `BootstrapFinishAction` has nothing to do with the heartbeat? It is used for checking whether the FE started successfully.

controller.registerHandler(HttpMethod.GET, "/api/bootstrap", new BootstrapFinishAction(controller))

It is now used for the heartbeat:

private HeartbeatResponse getHeartbeatResponseByHttp() {
    String url = "http://" + fe.getHost() + ":" + Config.http_port + "/api/bootstrap?cluster_id=" + clusterId + "&token=" + token;
[GitHub] [incubator-doris] morningman commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
morningman commented on a change in pull request #8035:
URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806936953

## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java

## @@ -37,6 +37,7 @@
 * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001}
 * {"status":"FAILED","msg":"err info..."}
 */
+@Deprecated

Review comment: Oh, I forgot. But this API is also used for checking whether the FE is started, so we still need it; just don't use it for the heartbeat.
[GitHub] [incubator-doris] morningman commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
morningman commented on a change in pull request #8035:
URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806938900

## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java

## @@ -37,6 +37,7 @@
 * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001}
 * {"status":"FAILED","msg":"err info..."}
 */
+@Deprecated

Review comment: And by the way, most users have not switched the heartbeat method to the thrift way, so maybe we need to notify them somehow. And you forgot to complete the required fields in the PR template.
[GitHub] [incubator-doris] caiconghui commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
caiconghui commented on a change in pull request #8035:
URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806947934

## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java

## @@ -37,6 +37,7 @@
 * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001}
 * {"status":"FAILED","msg":"err info..."}
 */
+@Deprecated

Review comment:
1. We can still use thrift to check whether the FE is started, which duplicates the HTTP method.
2. No need; users just upgrade and don't need to know about the change, because when they run the new FE version the cluster will use thrift to get FE heartbeat info by default and this cannot be changed. We still keep BootstrapFinishAction for compatibility.
3. Done.
[GitHub] [incubator-doris] caiconghui commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
caiconghui commented on a change in pull request #8035:
URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806947934

## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java

## @@ -37,6 +37,7 @@
 * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001}
 * {"status":"FAILED","msg":"err info..."}
 */
+@Deprecated

Review comment:
1. No need; users just upgrade and don't need to know about the change, because with the new FE version the cluster will use thrift to get FE heartbeat info by default and this cannot be changed. We now still keep BootstrapFinishAction, so the @Deprecated annotation needs to be removed.
2. Done completing the required fields in the PR template.
[GitHub] [incubator-doris-manager] hf200012 merged pull request #1: [community] move GitBox mail to commits@doris
hf200012 merged pull request #1: URL: https://github.com/apache/incubator-doris-manager/pull/1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[GitHub] [incubator-doris] HappenLee merged pull request #7979: [Vectorized][Feature] Support mysql external table insert into stm
HappenLee merged pull request #7979: URL: https://github.com/apache/incubator-doris/pull/7979 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
[GitHub] [incubator-doris] stalary commented on a change in pull request #7984: [Docs] add rpc function document
stalary commented on a change in pull request #7984:
URL: https://github.com/apache/incubator-doris/pull/7984#discussion_r806427694

## File path: samples/doris-demo/udf-demo/src/main/java/org/apache/doris/udf/FunctionGrpc.java

## @@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import io.grpc.stub.StreamObserver;
+import net.devh.boot.grpc.server.service.GrpcService;
+import org.apache.doris.proto.FunctionService;
+import org.apache.doris.proto.PFunctionServiceGrpc;
+import org.apache.doris.proto.Types;
+
+import java.util.List;
+
+/**
+ * FunctionGrpc
+ *
+ * @author lirongqian
+ * @since 2022/02/08
+ */
+@GrpcService
+public class FunctionGrpc extends PFunctionServiceGrpc.PFunctionServiceImplBase {
+
+    @Override
+    public void fnCall(FunctionService.PFunctionCallRequest request, StreamObserver responseObserver) {
+        String functionName = request.getFunctionName();
+        FunctionService.PFunctionCallResponse res;
+        if ("grpc_add".equals(functionName)) {
+            List argsList = request.getArgsList();
+            int sum = 0;
+            for (Types.PValues pValues : argsList) {
+                sum += pValues.getInt32Value(0) % 10;

Review comment: I was using it for testing, so I deleted it

## File path: samples/doris-demo/udf-demo/src/main/java/org/apache/doris/udf/FunctionGrpc.java

## @@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import io.grpc.stub.StreamObserver;
+import net.devh.boot.grpc.server.service.GrpcService;
+import org.apache.doris.proto.FunctionService;
+import org.apache.doris.proto.PFunctionServiceGrpc;
+import org.apache.doris.proto.Types;
+
+import java.util.List;
+
+/**
+ * FunctionGrpc
+ *
+ * @author lirongqian
+ * @since 2022/02/08
+ */
+@GrpcService
+public class FunctionGrpc extends PFunctionServiceGrpc.PFunctionServiceImplBase {
+
+    @Override

Review comment: done

## File path: docs/.vuepress/sidebar/en.js

## @@ -255,7 +255,8 @@ module.exports = [
     directoryPath: "udf/",
     children: [
         "contribute-udf",
-        "user-defined-function",
+        "user-defined-function-cpp",
+        "user-defined-function-rpc",

Review comment: done

## File path: docs/.vuepress/sidebar/en.js

## @@ -255,7 +255,8 @@ module.exports = [
     directoryPath: "udf/",
     children: [
         "contribute-udf",
-        "user-defined-function",
+        "user-defined-function-cpp",

Review comment: done

## File path: docs/zh-CN/extending-doris/udf/user-defined-function-rpc.md

## @@ -0,0 +1,98 @@
+---
+{
+    "title": "User Defined Function Rpc",

Review comment: done

## File path: docs/zh-CN/extending-doris/udf/user-defined-function-rpc.md

## @@ -0,0 +1,98 @@
+---
+{
+    "title": "User Defined Function Rpc",
+    "language": "zh-CN"
+}
+---
+
+# User Defined Function Rpc
+
+Function logic can be invoked via RPC, with data transferred through protobuf; many languages such as Java/C++/Python/Ruby/Go/PHP/JavaScript are supported

Review comment: done

## File path: samples/doris-demo/udf-demo/src/main/resources/application.yml

## @@ -0,0 +1,3 @@
+grpc:

Review comment: done

## File path: docs/zh-CN/extending-doris/udf/user-defined-function-rpc.md

## @@ -0,0 +1,98 @@
+---
+{
+    "title
[GitHub] [incubator-doris] yinzhijian commented on a change in pull request #7994: [Feature](ThreadPool) Support thread pool per disk for scanners
yinzhijian commented on a change in pull request #7994:
URL: https://github.com/apache/incubator-doris/pull/7994#discussion_r806397427

## File path: be/src/util/priority_thread_pool.hpp

## @@ -39,6 +39,7 @@ class PriorityThreadPool {
 public:
     int priority;
     WorkFunction work_function;
+    int queue_id;

Review comment: task_id doesn't indicate which disk this task belongs to, so queue_id may be better.
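A compilable toy illustration of what the `queue_id` field is for, using simplified stand-ins for `PriorityThreadPool`: the scanner pool keeps one task queue per disk, and the field routes each task to its disk's queue, so one slow disk cannot starve scans on the others. Types and names below are illustrative:

```cpp
#include <functional>
#include <iostream>
#include <queue>
#include <vector>

struct Task {
    int priority;
    std::function<void()> work_function;
    int queue_id;  // which per-disk queue this task belongs to
};

struct ByPriority {
    bool operator()(const Task& a, const Task& b) const {
        return a.priority < b.priority;  // higher priority runs first
    }
};

class PerDiskPool {
public:
    explicit PerDiskPool(int num_disks) : queues_(num_disks) {}

    void offer(Task t) {
        // Route the task to its disk's queue instead of one global queue.
        queues_[t.queue_id].push(std::move(t));
    }

    // Stand-in for what a worker thread bound to one disk would do.
    void drain_one(int disk) {
        auto& q = queues_[disk];
        if (!q.empty()) {
            Task t = q.top();
            q.pop();
            t.work_function();
        }
    }

private:
    std::vector<std::priority_queue<Task, std::vector<Task>, ByPriority>> queues_;
};

int main() {
    PerDiskPool pool(2);
    pool.offer({1, [] { std::cout << "scan on disk 0\n"; }, 0});
    pool.offer({5, [] { std::cout << "scan on disk 1\n"; }, 1});
    pool.drain_one(0);
    pool.drain_one(1);
}
```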
[GitHub] [incubator-doris] HappenLee commented on a change in pull request #8044: [Feature][Vectorized] support aggregate function ndv()/approx_count_distinct()
HappenLee commented on a change in pull request #8044:
URL: https://github.com/apache/incubator-doris/pull/8044#discussion_r806499008

## File path: be/src/vec/aggregate_functions/aggregate_function_approx_count_distinct.cpp

## @@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/aggregate_functions/aggregate_function_approx_count_distinct.h"
+
+#include "vec/columns/column_string.h"
+#include "vec/columns/columns_number.h"
+
+namespace doris::vectorized {
+
+AggregateFunctionPtr create_aggregate_function_approx_count_distinct(
+        const std::string& name, const DataTypes& argument_types, const Array& parameters,
+        const bool result_is_nullable) {
+    AggregateFunctionPtr res = nullptr;
+    WhichDataType which(argument_types[0]->is_nullable()
+                                ? reinterpret_cast(argument_types[0].get())->get_nested_type()
+                                : argument_types[0]);
+
+    // TODO: use template traits here.
+    if (which.is_uint8()) {
+        res.reset(new AggregateFunctionApproxCountDistinct(argument_types));
+    } else if (which.is_int8()) {

Review comment: support `Int128` and use the `create_with_numeric_type` function to do this
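For reference, a self-contained sketch of how a `create_with_numeric_type`-style dispatcher can replace the hand-written if/else chain by mapping a runtime type id to a template instantiation. The actual Doris helper may differ; `__int128_t` is a GCC/Clang builtin, and all other names here are illustrative:

```cpp
#include <cstdint>
#include <iostream>

// Illustrative runtime type ids (stand-ins for WhichDataType checks).
enum class TypeId { INT8, INT32, INT128, UNKNOWN };

// Invokes F::apply<T>() with the C++ type matching the runtime id;
// returns false when the id is not a supported numeric type.
template <typename F>
bool create_with_numeric_type(TypeId id, F& f) {
    switch (id) {
    case TypeId::INT8:   f.template apply<int8_t>();     return true;
    case TypeId::INT32:  f.template apply<int32_t>();    return true;
    case TypeId::INT128: f.template apply<__int128_t>(); return true;  // Int128 support
    default:             return false;
    }
}

// Stand-in for the code that would construct the aggregate function
// object for a concrete numeric type.
struct Creator {
    template <typename T>
    void apply() {
        std::cout << "instantiated for a " << sizeof(T) << "-byte type\n";
    }
};

int main() {
    Creator c;
    create_with_numeric_type(TypeId::INT32, c);   // 4-byte type
    create_with_numeric_type(TypeId::INT128, c);  // 16-byte type
}
```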
[GitHub] [incubator-doris] github-actions[bot] commented on pull request #8045: [Fix bug] should remove decommission job type from enum
github-actions[bot] commented on pull request #8045: URL: https://github.com/apache/incubator-doris/pull/8045#issuecomment-1039840410
[GitHub] [incubator-doris] HappenLee closed issue #7978: [Vectorized][Feature] Support mysql external table insert into stm
HappenLee closed issue #7978: URL: https://github.com/apache/incubator-doris/issues/7978
[GitHub] [incubator-doris] qidaye commented on a change in pull request #8053: [docs] fix document date-time-functions typo
qidaye commented on a change in pull request #8053: URL: https://github.com/apache/incubator-doris/pull/8053#discussion_r806401270 ## File path: docs/en/sql-reference/sql-functions/aggregate-functions/group_concat.md ## @@ -0,0 +1,63 @@ +--- +{ +"title": "group_concat", Review comment: `group_concat` already exists in the string-functions subdirectory. ## File path: docs/zh-CN/sql-reference/sql-functions/date-time-functions/convert_tz.md ## @@ -25,14 +25,12 @@ under the License. --> # convert_tz - +## description ## Syntax Review comment: ```suggestion ### Syntax ``` ## File path: docs/zh-CN/sql-reference/sql-functions/aggregate-functions/group_concat.md ## @@ -0,0 +1,63 @@ +--- +{ +"title": "group_concat", Review comment: Same as above.
[GitHub] [incubator-doris] morningman merged pull request #8000: [fix](compatibility) Fix compatibility issue of PRowBatch and some tablet sink bugs
morningman merged pull request #8000: URL: https://github.com/apache/incubator-doris/pull/8000
[GitHub] [incubator-doris] yiguolei commented on a change in pull request #8042: [Improvement] check simd instructions before start
yiguolei commented on a change in pull request #8042: URL: https://github.com/apache/incubator-doris/pull/8042#discussion_r806439574 ## File path: be/src/service/doris_main.cpp ## @@ -74,6 +79,199 @@ static void thrift_output(const char* x) { } // namespace doris +// These code is referenced from clickhouse Review comment: 1. This code is not a utility: it is only used in the main function and should not be called from other functions. There may be signal-catching problems if it runs in other functions; I have not tested that. 2. This code should run at the very beginning of the program, so I added an `__attribute__` with priority 101 to the checker. If functions that run before this check execute SIMD code on a machine without the required SSE instructions, the program may crash before we can catch the signal.
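For context, a minimal sketch of the constructor-priority approach discussed here, assuming GCC or Clang; the attribute form and the empty check body are illustrative, not the actual Doris code:

```cpp
#include <cstdio>

// Stand-in for the real check: the actual patch probes SSE/AVX with inline
// asm and exits via a SIGILL handler if an instruction is unavailable.
static void check_required_instructions() {
    // ... probe instructions here, _Exit(1) on failure ...
}

// GCC/Clang run constructors with lower priority numbers first; 101 is the
// first priority value available to user code, so this runs before other
// static initializers that might already execute SIMD instructions.
__attribute__((constructor(101))) static void instruction_checker() {
    check_required_instructions();
}

int main() {
    std::puts("instruction check already ran before main()");
    return 0;
}
```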
[GitHub] [incubator-doris] morningman commented on a change in pull request #8042: [Improvement] check simd instructions before start
morningman commented on a change in pull request #8042: URL: https://github.com/apache/incubator-doris/pull/8042#discussion_r806436781 ## File path: be/src/service/doris_main.cpp ## @@ -74,6 +79,199 @@ static void thrift_output(const char* x) { } // namespace doris +// These code is referenced from clickhouse Review comment: Can this code be moved to a separate file, like `util/instruction_utils.hpp`? ## File path: be/src/service/doris_main.cpp ## @@ -74,6 +79,199 @@ static void thrift_output(const char* x) { } // namespace doris +// These code is referenced from clickhouse +// It is used to check the SIMD instructions +enum class InstructionFail { +NONE = 0, +SSE3 = 1, +SSSE3 = 2, +SSE4_1 = 3, +SSE4_2 = 4, +POPCNT = 5, +AVX = 6, +AVX2 = 7, +AVX512 = 8, +ARM_NEON = 9 +}; + +auto instruction_fail_to_string(InstructionFail fail) +{ +switch (fail) +{ +#define ret(x) return std::make_tuple(STDERR_FILENO, x, ARRAY_SIZE(x) - 1) +case InstructionFail::NONE: +ret("NONE"); +case InstructionFail::SSE3: +ret("SSE3"); +case InstructionFail::SSSE3: +ret("SSSE3"); +case InstructionFail::SSE4_1: +ret("SSE4.1"); +case InstructionFail::SSE4_2: +ret("SSE4.2"); +case InstructionFail::POPCNT: +ret("POPCNT"); +case InstructionFail::AVX: +ret("AVX"); +case InstructionFail::AVX2: +ret("AVX2"); +case InstructionFail::AVX512: +ret("AVX512"); +case InstructionFail::ARM_NEON: +ret("ARM_NEON"); +} +__builtin_unreachable(); +} + + +sigjmp_buf jmpbuf; + +void sig_ill_check_handler(int, siginfo_t *, void *) +{ +siglongjmp(jmpbuf, 1); +} + +/// Check if necessary SSE extensions are available by trying to execute some sse instructions. +/// If instruction is unavailable, SIGILL will be sent by kernel. +void check_required_instructions_impl(volatile InstructionFail & fail) +{ +#if defined(__SSE3__) +fail = InstructionFail::SSE3; +__asm__ volatile ("addsubpd %%xmm0, %%xmm0" : : : "xmm0"); +#endif + +#if defined(__SSSE3__) +fail = InstructionFail::SSSE3; +__asm__ volatile ("pabsw %%xmm0, %%xmm0" : : : "xmm0"); + +#endif + +#if defined(__SSE4_1__) +fail = InstructionFail::SSE4_1; +__asm__ volatile ("pmaxud %%xmm0, %%xmm0" : : : "xmm0"); +#endif + +#if defined(__SSE4_2__) +fail = InstructionFail::SSE4_2; +__asm__ volatile ("pcmpgtq %%xmm0, %%xmm0" : : : "xmm0"); +#endif + +/// Defined by -msse4.2 +#if defined(__POPCNT__) +fail = InstructionFail::POPCNT; +{ +uint64_t a = 0; +uint64_t b = 0; +__asm__ volatile ("popcnt %1, %0" : "=r"(a) :"r"(b) :); +} +#endif + +#if defined(__AVX__) +fail = InstructionFail::AVX; +__asm__ volatile ("vaddpd %%ymm0, %%ymm0, %%ymm0" : : : "ymm0"); +#endif + +#if defined(__AVX2__) +fail = InstructionFail::AVX2; +__asm__ volatile ("vpabsw %%ymm0, %%ymm0" : : : "ymm0"); +#endif + +#if defined(__AVX512__) +fail = InstructionFail::AVX512; +__asm__ volatile ("vpabsw %%zmm0, %%zmm0" : : : "zmm0"); +#endif + +#if defined(__ARM_NEON__) +fail = InstructionFail::ARM_NEON; +__asm__ volatile ("vadd.i32 q8, q8, q8" : : : "q8"); +#endif + +fail = InstructionFail::NONE; +} + +bool write_retry(int fd, const char * data, size_t size) +{ +if (!size) +size = strlen(data); + +while (size != 0) +{ +ssize_t res = ::write(fd, data, size); + +if ((-1 == res || 0 == res) && errno != EINTR) +return false; + +if (res > 0) +{ +data += res; +size -= res; +} +} + +return true; +} + +/// Macros to avoid using strlen(), since it may fail if SSE is not supported.
+#define WRITE_ERROR(data) do \ +{ \ +static_assert(__builtin_constant_p(data)); \ +if (!write_retry(STDERR_FILENO, data, ARRAY_SIZE(data) - 1)) \ +_Exit(1); \ +} while (false) + +/// Check SSE and others instructions availability. Calls exit on fail. +/// This function must be called as early as possible, even before main, because static initializers may use unavailable instructions. +void check_required_instructions() +{ +struct sigaction sa{}; +struct sigaction sa_old{}; +sa.sa_sigaction = sig_ill_check_handler; +sa.sa_flags = SA_SIGINFO; +auto signal = SIGILL; +if (sigemptyset(&sa.sa_mask) != 0 +|| sigaddset(&sa.sa_mask, signal) != 0 +|| sigaction(signal, &sa, &sa_old) != 0) +{ +/// You may wonder about strlen. +/// Typical implementation of strlen is using SSE4.2 or AVX2. +/// But this is not the case because it's compiler builtin and is executed at compile time. + +WRITE_ERROR("Can not set signal handler\n"); +_Exit(1); +} + +volatile
[GitHub] [incubator-doris] morningman commented on pull request #8045: [Fix bug] should remove decommission job type from enum
morningman commented on pull request #8045: URL: https://github.com/apache/incubator-doris/pull/8045#issuecomment-1039840630 Hi @yiguolei , please pay attention to your PR title: http://doris.incubator.apache.org/community/commit-format-specification.html#commit-format-specification
[GitHub] [incubator-doris] tianhui5 commented on issue #8025: [Feature] Support load binlog from MySQL directly instead of Canal
tianhui5 commented on issue #8025: URL: https://github.com/apache/incubator-doris/issues/8025#issuecomment-1040026201
[GitHub] [incubator-doris] morningman merged pull request #8029: [community] add collaborators in .asf.yaml
morningman merged pull request #8029: URL: https://github.com/apache/incubator-doris/pull/8029
[GitHub] [incubator-doris] zbtzbtzbt commented on pull request #6367: [Bug][Bloom Filter] Fix bug of bloom filter null value flag not be reset
zbtzbtzbt commented on pull request #6367: URL: https://github.com/apache/incubator-doris/pull/6367#issuecomment-1039836809 We met the same bug at Meituan; your PR is useful. Thanks @weizuo93
[GitHub] [incubator-doris] awsl-dbq commented on issue #7582: [Roadmap] Enhanced system observability
awsl-dbq commented on issue #7582: URL: https://github.com/apache/incubator-doris/issues/7582#issuecomment-1040203670 Some links for OpenTelemetry ## Website: - HomePage https://opentelemetry.io/ - Github https://github.com/open-telemetry ## SDK - Cpp https://github.com/open-telemetry/opentelemetry-cpp - Java https://github.com/open-telemetry/opentelemetry-java ## Examples - Cpp https://github.com/open-telemetry/opentelemetry-cpp/blob/main/examples/multi_processor/main.cc - Java https://github.com/open-telemetry/opentelemetry-java-docs/blob/main/grpc/src/main/java/io/opentelemetry/example/grpc/HelloWorldServer.java
[GitHub] [incubator-doris] HappenLee commented on a change in pull request #8064: (#8063) refactor some interfaces of storage layer column
HappenLee commented on a change in pull request #8064: URL: https://github.com/apache/incubator-doris/pull/8064#discussion_r806688824 ## File path: be/src/vec/columns/column.h ## @@ -171,6 +171,19 @@ class IColumn : public COW { /// All data will be inserted as single element virtual void insert_data(const char* pos, size_t length) = 0; +virtual void insert_many_fix_len_data(const char* pos, size_t num) { + LOG(FATAL) << "Method insert_many_fix_len_data is not supported for " << get_name(); +} + +virtual void insert_many_dict_data(const int32_t* data_array, size_t start_index, const uint32_t* start_offset_array, +const uint32_t* len_array, char* dict_data, size_t num) { + LOG(FATAL) << "Method insert_many_dict_data is not supported for " << get_name(); +} + +virtual void insert_many_binary_data(size_t num, char* data_array, uint32_t* len_array, uint32_t* start_offset_array) { Review comment: ```size_t num``` should be the last parameter, like in the other interfaces ## File path: be/src/olap/rowset/segment_v2/binary_plain_page.h ## @@ -238,48 +238,15 @@ class BinaryPlainPageDecoder : public PageDecoder { } const size_t max_fetch = std::min(*n, static_cast(_num_elems - _cur_idx)); -auto* dst_col_ptr = dst.get(); -if (dst->is_nullable()) { -auto nullable_column = assert_cast(dst.get()); -dst_col_ptr = nullable_column->get_nested_column_ptr().get(); -// fill null bitmap here, not null; -for (int i = 0; i < max_fetch; i++) { -nullable_column->get_null_map_data().push_back(0); -} -} - -if (dst_col_ptr->is_bitmap()) { -auto& bitmap_column = reinterpret_cast(*dst_col_ptr); -for (size_t i = 0; i < max_fetch; i++, _cur_idx++) { -const uint32_t start_offset = offset(_cur_idx); -uint32_t len = offset(_cur_idx + 1) - start_offset; - -bitmap_column.insert_default(); -BitmapValue* pvalue = &bitmap_column.get_element(bitmap_column.size() - 1); -if (len != 0) { -BitmapValue value; -value.deserialize(&_data[start_offset]); -*pvalue = std::move(value); -} else { -*pvalue = std::move(*reinterpret_cast(const_cast(&_data[start_offset]))); -} -} -} else if (dst_col_ptr->is_predicate_column()) { -// todo(wb) padding sv here for better comparison performance -for (size_t i = 0; i < max_fetch; i++, _cur_idx++) { -const uint32_t start_offset = offset(_cur_idx); -uint32_t len = offset(_cur_idx + 1) - start_offset; -StringValue sv(const_cast(&_data[start_offset]), len); -dst_col_ptr->insert_data(reinterpret_cast(&sv), 0); -} -} else { -for (size_t i = 0; i < max_fetch; i++, _cur_idx++) { -// todo(wb) need more test case and then improve here -const uint32_t start_offset = offset(_cur_idx); -uint32_t len = offset(_cur_idx + 1) - start_offset; -dst_col_ptr->insert_data(&_data[start_offset], len); -} + uint32_t len_array[max_fetch]; Review comment: fix the formatting ## File path: be/src/vec/columns/column_vector.h ## @@ -212,11 +272,11 @@ class ColumnVector final : public COWHelper> void insert_indices_from(const IColumn& src, const int* indices_begin, const int* indices_end) override; -void insert_elements(void* elements, size_t num) { +void batch_set_null_bitmap(uint8_t val, size_t num) { Review comment: looks the same as insert_elements?
Why not use `insert_elements`? ## File path: be/src/olap/rowset/segment_v2/segment_iterator.cpp ## @@ -943,9 +939,9 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { _output_non_pred_columns(block, is_mem_reuse); // 4.2 output short circuit predicate column -_output_column_by_sel_idx(block, _short_cir_pred_column_ids, sel_rowid_idx, selected_size, is_mem_reuse); +return _output_column_by_sel_idx(block, _short_cir_pred_column_ids, sel_rowid_idx, selected_size, is_mem_reuse); Review comment: returning here means the vec_pred_column step below will not be executed
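For illustration, the parameter order the first comment asks for, sketched on a pared-down `IColumn` (the real class declares many more members):

```cpp
#include <cstddef>
#include <cstdint>

// Sketch of the reviewed interface with `size_t num` moved to the last
// position, matching insert_many_fix_len_data and insert_many_dict_data.
class IColumn {
public:
    virtual ~IColumn() = default;

    virtual void insert_many_fix_len_data(const char* pos, size_t num) = 0;

    virtual void insert_many_dict_data(const int32_t* data_array, size_t start_index,
                                       const uint32_t* start_offset_array,
                                       const uint32_t* len_array, char* dict_data,
                                       size_t num) = 0;

    // Before the review: insert_many_binary_data(size_t num, char*, uint32_t*, uint32_t*)
    // After the review: num goes last, consistent with the two methods above.
    virtual void insert_many_binary_data(char* data_array, uint32_t* len_array,
                                         uint32_t* start_offset_array, size_t num) = 0;
};
```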
[GitHub] [incubator-doris] qidaye commented on pull request #8057: [Doc]Add version upgrade instructions
qidaye commented on pull request #8057: URL: https://github.com/apache/incubator-doris/pull/8057#issuecomment-1039809442 The three-digit version is not an official release. Is it appropriate to mention it in the official documentation?
[GitHub] [incubator-doris] HappenLee commented on a change in pull request #7828: [Vectorized][Feature] add ColumnHLL to support hll type
HappenLee commented on a change in pull request #7828: URL: https://github.com/apache/incubator-doris/pull/7828#discussion_r806712392 ## File path: be/src/vec/data_types/data_type_hll.cpp ## @@ -76,10 +79,12 @@ int64_t DataTypeHLL::get_uncompressed_serialized_bytes(const IColumn& column) co auto allocate_content_size = 0; for (size_t i = 0; i < column.size(); ++i) { auto& hll = const_cast(data_column.get_element(i)); -allocate_content_size += hll.max_serialized_size(); +std::string result(hll.max_serialized_size(), '0'); +size_t actual_size = hll.serialize((uint8_t*)result.c_str()); Review comment: should not serialize twice
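One way to act on this comment is to serialize each HLL exactly once and reuse the bytes in the later write pass. A self-contained sketch under that assumption, with a toy `HyperLogLog` standing in for the real class (only the two method signatures match the quoted diff):

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Toy stand-in for the real HyperLogLog: same two method signatures as the diff.
struct HyperLogLog {
    std::string registers;  // pretend payload
    size_t max_serialized_size() const { return registers.size(); }
    size_t serialize(uint8_t* dst) const {  // returns the actual byte count
        std::memcpy(dst, registers.data(), registers.size());
        return registers.size();
    }
};

// Serialize each element once, cache the bytes, and reuse them in the write
// pass, instead of serializing in the size pass and again when writing.
struct SerializedHllCache {
    std::vector<std::string> buffers;

    size_t total_size(const std::vector<HyperLogLog>& values) {
        buffers.clear();
        size_t total = 0;
        for (const auto& hll : values) {
            std::string buf(hll.max_serialized_size(), '\0');
            buf.resize(hll.serialize(reinterpret_cast<uint8_t*>(buf.data())));
            total += buf.size();
            buffers.push_back(std::move(buf));
        }
        return total;
    }
};

int main() {
    SerializedHllCache cache;
    std::vector<HyperLogLog> values{{"abc"}, {"de"}};
    return cache.total_size(values) == 5 ? 0 : 1;
}
```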
[GitHub] [incubator-doris] github-actions[bot] commented on pull request #7473: [Feature][Transaction] Support two phase commit (2PC) for stream load
github-actions[bot] commented on pull request #7473: URL: https://github.com/apache/incubator-doris/pull/7473#issuecomment-1040343251
[GitHub] [incubator-doris] caiconghui commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
caiconghui commented on a change in pull request #8035: URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806904837 ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: > Why deprecate this? Getting heartbeat info via RPC is a more stable way. Once the heartbeat goes through thrift, BootstrapFinishAction is no longer needed; we keep it here only for compatibility during FE upgrade. ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: > I thought that `BootstrapFinishAction` has nothing to do with the heartbeat? It is used for checking whether the FE started successfully. controller.registerHandler(HttpMethod.GET, "/api/bootstrap", new BootstrapFinishAction(controller)) It is now used for the heartbeat: private HeartbeatResponse getHeartbeatResponseByHttp() { String url = "http://" + fe.getHost() + ":" + Config.http_port + "/api/bootstrap?cluster_id=" + clusterId + "&token=" + token; ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: 1. We can still use thrift to check whether the FE is started, which duplicates the HTTP method. 2. No need: users can simply upgrade without being aware of the change, because with the new FE version the cluster gets FE heartbeat info via thrift by default and this cannot be changed; we still keep BootstrapFinishAction for compatibility. 3. Done. ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: 1. No need: users can simply upgrade without being aware of the change, because with the new FE version the cluster gets FE heartbeat info via thrift by default and this cannot be changed; we now keep BootstrapFinishAction and need to remove the deprecation. 2. Done completing the required fields in the PR template.
[GitHub] [incubator-doris] morningman commented on a change in pull request #8035: [Refactor][heartbeat] Make get fe heart response by thrift and make BootstrapFinishAction Deprecated
morningman commented on a change in pull request #8035: URL: https://github.com/apache/incubator-doris/pull/8035#discussion_r806896577 ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: Why deprecate this? ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: I thought that `BootstrapFinishAction` has nothing to do with the heartbeat? It is used for checking whether the FE started successfully. ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: Oh, I forgot. But this API is also used for checking whether the FE is started, so we still need it; just not for the heartbeat. ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: And by the way, most users have not switched the heartbeat method to the thrift way, so maybe we need to notify them somehow. ## File path: fe/fe-core/src/main/java/org/apache/doris/http/rest/BootstrapFinishAction.java ## @@ -37,6 +37,7 @@ * {"status":"OK","msg":"Success","replayedJournal"=123456, "queryPort"=9000, "rpcPort"=9001} * {"status":"FAILED","msg":"err info..."} */ +@Deprecated Review comment: And by the way, most users have not switched the heartbeat method to the thrift way, so maybe we need to notify them somehow. And you forgot to complete the required fields in the PR template.
[GitHub] [incubator-doris-flink-connector] liuyaolin opened a new pull request #4: When doing checkpoint, write cache data to doris to prevent loss
liuyaolin opened a new pull request #4: URL: https://github.com/apache/incubator-doris-flink-connector/pull/4 # Proposed changes Issue Number: close ## Problem Summary: Describe the overview of changes. The Flink-to-Doris sink writes in batches and only flushes once the configured row count is reached. If the user sets the batch size to 1000 rows and a checkpoint completes after 500 rows have been written, the Kafka offset is committed accordingly. When the next 500 rows arrive before another checkpoint and the Doris server then fails, the flush throws an error and the Flink task restarts, resuming Kafka consumption from the last committed offset, so the first 500 rows are lost. ## Checklist(Required) 1. Does it affect the original behavior: (No) 2. Has unit tests been added: (No) 3. Has document been added or modified: (No) 4. Does it need to update dependencies: (No) 5. Are there any changes that cannot be rolled back: (No) ## Further comments If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
[GitHub] [incubator-doris] englefly opened a new issue #8073: [Bug]
englefly opened a new issue #8073: URL: https://github.com/apache/incubator-doris/issues/8073 ### Search before asking - [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues. ### Version branch-0.14 branch-0.15 df2c7563b08e94801daeb4370a1627886dbc3d34 ### What's Wrong? After altering the schema to change a bigint column to string (release 0.14) or varchar (release 0.15), the BE goes down. ### What You Expected? Safely alter the schema: `alter table table_1 modify column b varchar(1);` ### How to Reproduce? 1. create a table `table_1` ``` CREATE TABLE `table_1` ( `a` bigint(20) NULL COMMENT "主键", `b` int(11) NULL COMMENT "" ) ENGINE=OLAP UNIQUE KEY(`a`) COMMENT "" DISTRIBUTED BY HASH(`a`) BUCKETS 8 PROPERTIES ( "replication_num" = "1", "in_memory" = "false", "storage_format" = "V2" ); ``` 2. load data into `table_1` (1 tuples) 3. `alter table table_1 modify column b varchar(1);` 4. The BE crashes. ### Anything Else? With only a few tuples, e.g. 10, the BE stays alive after the schema change, but the results of `select * from table_1` are garbled. ### Are you willing to submit PR? - [X] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #8071: Correct the wrong words in the prompt information
EmmyMiao87 commented on a change in pull request #8071: URL: https://github.com/apache/incubator-doris/pull/8071#discussion_r807488793 ## File path: manager/dm-server/src/main/java/org/apache/doris/stack/agent/AgentHeatbeatRunner.java ## @@ -76,7 +76,7 @@ private void heartbeatCheck() { agent.setStatus(AgentStatus.STOP); agentRepository.save(agent); agentCache.putAgent(agent); -log.warn("agent {} is unhealthly", agent.getHost()); +log.warn("agent {} is unhealthily", agent.getHost()); Review comment: ```suggestion log.warn("agent {} is unhealthy", agent.getHost()); ```
[GitHub] [incubator-doris] adonis0147 opened a new issue #8074: [Feature] [array-type] Support ArrayLiteral in SQL.
adonis0147 opened a new issue #8074: URL: https://github.com/apache/incubator-doris/issues/8074 ### Search before asking - [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues. ### Description This issue is a subtask of #7570 . Support ArrayLiteral in SQL. An array literal is a list of zero or more expressions, each of which represents an array element, enclosed in square brackets (e.g. [1, 2, 3]). ### Use case ```shell mysql> create table array_type_table(k1 INT, k2 Array<INT>) duplicate key (k1) -> distributed by hash(k1) buckets 1 properties('replication_num' = '1'); mysql> insert into array_type_table values (1, [1, 2, 3]); ``` ### Related issues #7570 ### Are you willing to submit PR? - [X] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[GitHub] [incubator-doris] adonis0147 opened a new pull request #8075: Array type dev 1
adonis0147 opened a new pull request #8075: URL: https://github.com/apache/incubator-doris/pull/8075 # Proposed changes Issue Number: close #8074 . ## Problem Summary: Please refer to #8074 . ## Checklist(Required) 1. Does it affect the original behavior: No. 2. Has unit tests been added: Yes. 3. Has document been added or modified: No. 4. Does it need to update dependencies: No. 5. Are there any changes that cannot be rolled back: No. ## Further comments If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
[GitHub] [incubator-doris] Gongruixiao opened a new pull request #8076: [Docs] Update alter table docs
Gongruixiao opened a new pull request #8076: URL: https://github.com/apache/incubator-doris/pull/8076 # Proposed changes Update alter table docs. ## Checklist(Required) 1. Does it affect the original behavior: (No) 2. Has unit tests been added: (No) 3. Has document been added or modified: (Yes) 4. Does it need to update dependencies: (No) 5. Are there any changes that cannot be rolled back: (No)
[GitHub] [incubator-doris] adonis0147 commented on pull request #8075: Array type dev 1
adonis0147 commented on pull request #8075: URL: https://github.com/apache/incubator-doris/pull/8075#issuecomment-1041045560 Hi @yangzhg , please review this pr when you are free. This pr is for the branch _**array-type**_.
[GitHub] [incubator-doris] luzhijing commented on pull request #8076: [Docs] Update alter table docs
luzhijing commented on pull request #8076: URL: https://github.com/apache/incubator-doris/pull/8076#issuecomment-1041045649 LGTM
[GitHub] [incubator-doris] github-actions[bot] commented on pull request #7880: [New feature](statistics) Step1: Statistics collection framework
github-actions[bot] commented on pull request #7880: URL: https://github.com/apache/incubator-doris/pull/7880#issuecomment-1041045876 PR approved by at least one committer and no changes requested.
[GitHub] [incubator-doris] BiteTheDDDDt opened a new issue #8077: [Bug] fix clang compile error on MysqlTableWriter
BiteThet opened a new issue #8077: URL: https://github.com/apache/incubator-doris/issues/8077 ### Search before asking - [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues. ### Version master ### What's Wrong? clang reports a compile error when the reference member is initialized from an empty temporary: ```cpp :_output_expr_ctxs {},... ``` Reference member '_vec_output_expr_ctxs' binds to a temporary object whose lifetime would be shorter than the lifetime of the constructed object. clang(dangling_member) mysql_table_writer.h(75, 51): Reference member declared here ### What You Expected? Fix it. ### How to Reproduce? _No response_ ### Anything Else? _No response_ ### Are you willing to submit PR? - [X] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
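A minimal reproduction of the diagnostic and the usual fix, using hypothetical names (the real `MysqlTableWriter` members differ):

```cpp
#include <vector>

struct ExprContext {};
using ExprCtxs = std::vector<ExprContext*>;

struct BadWriter {
    const ExprCtxs& _output_expr_ctxs;  // reference member
    // Initializing the reference from `{}` binds it to a temporary that dies
    // when the constructor returns; this triggers clang's dangling_member warning.
    BadWriter() : _output_expr_ctxs{} {}
};

struct GoodWriter {
    ExprCtxs _output_expr_ctxs;  // store by value (or bind to a long-lived object)
    GoodWriter() : _output_expr_ctxs{} {}  // fine: the member owns the storage
};

int main() {
    GoodWriter w;
    return static_cast<int>(w._output_expr_ctxs.size());
}
```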
[GitHub] [incubator-doris] BiteTheDDDDt opened a new pull request #8078: [Bug] fix clang compile error on MysqlTableWriter
BiteThet opened a new pull request #8078: URL: https://github.com/apache/incubator-doris/pull/8078 # Proposed changes Issue Number: close #8077 ## Problem Summary: Describe the overview of changes. ## Checklist(Required) 1. Does it affect the original behavior: (Yes/No/I Don't know) 2. Has unit tests been added: (Yes/No/No Need) 3. Has document been added or modified: (Yes/No/No Need) 4. Does it need to update dependencies: (Yes/No) 5. Are there any changes that cannot be rolled back: (Yes/No) ## Further comments If this is a relatively large or complex change, kick off the discussion at [d...@doris.apache.org](mailto:d...@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
[GitHub] [incubator-doris] github-actions[bot] commented on pull request #8076: [Docs] Update alter table docs
github-actions[bot] commented on pull request #8076: URL: https://github.com/apache/incubator-doris/pull/8076#issuecomment-1041047863 PR approved by at least one committer and no changes requested.
[GitHub] [incubator-doris] BiteTheDDDDt closed pull request #8078: [Bug] fix clang compile error on MysqlTableWriter
BiteThet closed pull request #8078: URL: https://github.com/apache/incubator-doris/pull/8078
[GitHub] [incubator-doris] liuyaolin opened a new issue #8079: [Bug] Flink Doris Sink loses data when a commit exception occurs
liuyaolin opened a new issue #8079: URL: https://github.com/apache/incubator-doris/issues/8079 ### Search before asking - [X] I had searched in the [issues](https://github.com/apache/incubator-doris/issues?q=is%3Aissue) and found no similar issues. ### Version doris-flink-connector-1.11.6-2.12-1.0.0-SNAPSHOT ### What's Wrong? The Flink Doris sink writes in batches: each task starts a timer that periodically flushes buffered data to Doris. If the user sets the batch size to 1000 rows, data is flushed only once 1000 rows accumulate. Suppose a checkpoint completes after 500 rows have been written, and the Kafka offset is committed accordingly. When the next 500 rows arrive before another checkpoint and the Doris server then fails, the flush throws an error and the Flink task restarts, resuming Kafka consumption from the last committed offset, so the first 500 rows are lost. ### What You Expected? Perform a flush at checkpoint time, i.e. in the initializeState method of the GenericDorisSinkFunction class, to prevent data loss. ### How to Reproduce? _No response_ ### Anything Else? _No response_ ### Are you willing to submit PR? - [X] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[GitHub] [incubator-doris] EmmyMiao87 merged pull request #7880: [New feature](statistics) Step1: Statistics collection framework
EmmyMiao87 merged pull request #7880: URL: https://github.com/apache/incubator-doris/pull/7880
[GitHub] [incubator-doris-flink-connector] liuyaolin commented on pull request #4: When doing checkpoint, write cache data to doris to prevent loss
liuyaolin commented on pull request #4: URL: https://github.com/apache/incubator-doris-flink-connector/pull/4#issuecomment-1041050398 Associated issue: https://github.com/apache/incubator-doris/issues/8079
[incubator-doris] branch master updated (25d6477 -> 6ccf9db)
This is an automated email from the ASF dual-hosted git repository. lingmiao pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git. from 25d6477 [Vectorized][Feature] Support mysql external table insert into stm (#7979) add 6ccf9db [feature-wip](statistics) Step1: Statistics collection framework (#7880) No new revisions were added by this update. Summary of changes: .../{AlterClusterType.java => AnalyzeStmt.java}| 22 +++-- .../java/org/apache/doris/catalog/Catalog.java | 22 + .../main/java/org/apache/doris/common/Config.java | 18 + .../apache/doris/planner/DistributedPlanner.java | 14  .../MetaStatisticsTask.java} | 20 ++--- .../apache/doris/statistics/SQLStatisticsTask.java | 73 + .../SampleSQLStatisticsTask.java} | 38 - .../org/apache/doris/statistics/StatisticsJob.java | 79 ++ .../doris/statistics/StatisticsJobManager.java | 73 + .../doris/statistics/StatisticsJobScheduler.java | 63 +++ .../apache/doris/statistics/StatisticsTask.java| 55 + .../StatisticsTaskResult.java} | 29 +++ .../doris/statistics/StatisticsTaskScheduler.java | 93 ++ .../StatsCategoryDesc.java}| 16 ++-- .../StatsGranularityDesc.java} | 19 +++-- .../org/apache/doris/statistics/StatsType.java | 15 ++-- 16 files changed, 557 insertions(+), 92 deletions(-) copy fe/fe-core/src/main/java/org/apache/doris/analysis/{AlterClusterType.java => AnalyzeStmt.java} (68%) copy fe/fe-core/src/main/java/org/apache/doris/{analysis/ImportColumnsStmt.java => statistics/MetaStatisticsTask.java} (64%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java copy fe/fe-core/src/main/java/org/apache/doris/{plugin/PluginContext.java => statistics/SampleSQLStatisticsTask.java} (50%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java copy fe/fe-core/src/main/java/org/apache/doris/{analysis/AlterClause.java => statistics/StatisticsTaskResult.java} (60%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java copy fe/fe-core/src/main/java/org/apache/doris/{analysis/AlterClusterType.java => statistics/StatsCategoryDesc.java} (78%) copy fe/fe-core/src/main/java/org/apache/doris/{analysis/AlterClusterType.java => statistics/StatsGranularityDesc.java} (74%) copy be/src/common/env_config.h.in => fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java (82%)
[incubator-doris-spark-connector] 02/50: Fix document bugs in spark-doris-connector (#2275)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 51e8ee622d08531d07fcc43882dced872de7835d Author: vinson0526 AuthorDate: Fri Nov 22 18:05:36 2019 +0800 Fix document bugs in spark-doris-connector (#2275) --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 76e1d35..76f9d95 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ val dorisSparkDF = spark.read.format("doris") .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") .option("user", "$YOUR_DORIS_USERNAME") .option("password", "$YOUR_DORIS_PASSWORD") + .load() dorisSparkDF.show(5) ```
[incubator-doris-spark-connector] 03/50: Fix bug when spark on doris run long time (#2485)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit b53dc00c2c895b3879c9f2053243a3e92e6c9f29 Author: Youngwb AuthorDate: Wed Dec 18 13:08:21 2019 +0800 Fix bug when spark on doris run long time (#2485) --- README.md| 1 + .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java| 2 ++ src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala | 9 + src/main/thrift/doris/DorisExternalService.thrift| 2 ++ 4 files changed, 14 insertions(+) diff --git a/README.md b/README.md index 76f9d95..2b0d2ef 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,7 @@ dorisSparkRDD.collect() | doris.request.retries| 3 | 向Doris发送请求的重试次数 | | doris.request.connect.timeout.ms | 3 | 向Doris发送请求的连接超时时间 | | doris.request.read.timeout.ms| 3 | 向Doris发送请求的读取超时时间 | +| doris.request.query.timeout.s| 3600 | 查询doris的超时时间,默认值为1小时,-1表示无超时限制 | | doris.request.tablet.size| Integer.MAX_VALUE | 一个RDD Partition对应的Doris Tablet个数。此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 | | doris.batch.size | 1024 | 一次从BE读取数据的最大行数。增大此数值可减少Spark与Doris之间建立连接的次数。从而减轻网络延迟所带来的的额外时间开销。 | diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index 63dede1..d9b4231 100644 --- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -40,9 +40,11 @@ public interface ConfigurationOptions { String DORIS_REQUEST_RETRIES = "doris.request.retries"; String DORIS_REQUEST_CONNECT_TIMEOUT_MS = "doris.request.connect.timeout.ms"; String DORIS_REQUEST_READ_TIMEOUT_MS = "doris.request.read.timeout.ms"; +String DORIS_REQUEST_QUERY_TIMEOUT_S = "doris.request.query.timeout.s"; int DORIS_REQUEST_RETRIES_DEFAULT = 3; int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; +int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600; String DORIS_TABLET_SIZE = "doris.request.tablet.size"; int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE; diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index dabd826..16c5feb 100644 --- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -64,7 +64,15 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { DORIS_BATCH_SIZE_DEFAULT } +val queryDorisTimeout = Try { + settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt +} getOrElse { + logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)) + DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT +} + params.setBatch_size(batchSize) +params.setQuery_timeout(queryDorisTimeout) params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, "")) params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "")) @@ -74,6 +82,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { s"table: ${params.getTable}, " + s"tabletId: ${params.getTablet_ids}, " + s"batch size: $batchSize, " + +s"query timeout: $queryDorisTimeout, " + s"user: ${params.getUser}, " + s"query plan: ${params.opaqued_query_plan}") diff --git 
a/src/main/thrift/doris/DorisExternalService.thrift b/src/main/thrift/doris/DorisExternalService.thrift index 9a0b9b5..d3f7e8e 100644 --- a/src/main/thrift/doris/DorisExternalService.thrift +++ b/src/main/thrift/doris/DorisExternalService.thrift @@ -54,6 +54,8 @@ struct TScanOpenParams { 10: optional string passwd // max keep alive time min 11: optional i16 keep_alive_min + + 12: optional i32 query_timeout } struct TScanColumnDesc {
[incubator-doris-spark-connector] branch master updated (afb45af -> dffb307)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git. discard afb45af [Bug-Fix][Spark-Doris-Connector] resolve the problem of writing Chinese garbled characters (#6) discard f7f995a [community](license) Add license file for dependencies (#5) omit 1e6c5a8 unprotected (#4) omit ff7f389 [community] move GitBox mail to commits@doris (#3) omit 8fdc9f4 [fix](spark connector) fix spark connector unsupport STRING type. (#2) omit b549f07 [init] add missing files (#1) omit daffbaa [init] init commit new 176d2af Add spark-doris-connector extension (#2228) new 51e8ee6 Fix document bugs in spark-doris-connector (#2275) new b53dc00 Fix bug when spark on doris run long time (#2485) new 5d3f637 Fix npe in spark-doris-connector when query is complex (#2503) new 817c819 Spark return error to users when spark on doris query failed (#2531) new 921a2ca Convert from arrow to rowbatch (#2723) new 576a133 Update arrow's version to 0.15.1 and shaded it in spark-doris-connector (#2769) new 628cf75 Support param exec_mem_limit for spark-doris-connctor (#2775) new d3eebd4 Remove unused KUDU codes (#3175) new cd46034 [Spark] Support convert Arrow data to RowBatch asynchronously in Spark-Doris-Connector (#3186) new 0634c64 [License] Add License to codes (#3272) new d129998 [Spark on Doris] Shade and provide the thrift lib in spark-doris-connector (#3631) new ad9eff5 [Spark on Doris] fix the encode of varchar when convertArrowToRowBatch (#5202) new 3479e9a [Bug] Spark doris connector http v2 authentication fails, and HTTP v2 interface returns json nesting problem (#5366) new c1585ee Fix file licences (#5414) new 293bd8f [Spark-Doris-Connector][Bug-Fix] Resolve deserialize exception when Spark Doris Connector in aync deserialize mode (#5336) new d121e12 [Bug] Modify spark, flink doris connector to send request to FE, fix the problem of POST method, it should be the same as the method when sending the request (#5788) new e73f796 [Feature] Support spark connector sink data to Doris (#6256) new 3168584 [Doc] flink/spark connector: add sources/javadoc plugins (#6435) new 27381fe [Improve]The connector supports spark 3.0, flink 1.13 (#6449) new 9191e5b Spark 2.x and 3.x version compilation instructions (#6503) new acf8869 [Feature] support spark connector sink stream data to doris (#6761) new 39e8483 [Feature] support spark connector sink data using sql (#6796) new 7d1fa42 [Dependency] Upgrade thirdparty libs (#6766) new 18d3ade Fix spark connector build error (#6948) new dc9203a [Optimize] Spark connector supports multiple spark versions:2.1.x/2.3.x/2.4.x/3.x (#6956) new d8ad175 [Feature] Spark connector supports to specify fields to write (#6973) new 00589ff [Revert] Revert RestService.java (#6994) new 27b72a7 [HTTP][API] Add backends info API for spark/flink connector (#6984) new be2cded [Build]Compile and output the jar file, add Spark, Flink version and Scala version (#7051) new 6cd3915 [Compile] Fix spark-connector compile problem (#7048) new 031aac0 [SparkConnector] Add thrift dir for spark connector (#7074) new 30530a9 [Feature] Support Flink and Spark connector support String type (#7075) new 95d494c [License] Add License header for missing files (#7130) new c528252 [Improvement](spark-connector) Add 'sink.batch.size' and 'sink.max-retries' options in spark-connector (#7281) new b799643 [chore][community](github) Remove travis and add github action (#7380) new 115ae27 
[improvement](spark-connector)(flink-connector) Modify the max num of batch written by Spark/Flink connector each time. (#7485) new 37383c7 [refactor] Standardize the writing of pom files, prepare for deployment to maven (#7477) new ee6969a [refactor] update parent pom version and optimize build scripts (#7548) new 5a6b28f [chore][docs] add deploy spark/flink connectors to maven release repo docs (#7616) new 040b959 [improvement](spark-connector) Stream load http exception handling (#7514) new ecc7633 [refactor](spark-connector) delete useless maven dependencies and some code variable definition issues (#7655) new 24c633c [improvement](spark-connector) Throw an exception when the data push fails and there are too many retries (#7531) new 19542ef Flink / Spark connector compilation problem (#7725) new b8ccc3c [chore][fix][doc](fe-plugin)(mysqldump) fix build auditlog plugin error (#7804) new 8558180 [fix](httpv2) make http v2 and v1 interface compatible (#7848) new ce7d406 [init] init commit new 0923ced [init] do some init work new c5bf0b2 [fix](spark connector) fix spark con
[incubator-doris-spark-connector] 05/50: Spark return error to users when spark on doris query failed (#2531)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 817c819f5af8c12121362521114ff4feaca5b3cc Author: Youngwb AuthorDate: Mon Dec 30 21:58:13 2019 +0800 Spark return error to users when spark on doris query failed (#2531) --- .../org/apache/doris/spark/backend/BackendClient.java| 13 +++-- .../DorisInternalException.java} | 16 ++-- .../java/org/apache/doris/spark/util/ErrorMessages.java | 1 + 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/backend/BackendClient.java b/src/main/java/org/apache/doris/spark/backend/BackendClient.java index 62d6398..90baf79 100644 --- a/src/main/java/org/apache/doris/spark/backend/BackendClient.java +++ b/src/main/java/org/apache/doris/spark/backend/BackendClient.java @@ -19,6 +19,8 @@ package org.apache.doris.spark.backend; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.exception.ConnectedFailedException; +import org.apache.doris.spark.exception.DorisException; +import org.apache.doris.spark.exception.DorisInternalException; import org.apache.doris.spark.util.ErrorMessages; import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.serialization.Routing; @@ -151,16 +153,17 @@ public class BackendClient { * @return scan batch result * @throws ConnectedFailedException throw if cannot connect to Doris BE */ -public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws ConnectedFailedException { +public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException { logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams); if (!isConnected) { open(); } TException ex = null; +TScanBatchResult result = null; for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to getNext {}.", attempt, routing); try { -TScanBatchResult result = client.get_next(nextBatchParams); +result = client.get_next(nextBatchParams); if (result == null) { logger.warn("GetNext result from {} is null.", routing); continue; @@ -176,6 +179,12 @@ public class BackendClient { ex = e; } } +if (result != null && (TStatusCode.OK != (result.getStatus().getStatus_code( { +logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatus_code(), +result.getStatus().getError_msgs()); +throw new DorisInternalException(routing.toString(), result.getStatus().getStatus_code(), +result.getStatus().getError_msgs()); +} logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); throw new ConnectedFailedException(routing.toString(), ex); } diff --git a/src/main/java/org/apache/doris/spark/util/ErrorMessages.java b/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java similarity index 63% copy from src/main/java/org/apache/doris/spark/util/ErrorMessages.java copy to src/main/java/org/apache/doris/spark/exception/DorisInternalException.java index aff289d..f42acee 100644 --- a/src/main/java/org/apache/doris/spark/util/ErrorMessages.java +++ b/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java @@ -15,11 +15,15 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.spark.util; +package org.apache.doris.spark.exception; + +import org.apache.doris.thrift.TStatusCode; + +import java.util.List; + +public class DorisInternalException extends DorisException { +public DorisInternalException(String server, TStatusCode statusCode, List errorMsgs) { +super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs); +} -public abstract class ErrorMessages { -public static final String PARSE_NUMBER_FAILED_MESSAGE = "Parse '{}' to number failed. Original string is '{}'."; -public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {} failed."; -public static final String ILLEGAL_ARGUMENT_MESSAGE = "argument '{}' is illegal, value is '{}'."; -public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come here."; } diff --git a/src/main/java/org/apache/doris/spark/util/ErrorMessages.java b/src/main/java/org/apache/doris/spark/util/ErrorMessages.java index aff289d..92a04e9 100644 --- a/src/main/java/org/apache/doris/spark/util/ErrorMessages.java +++ b/src/main/java/org/apache/doris/spark/util/ErrorMessages.java @@ -22,4 +22,5 @@ public abstract
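The practical effect of this commit is that callers of `BackendClient.getNext` can now tell a Doris-side scan failure apart from a pure connectivity failure. Below is a minimal caller-side sketch in Java, assuming (as the diff suggests) that both exception types extend `DorisException`; the `fetch` helper itself is hypothetical, and the thrift package name is taken from the `TStatusCode` import shown above:

```java
import org.apache.doris.spark.backend.BackendClient;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.DorisInternalException;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanNextBatchParams;

public class GetNextCaller {
    // Hypothetical helper: separate "the BE answered with an error" from
    // "we never got an answer", now that getNext throws DorisException.
    static TScanBatchResult fetch(BackendClient client, TScanNextBatchParams params)
            throws DorisException {
        try {
            return client.getNext(params);
        } catch (DorisInternalException e) {
            // BE replied with a non-OK status code: the query itself failed,
            // so surface the server-side error message to the user directly.
            throw e;
        } catch (ConnectedFailedException e) {
            // Transport-level failure after all retries; possibly worth
            // routing to another BE replica before giving up.
            throw e;
        }
    }
}
```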
[incubator-doris-spark-connector] 10/50: [Spark] Support convert Arrow data to RowBatch asynchronously in Spark-Doris-Connector (#3186)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit cd46034a84ef5e6ef9e359a7d3afb8f1b2535854 Author: Youngwb AuthorDate: Thu Mar 26 21:34:37 2020 +0800 [Spark] Support convert Arrow data to RowBatch asynchronously in Spark-Doris-Connector (#3186) Currently, in the Spark-Doris-Connector, when Spark iteratively obtains each row of data, it needs to synchronously convert the Arrow format data into the row format required by Spark. In order to speed up the conversion process, we can add an asynchronous thread in the Connector, which is responsible for obtaining the Arrow format data from BE and converting it into the row format required by Spark computation. In our test environment, the Doris cluster used 1 FE and 7 BE (32C+128G). When using Spark-Doris-Connector to query a table containing 67 columns, the original query returning 69 million rows of data took about 2.5 min; after the improvement it dropped to about 1.6 min, reducing the time by about 30% --- README.md | 2 + .../doris/spark/cfg/ConfigurationOptions.java | 6 ++ .../apache/doris/spark/serialization/RowBatch.java | 67 ++--- .../org/apache/doris/spark/util/ErrorMessages.java | 1 + .../apache/doris/spark/rdd/ScalaValueReader.scala | 106 ++--- 5 files changed, 129 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index d32db83..3c41b93 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,8 @@ dorisSparkRDD.collect() | doris.request.tablet.size| Integer.MAX_VALUE | Number of Doris Tablets per RDD Partition. The smaller this value, the more partitions are generated, increasing parallelism on the Spark side but putting more pressure on Doris. | | doris.batch.size | 1024 | Maximum number of rows read from BE at a time. Increasing it reduces the number of connections established between Spark and Doris, cutting the extra overhead caused by network latency. | | doris.exec.mem.limit | 2147483648| Memory limit for a single query. Defaults to 2GB, in bytes | +| doris.deserialize.arrow.async| false | Whether to asynchronously convert the Arrow format into the RowBatch needed for spark-doris-connector iteration | +| doris.deserialize.queue.size | 64| Internal processing queue for asynchronous Arrow conversion; effective when doris.deserialize.arrow.async is true| ### SQL and Dataframe Only diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index 742c3eb..1bb5dfc 100644 --- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -57,4 +57,10 @@ public interface ConfigurationOptions { long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; String DORIS_VALUE_READER_CLASS = "doris.value.reader.class"; + +String DORIS_DESERIALIZE_ARROW_ASYNC = "doris.deserialize.arrow.async"; +boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; + +String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size"; +int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; } diff --git a/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index d710fbb..0781f1e 100644 --- a/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -70,7 +70,8 @@ public class RowBatch { } } -private int offsetInOneBatch = 0; +// offset for iterate the rowBatch +private int offsetInRowBatch = 0; private int rowCountInOneBatch = 0; private int readRowCount = 0; private List rowBatch = new ArrayList<>(); @@ -87,50 +88,40 @@ public class RowBatch { new ByteArrayInputStream(nextResult.getRows()), rootAllocator ); +this.offsetInRowBatch = 0; try {
this.root = arrowStreamReader.getVectorSchemaRoot(); +while (arrowStreamReader.loadNextBatch()) { +fieldVectors = root.getFieldVectors(); +if (fieldVectors.size() != schema.size()) { +logger.error("Schema size '{}' is not equal to arrow field size '{}'.", +fieldVectors.size(), schema.size()); +throw new DorisException("Load Doris data failed, schema size of fetch data is wrong."); +} +if (fieldVectors.size() == 0 || root.getRowCount() == 0) { +logger.debug("One batch in arrow has no data."); +continue; +} +rowCountInOneBatch = root.getRowCount(); +// init the rowBatch +for (int i = 0; i < rowCountInOneBatch; ++i) { +rowBatch.add(new Row(fieldVectors.size())); +} +convertArrowToRowBatch
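The mechanism this commit describes boils down to a bounded producer/consumer queue. Here is a self-contained sketch of that pattern, not the connector's actual reader code: the class and method names are illustrative, the BE fetch and Arrow conversion are stubbed, and `doris.deserialize.queue.size` maps to the queue capacity:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class AsyncRowBatchReader {
    // doris.deserialize.queue.size bounds the queue (default 64), which
    // throttles the producer when Spark consumes rows slower than BE serves them.
    private final BlockingQueue<List<Object>> queue = new ArrayBlockingQueue<>(64);
    private volatile boolean eos = false;

    void startProducer() {
        Thread producer = new Thread(() -> {
            try {
                while (hasNextArrowBatch()) {           // fetch next Arrow batch from BE (stubbed)
                    for (List<Object> row : convertArrowToRows()) {
                        queue.put(row);                 // blocks when the queue is full
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                eos = true;                             // signal end of stream
            }
        });
        producer.setDaemon(true);
        producer.start();
    }

    // Spark's iterator only drains the queue; deserialization cost is hidden
    // behind the network reads happening on the producer thread.
    List<Object> next() throws InterruptedException {
        while (true) {
            List<Object> row = queue.poll(100, TimeUnit.MILLISECONDS);
            if (row != null) return row;
            if (eos && queue.isEmpty()) return null;    // drained and finished
        }
    }

    private boolean hasNextArrowBatch() { return false; }                      // stub
    private List<List<Object>> convertArrowToRows() { return Collections.emptyList(); } // stub
}
```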
[incubator-doris-spark-connector] 04/50: Fix npe in spark-doris-connector when query is complex (#2503)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 5d3f6372c462cea030a3172e040843389e7e4236 Author: vinson0526 AuthorDate: Thu Dec 19 14:53:29 2019 +0800 Fix npe in spark-doris-connector when query is complex (#2503) --- src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index 16c5feb..163ee16 100644 --- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -101,7 +101,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { * @return true if hax next value */ def hasNext: Boolean = { -if ((rowBatch == null || !rowBatch.hasNext) && !eos) { +if (!eos && (rowBatch == null || !rowBatch.hasNext)) { if (rowBatch != null) { offset += rowBatch.getReadRowCount rowBatch.close
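The one-line fix relies on short-circuit evaluation. The diff does not show the exact NPE path, but the reordering guarantees that no row-batch state is touched once end-of-stream has been reached. An illustrative Java rendering of the same condition:

```java
class ShortCircuit {
    interface Batch { boolean hasNext(); }

    // && and || evaluate left to right and stop early, so with eos checked
    // first the batch is never consulted (and no further fetch is triggered)
    // after the stream has ended.
    static boolean shouldFetchNext(boolean eos, Batch rowBatch) {
        // old order: (rowBatch == null || !rowBatch.hasNext()) && !eos
        return !eos && (rowBatch == null || !rowBatch.hasNext());
    }
}
```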
[incubator-doris-spark-connector] 06/50: Convert from arrow to rowbatch (#2723)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 921a2caa836f1621557e3d56d67689e05f64786e Author: Youngwb AuthorDate: Fri Jan 10 14:11:15 2020 +0800 Convert from arrow to rowbatch (#2723) For #2722 In our test environment, Doris cluster used 1 fe and 7 be (32C+128G). When using spakr-doris connecter to query a table containing 67 columns, it took about 1 hour for the query to return 69 million rows of data. After the improvement, the same query condition took 2.5 minutes and the query performance was significantly improved --- .../apache/doris/spark/serialization/RowBatch.java | 136 +++-- 1 file changed, 96 insertions(+), 40 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index 668e72d..d710fbb 100644 --- a/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -73,6 +73,7 @@ public class RowBatch { private int offsetInOneBatch = 0; private int rowCountInOneBatch = 0; private int readRowCount = 0; +private List rowBatch = new ArrayList<>(); private final ArrowStreamReader arrowStreamReader; private final VectorSchemaRoot root; private List fieldVectors; @@ -115,6 +116,11 @@ public class RowBatch { } offsetInOneBatch = 0; rowCountInOneBatch = root.getRowCount(); +// init the rowBatch +for (int i = 0; i < rowCountInOneBatch; ++i) { +rowBatch.add(new Row(fieldVectors.size())); +} +convertArrowToRowBatch(); return true; } } catch (IOException e) { @@ -128,98 +134,135 @@ public class RowBatch { return false; } -public List next() throws DorisException { +private void addValueToRow(int rowIndex, Object obj) { +if (rowIndex > rowCountInOneBatch) { +String errMsg = "Get row offset: " + rowIndex + " larger than row size: " + +rowCountInOneBatch; +logger.error(errMsg); +throw new NoSuchElementException(errMsg); +} +rowBatch.get(rowIndex).put(obj); +} + +public void convertArrowToRowBatch() throws DorisException { try { -if (!hasNext()) { -String errMsg = "Get row offset:" + offsetInOneBatch + " larger than row size: " + rowCountInOneBatch; -logger.error(errMsg); -throw new NoSuchElementException(errMsg); -} -Row row = new Row(fieldVectors.size()); -for (int j = 0; j < fieldVectors.size(); j++) { -FieldVector curFieldVector = fieldVectors.get(j); +for (int col = 0; col < fieldVectors.size(); col++) { +FieldVector curFieldVector = fieldVectors.get(col); Types.MinorType mt = curFieldVector.getMinorType(); -if (curFieldVector.isNull(offsetInOneBatch)) { -row.put(null); -continue; -} -final String currentType = schema.get(j).getType(); +final String currentType = schema.get(col).getType(); switch (currentType) { case "NULL_TYPE": -row.put(null); +for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { +addValueToRow(rowIndex, null); +} break; case "BOOLEAN": Preconditions.checkArgument(mt.equals(Types.MinorType.BIT), typeMismatchMessage(currentType, mt)); BitVector bitVector = (BitVector) curFieldVector; -int bit = bitVector.get(offsetInOneBatch); -row.put(bit != 0); +for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { +Object fieldValue = bitVector.isNull(rowIndex) ? 
null : bitVector.get(rowIndex) != 0; +addValueToRow(rowIndex, fieldValue); +} break; case "TINYINT": Preconditions.checkArgument(mt.equals(Types.MinorType.TINYINT), typeMismatchMessage(currentType, mt)); TinyIntVector tinyIntVector = (TinyIntVector) curFieldVector; -row.put(tinyIntVector.get(offsetInOneBatch)); +for (int rowI
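The speedup in this commit comes from switching row-at-a-time access to a column-at-a-time pass: one type dispatch per column instead of one per cell. A compact sketch of that shape under stated assumptions (List<Object> stands in for the connector's Row class, and column 0 is assumed to be a BOOLEAN backed by a BitVector):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.VectorSchemaRoot;

class ColumnWiseConversion {
    static List<List<Object>> convert(VectorSchemaRoot root) {
        int rowCount = root.getRowCount();
        List<List<Object>> rows = new ArrayList<>(rowCount);
        for (int i = 0; i < rowCount; i++) {
            rows.add(new ArrayList<>());            // pre-allocate one buffer per row
        }
        // Dispatch on the column type once, then run a tight per-row loop.
        // Only the BOOLEAN case is shown; other types follow the same shape.
        BitVector bits = (BitVector) root.getFieldVectors().get(0);
        for (int row = 0; row < rowCount; row++) {
            rows.get(row).add(bits.isNull(row) ? null : bits.get(row) != 0);
        }
        return rows;
    }
}
```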
[incubator-doris-spark-connector] 08/50: Support param exec_mem_limit for spark-doris-connector (#2775)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 628cf758e723b468050194dd9a6a7b7fa130dc07 Author: Youngwb AuthorDate: Sat Jan 18 00:14:39 2020 +0800 Support param exec_mem_limit for spark-doris-connector (#2775) --- README.md| 1 + .../java/org/apache/doris/spark/cfg/ConfigurationOptions.java| 3 +++ src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala | 9 + src/main/thrift/doris/DorisExternalService.thrift| 3 +++ 4 files changed, 16 insertions(+) diff --git a/README.md b/README.md index 2b0d2ef..d32db83 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,7 @@ dorisSparkRDD.collect() | doris.request.query.timeout.s| 3600 | Timeout for querying Doris; the default is 1 hour, and -1 means no limit | | doris.request.tablet.size| Integer.MAX_VALUE | Number of Doris Tablets per RDD Partition. The smaller this value, the more partitions are generated, increasing parallelism on the Spark side but putting more pressure on Doris. | | doris.batch.size | 1024 | Maximum number of rows read from BE at a time. Increasing it reduces the number of connections established between Spark and Doris, cutting the extra overhead caused by network latency. | +| doris.exec.mem.limit | 2147483648| Memory limit for a single query. Defaults to 2GB, in bytes | ### SQL and Dataframe Only diff --git a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index d9b4231..742c3eb 100644 --- a/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -53,5 +53,8 @@ public interface ConfigurationOptions { String DORIS_BATCH_SIZE = "doris.batch.size"; int DORIS_BATCH_SIZE_DEFAULT = 1024; +String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit"; +long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; + String DORIS_VALUE_READER_CLASS = "doris.value.reader.class"; } diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index 163ee16..13a955a 100644 --- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -71,8 +71,16 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT } +val execMemLimit = Try { + settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong +} getOrElse { + logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT)) + DORIS_EXEC_MEM_LIMIT_DEFAULT +} + params.setBatch_size(batchSize) params.setQuery_timeout(queryDorisTimeout) +params.setMem_limit(execMemLimit) params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, "")) params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "")) @@ -83,6 +91,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { s"tabletId: ${params.getTablet_ids}, " + s"batch size: $batchSize, " + s"query timeout: $queryDorisTimeout, " + +s"execution memory limit: $execMemLimit, " + s"user: ${params.getUser}, " + s"query plan: ${params.opaqued_query_plan}") diff --git a/src/main/thrift/doris/DorisExternalService.thrift b/src/main/thrift/doris/DorisExternalService.thrift index d3f7e8e..c169874 100644 --- a/src/main/thrift/doris/DorisExternalService.thrift +++ b/src/main/thrift/doris/DorisExternalService.thrift @@ -56,6 +56,9 @@ struct TScanOpenParams { 11: optional i16 keep_alive_min 12: optional i32 query_timeout + + // memory limit for a single query + 13: optional i64 mem_limit }
struct TScanColumnDesc {
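The Scala side of this commit parses the new option defensively: a malformed value logs a warning and falls back to the default rather than failing the job. The same pattern in Java form, with java.util.Properties standing in for the connector's Settings class:

```java
import java.util.Properties;

class MemLimit {
    static final String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
    static final long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; // 2 GB, in bytes

    static long resolve(Properties settings) {
        String raw = settings.getProperty(DORIS_EXEC_MEM_LIMIT,
                String.valueOf(DORIS_EXEC_MEM_LIMIT_DEFAULT));
        try {
            return Long.parseLong(raw);
        } catch (NumberFormatException e) {
            // mirrors the connector's "parse number failed" warning path
            return DORIS_EXEC_MEM_LIMIT_DEFAULT;
        }
    }
}
```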
[incubator-doris-spark-connector] 14/50: [Bug] Spark doris connector http v2 authentication fails, and HTTP v2 interface returns json nesting problem (#5366)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 3479e9a73359162b826e5c8ea52623c14ea7b6b3 Author: 张家锋 AuthorDate: Sun Feb 7 09:28:55 2021 +0800 [Bug] Spark doris connector http v2 authentication fails, and HTTP v2 interface returns json nesting problem (#5366) 1. Deal with the problem of inconsistent data format returned by http v1 and v2 2. Deal with user authentication failure --- .../org/apache/doris/spark/rest/RestService.java | 67 +- .../doris/spark/sql/SparkDorisConnector.scala | 27 + 2 files changed, 79 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java index 2404507..3c8249c 100644 --- a/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -32,17 +32,23 @@ import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESS import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Base64; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.ArrayList; import java.util.Set; +import java.util.HashSet; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.Settings; @@ -113,32 +119,36 @@ public class RestService implements Serializable { String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, ""); String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""); -CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); -credentialsProvider.setCredentials( -AuthScope.ANY, -new UsernamePasswordCredentials(user, password)); -HttpClientContext context = HttpClientContext.create(); -context.setCredentialsProvider(credentialsProvider); logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user); IOException ex = null; int statusCode = -1; for (int attempt = 0; attempt < retries; attempt++) { -CloseableHttpClient httpClient = HttpClients.createDefault(); logger.debug("Attempt {} to request {}.", attempt, request.getURI()); try { -CloseableHttpResponse response = httpClient.execute(request, context); -statusCode = response.getStatusLine().getStatusCode(); +HttpURLConnection conn = getConnection(request, user, password); +statusCode = conn.getResponseCode(); if (statusCode != HttpStatus.SC_OK) { logger.warn("Failed to get response from Doris FE {}, http code is {}", request.getURI(), statusCode); continue; } -String res = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); +InputStream stream = (InputStream) conn.getContent(); +String res = IOUtils.toString(stream); logger.trace("Success get response from Doris FE: {}, response is: {}.", request.getURI(), res); -return res; + +ObjectMapper mapper = new ObjectMapper(); + +Map map = mapper.readValue(res, Map.class); +//Handle the problem of inconsistent data format returned 
by http v1 and v2 +if(map.containsKey("code") && map.containsKey("msg")) { +Object data = map.get("data"); +return mapper.writeValueAsString(data); +} else { +return res; +} } catch (IOException e) { ex = e; logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e); @@ -149,6 +159,33 @@ public class RestService implements Serializable { throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex); } + +/** + * Get http connection + * @param request + * @param user + * @param passwd + * @return + * @throws IOException + */ +private static HttpURLConnection getConnection(HttpRequestBase request, String user, String passwd) throws IOExcep
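The compatibility shim is small enough to isolate: http v2 wraps payloads in a code/msg/data envelope while v1 returns the payload bare, so the response is unwrapped only when the envelope keys are present. A standalone sketch of that check (the envelope shape is inferred from the diff above):

```java
import java.io.IOException;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;

class ResponseUnwrap {
    static String unwrap(String response) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        Map<?, ?> map = mapper.readValue(response, Map.class);
        if (map.containsKey("code") && map.containsKey("msg")) {
            // http v2 envelope, e.g. {"code":0,"msg":"success","data":{...}}:
            // hand back only the data payload, re-serialized as JSON
            return mapper.writeValueAsString(map.get("data"));
        }
        return response; // http v1: the response already is the payload
    }
}
```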
[incubator-doris-spark-connector] 25/50: Fix spark connector build error (#6948)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 18d3ade2f43633033dd634c0e9c2e172c70b4f40 Author: jiafeng.zhang AuthorDate: Fri Oct 29 14:59:05 2021 +0800 Fix spark connector build error (#6948) pom.xml error --- pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index d8ebfe7..b76c823 100644 --- a/pom.xml +++ b/pom.xml @@ -150,6 +150,8 @@ ${spark.version} test + + com.fasterxml.jackson.core jackson-databind 2.10.0
[incubator-doris-spark-connector] 07/50: Update arrow's version to 0.15.1 and shaded it in spark-doris-connector (#2769)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 576a133b9996616bed6e6c99e47986dcca235f3d Author: vinson0526 AuthorDate: Wed Jan 15 21:08:34 2020 +0800 Update arrow's version to 0.15.1 and shaded it in spark-doris-connector (#2769) --- pom.xml | 46 -- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index c8cd0e0..35986ad 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ 2.11 2.3.4 0.9.3 -0.8.0 +0.15.1 UTF-8 @@ -59,7 +59,6 @@ org.apache.arrow arrow-vector ${arrow.version} -provided @@ -138,6 +137,49 @@ +org.apache.maven.plugins +maven-shade-plugin +3.2.1 + + + +com.google.code.findbugs:* +org.slf4j:* + + + + +org.apache.arrow + org.apache.doris.arrow + + +io.netty + org.apache.doris.netty + + +com.fasterxml.jackson + org.apache.doris.jackson + + +org.apache.commons.codec + org.apache.doris.commons.codec + + +com.google.flatbuffers + org.apache.doris.flatbuffers + + + + + +package + +shade + + + + + org.jacoco jacoco-maven-plugin 0.7.8
[incubator-doris-spark-connector] 41/50: [improvement](spark-connector) Stream load http exception handling (#7514)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 040b959c482ff99920043cd9190d1c2a12445e68 Author: jiafeng.zhang AuthorDate: Sun Jan 9 16:54:55 2022 +0800 [improvement](spark-connector) Stream load http exception handling (#7514) Stream load http exception handling --- src/main/java/org/apache/doris/spark/DorisStreamLoad.java | 9 + 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index 4411fbc..dd7e48c 100644 --- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -157,10 +157,10 @@ public class DorisStreamLoad implements Serializable{ public void load(String value) throws StreamLoadException { LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value); LoadResponse loadResponse = loadBatch(value); -LOG.info("Streamload Response:{}",loadResponse); if(loadResponse.status != 200){ throw new StreamLoadException("stream load error: " + loadResponse.respContent); }else{ +LOG.info("Streamload Response:{}",loadResponse); ObjectMapper obj = new ObjectMapper(); try { RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class); @@ -182,6 +182,7 @@ public class DorisStreamLoad implements Serializable{ HttpURLConnection feConn = null; HttpURLConnection beConn = null; +int status = -1; try { // build request and send to new be location beConn = getConnection(loadUrlStr, label); @@ -191,7 +192,7 @@ public class DorisStreamLoad implements Serializable{ bos.close(); // get respond -int status = beConn.getResponseCode(); +status = beConn.getResponseCode(); String respMsg = beConn.getResponseMessage(); InputStream stream = (InputStream) beConn.getContent(); BufferedReader br = new BufferedReader(new InputStreamReader(stream)); @@ -204,9 +205,9 @@ public class DorisStreamLoad implements Serializable{ } catch (Exception e) { e.printStackTrace(); -String err = "failed to execute spark streamload with label: " + label; +String err = "http request exception,load url : "+loadUrlStr+",failed to execute spark streamload with label: " + label; LOG.warn(err, e); -return new LoadResponse(-1, e.getMessage(), err); +return new LoadResponse(status, e.getMessage(), err); } finally { if (feConn != null) { feConn.disconnect();
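Two behavioral points in this diff are worth spelling out: success is only logged after the 200 check passes, and the HTTP status is captured outside the try block so a late exception still reports the real response code instead of a hard-coded -1. A condensed sketch of that shape (the LoadResponse fields are assumed from the diff; this is not the full class):

```java
import java.net.HttpURLConnection;

class StreamLoadErrorHandling {
    static class LoadResponse {
        final int status; final String respMsg; final String respContent;
        LoadResponse(int status, String respMsg, String respContent) {
            this.status = status; this.respMsg = respMsg; this.respContent = respContent;
        }
    }

    static LoadResponse loadBatch(HttpURLConnection beConn, String loadUrlStr, String label) {
        int status = -1; // declared outside try: keeps the last known code on failure
        try {
            status = beConn.getResponseCode();
            return new LoadResponse(status, beConn.getResponseMessage(), "...body...");
        } catch (Exception e) {
            // include the load URL so a failing BE can be identified from the log
            String err = "http request exception, load url: " + loadUrlStr
                    + ", failed to execute spark streamload with label: " + label;
            return new LoadResponse(status, e.getMessage(), err);
        }
    }
}
```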
[incubator-doris-spark-connector] 43/50: [improvement](spark-connector) Throw an exception when the data push fails and there are too many retries (#7531)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 24c633c9ae69072e0e178c0c0d0d7aab432d2266 Author: 董涛 <782112...@qq.com> AuthorDate: Tue Jan 11 15:03:06 2022 +0800 [improvement](spark-connector) Throw an exception when the data push fails and there are too many retries (#7531) --- .../java/org/apache/doris/spark/DorisStreamLoad.java | 9 +++-- .../org/apache/doris/spark/sql/DorisSourceProvider.scala | 16 ++-- .../org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 13 - 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index dd7e48c..ec3892d 100644 --- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -138,7 +138,7 @@ public class DorisStreamLoad implements Serializable{ } } -public void load(List> rows) throws StreamLoadException { +public String listToString(List> rows){ StringJoiner lines = new StringJoiner(LINE_DELIMITER); for (List row : rows) { StringJoiner line = new StringJoiner(FIELD_DELIMITER); @@ -151,9 +151,14 @@ public class DorisStreamLoad implements Serializable{ } lines.add(line.toString()); } -load(lines.toString()); +return lines.toString(); } + +public void load(List> rows) throws StreamLoadException { +String records = listToString(rows); +load(records); +} public void load(String value) throws StreamLoadException { LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value); LoadResponse loadResponse = loadBatch(value); diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index 12b7608..9b7d3f0 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -27,16 +27,19 @@ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import org.slf4j.{Logger, LoggerFactory} - import java.io.IOException import java.util + +import org.apache.doris.spark.rest.RestService + import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.util.control.Breaks private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider - with StreamSinkProvider { + with StreamSinkProvider + with Serializable { private val logger: Logger = LoggerFactory.getLogger(classOf[DorisSourceProvider].getName) @@ -97,14 +100,23 @@ private[sql] class DorisSourceProvider extends DataSourceRegister catch { case e: Exception => try { + logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr) + //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again + dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings,logger)) Thread.sleep(1000 * i) } catch { case ex: InterruptedException => +logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer)) Thread.currentThread.interrupt() throw new IOException("unable to flush; interrupted while doing another attempt", e) } } } + + if(!rowsBuffer.isEmpty){ +logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer)) +throw 
new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.") + } } } diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala index edd08f1..6e73698 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala +++ b/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala @@ -23,9 +23,11 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.{DataFrame, SQLContext} import org.slf4j.{Logger, LoggerFactory} - import java.io.IOException import java.util + +import org.apache.doris.spark.rest.RestService + import scala.util.control.Breaks private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, setti
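The retry policy this commit adds, in plain terms: wait i seconds after the i-th failure, hop to a randomly chosen BE, and if the buffer is still unflushed after the last attempt, log the failed rows and abort with an exception rather than dropping data silently. A minimal Java sketch with illustrative names (the Loader interface stands in for DorisStreamLoad plus RestService.randomBackendV2):

```java
import java.io.IOException;
import java.util.List;

class RetrySketch {
    interface Loader {
        void load(List<String> rows) throws Exception;
        void switchToRandomBackend(); // stands in for RestService.randomBackendV2
    }

    static void flushWithRetry(Loader loader, List<String> rowsBuffer, int maxRetries)
            throws IOException, InterruptedException {
        for (int i = 1; i <= maxRetries; i++) {
            try {
                loader.load(rowsBuffer);
                rowsBuffer.clear();              // flushed: nothing left to retry
                return;
            } catch (Exception e) {
                loader.switchToRandomBackend();  // current BE may be unhealthy
                Thread.sleep(1000L * i);         // linear backoff, as in the commit
            }
        }
        // still unflushed: fail loudly (the connector also logs the rows here)
        throw new IOException("Failed to load data on BE and exceeded the max retry times.");
    }
}
```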
[incubator-doris-spark-connector] 46/50: [fix](httpv2) make http v2 and v1 interface compatible (#7848)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 8558180b347702d54c946e06b2a7522aba2bc058 Author: jiafeng.zhang AuthorDate: Mon Jan 31 22:12:34 2022 +0800 [fix](httpv2) make http v2 and v1 interface compatible (#7848) http v2 TableSchemaAction adds the return value of aggregation_type, and modifies the corresponding code of Flink/Spark Connector --- src/main/java/org/apache/doris/spark/rest/models/Field.java | 13 - .../java/org/apache/doris/spark/rest/models/Schema.java | 4 ++-- src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala | 2 +- .../java/org/apache/doris/spark/rest/TestRestService.java | 8 .../org/apache/doris/spark/serialization/TestRowBatch.java | 2 +- .../scala/org/apache/doris/spark/sql/TestSchemaUtils.scala | 8 6 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/rest/models/Field.java b/src/main/java/org/apache/doris/spark/rest/models/Field.java index 12cdab9..53c622b 100644 --- a/src/main/java/org/apache/doris/spark/rest/models/Field.java +++ b/src/main/java/org/apache/doris/spark/rest/models/Field.java @@ -26,14 +26,25 @@ public class Field { private int precision; private int scale; +private String aggregation_type; + public Field() { } -public Field(String name, String type, String comment, int precision, int scale) { +public Field(String name, String type, String comment, int precision, int scale, String aggregation_type) { this.name = name; this.type = type; this.comment = comment; this.precision = precision; this.scale = scale; +this.aggregation_type = aggregation_type; +} + +public String getAggregation_type() { +return aggregation_type; +} + +public void setAggregation_type(String aggregation_type) { +this.aggregation_type = aggregation_type; } public String getName() { diff --git a/src/main/java/org/apache/doris/spark/rest/models/Schema.java b/src/main/java/org/apache/doris/spark/rest/models/Schema.java index 285fe42..586a8ac 100644 --- a/src/main/java/org/apache/doris/spark/rest/models/Schema.java +++ b/src/main/java/org/apache/doris/spark/rest/models/Schema.java @@ -49,8 +49,8 @@ public class Schema { this.properties = properties; } -public void put(String name, String type, String comment, int scale, int precision) { -properties.add(new Field(name, type, comment, scale, precision)); +public void put(String name, String type, String comment, int scale, int precision, String aggregation_type) { +properties.add(new Field(name, type, comment, scale, precision, aggregation_type)); } public void put(Field f) { diff --git a/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index ffba27c..f595092 100644 --- a/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -103,7 +103,7 @@ private[spark] object SchemaUtils { */ def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = { val schema = new Schema(tscanColumnDescs.length) -tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0))) +tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName, desc.getType.name, "", 0, 0, ""))) schema } } diff --git a/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/src/test/java/org/apache/doris/spark/rest/TestRestService.java index 5862dd0..8004590 100644 --- 
a/src/test/java/org/apache/doris/spark/rest/TestRestService.java +++ b/src/test/java/org/apache/doris/spark/rest/TestRestService.java @@ -120,12 +120,12 @@ public class TestRestService { @Test public void testFeResponseToSchema() throws Exception { -String res = "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\"},{\"name\":\"k5\"," -+ "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\"}],\"status\":200}"; +String res = "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\",\"aggregation_type\":\"\"},{\"name\":\"k5\"," ++ "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\",\"aggregation_type\":\"\"}],\"status\":200}"; Schema expected = new Schema(); expected.setStatus(200); -Field k1 = new Field("k1", "TINYINT", "", 0, 0); -Field k5 = new Field("k5", "DECIMALV2", "", 9, 0); +Field k1 = new Field("k1", "TINYINT", "", 0, 0, ""); +Field k5 = new Field("k5", "DECIMALV2", "", 9, 0, ""); expected.put(k1); expected.put(k5);
[incubator-doris-spark-connector] 18/50: [Feature] Support spark connector sink data to Doris (#6256)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit e73f7964089f5681a1644989ddedfb0bbcdd3b50 Author: huzk <1040080...@qq.com> AuthorDate: Mon Aug 16 22:40:43 2021 +0800 [Feature] Support spark connector sink data to Doris (#6256) support spark conector write dataframe to doris --- .../org/apache/doris/spark/DorisStreamLoad.java| 179 + .../spark/exception/StreamLoadException.java} | 30 ++-- .../org/apache/doris/spark/rest/RestService.java | 75 +++-- .../apache/doris/spark/rest/models/Backend.java} | 27 ++-- .../apache/doris/spark/rest/models/BackendRow.java | 64 .../doris/spark/rest/models/RespContent.java | 96 +++ .../doris/spark/sql/DorisSourceProvider.scala | 119 +- .../apache/doris/spark/sql/DorisWriterOption.scala | 41 + ...eProvider.scala => DorisWriterOptionKeys.scala} | 18 +-- .../doris/spark/sql/DataframeSinkDoris.scala} | 32 +++- 10 files changed, 631 insertions(+), 50 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java new file mode 100644 index 000..0de3746 --- /dev/null +++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+package org.apache.doris.spark; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.spark.exception.StreamLoadException; +import org.apache.doris.spark.rest.models.RespContent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Calendar; +import java.util.List; +import java.util.UUID; + +/** + * DorisStreamLoad + **/ +public class DorisStreamLoad implements Serializable{ + +private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); + +private final static List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout")); +private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";; +private String user; +private String passwd; +private String loadUrlStr; +private String hostPort; +private String db; +private String tbl; +private String authEncoding; + +public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) { +this.hostPort = hostPort; +this.db = db; +this.tbl = tbl; +this.user = user; +this.passwd = passwd; +this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); +this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); +} + +public String getLoadUrlStr() { +return loadUrlStr; +} + +public String getHostPort() { +return hostPort; +} + +public void setHostPort(String hostPort) { +this.hostPort = hostPort; +this.loadUrlStr = String.format(loadUrlPattern, hostPort, this.db, this.tbl); +} + + +private HttpURLConnection getConnection(String urlStr, String label) throws IOException { +URL url = new URL(urlStr); +HttpURLConnection conn = (HttpURLConnection) url.openConnection(); +conn.setInstanceFollowRedirects(false); +conn.setRequestMethod("PUT"); +String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); +conn.setRequestProperty("Authorization", "Basic " + authEncoding); +conn.addRequestProperty("Expect", "100-continue"); +conn.addRequestProperty("Content-Type", "text/plain; charset=
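Stripped of the connector plumbing, the stream load itself is a single HTTP PUT against a BE. Below is a self-contained sketch of the request this class builds; the URL, credentials, and body here are placeholders, and the return value is reduced to the status code:

```java
import java.io.BufferedOutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class StreamLoadRequest {
    // PUT http://{BE_HOST}:{BE_HTTP_PORT}/api/{db}/{table}/_stream_load
    static int put(String loadUrl, String user, String passwd, String body) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(loadUrl).openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        String auth = Base64.getEncoder().encodeToString(
                (user + ":" + passwd).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        try (BufferedOutputStream bos = new BufferedOutputStream(conn.getOutputStream())) {
            bos.write(body.getBytes(StandardCharsets.UTF_8)); // rows joined by the line delimiter
        }
        // 200 plus a "Success"/"Publish Timeout" load status means the batch landed
        return conn.getResponseCode();
    }
}
```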
[incubator-doris-spark-connector] 09/50: Remove unused KUDU codes (#3175)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit d3eebd46c84c4c99573fa90b1516383cf34135c5 Author: lichaoyong AuthorDate: Tue Mar 24 13:54:05 2020 +0800 Remove unused KUDU codes (#3175) KUDU tables have not been supported for a long time. Remove the code related to them. --- src/main/thrift/doris/Status.thrift | 4 ++-- src/main/thrift/doris/Types.thrift | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/thrift/doris/Status.thrift b/src/main/thrift/doris/Status.thrift index 90612c3..2966a8a 100644 --- a/src/main/thrift/doris/Status.thrift +++ b/src/main/thrift/doris/Status.thrift @@ -28,8 +28,8 @@ enum TStatusCode { INTERNAL_ERROR, THRIFT_RPC_ERROR, TIMEOUT, -KUDU_NOT_ENABLED, -KUDU_NOT_SUPPORTED_ON_OS, +KUDU_NOT_ENABLED, // Deprecated +KUDU_NOT_SUPPORTED_ON_OS, // Deprecated MEM_ALLOC_FAILED, BUFFER_ALLOCATION_FAILED, MINIMUM_RESERVATION_UNAVAILABLE, diff --git a/src/main/thrift/doris/Types.thrift b/src/main/thrift/doris/Types.thrift index 659f179..2d902ba 100644 --- a/src/main/thrift/doris/Types.thrift +++ b/src/main/thrift/doris/Types.thrift @@ -312,7 +312,7 @@ enum TTableType { MYSQL_TABLE, OLAP_TABLE, SCHEMA_TABLE, -KUDU_TABLE, +KUDU_TABLE, // Deprecated BROKER_TABLE, ES_TABLE }
[incubator-doris-spark-connector] 17/50: [Bug] Modify spark, flink doris connector to send request to FE, fix the problem of POST method, it should be the same as the method when sending the request (
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit d121e12309cf42c59d47628d350b54b1088eca28 Author: jiafeng.zhang AuthorDate: Wed May 19 09:28:21 2021 +0800 [Bug] Modify spark, flink doris connector to send request to FE, fix the problem of POST method, it should be the same as the method when sending the request (#5788) Modify spark, flink doris connector to send request to FE, fix the problem of POST method, it should be the same as the method when sending the request --- .../org/apache/doris/spark/rest/RestService.java | 80 ++ 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java index 3c8249c..ec9cfec 100644 --- a/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -31,8 +31,10 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.io.Serializable; import java.net.HttpURLConnection; @@ -115,39 +117,36 @@ public class RestService implements Serializable { .build(); request.setConfig(requestConfig); - String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, ""); String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""); - logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user); - IOException ex = null; int statusCode = -1; for (int attempt = 0; attempt < retries; attempt++) { logger.debug("Attempt {} to request {}.", attempt, request.getURI()); try { -HttpURLConnection conn = getConnection(request, user, password); -statusCode = conn.getResponseCode(); -if (statusCode != HttpStatus.SC_OK) { +String response; +if (request instanceof HttpGet){ +response = getConnectionGet(request.getURI().toString(), user, password,logger); +} else { +response = getConnectionPost(request,user, password,logger); +} +if (response == null) { logger.warn("Failed to get response from Doris FE {}, http code is {}", request.getURI(), statusCode); continue; } -InputStream stream = (InputStream) conn.getContent(); -String res = IOUtils.toString(stream); logger.trace("Success get response from Doris FE: {}, response is: {}.", -request.getURI(), res); - +request.getURI(), response); ObjectMapper mapper = new ObjectMapper(); - -Map map = mapper.readValue(res, Map.class); +Map map = mapper.readValue(response, Map.class); //Handle the problem of inconsistent data format returned by http v1 and v2 -if(map.containsKey("code") && map.containsKey("msg")) { +if (map.containsKey("code") && map.containsKey("msg")) { Object data = map.get("data"); return mapper.writeValueAsString(data); } else { -return res; +return response; } } catch (IOException e) { ex = e; @@ -159,32 +158,53 @@ public class RestService implements Serializable { throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex); } +private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException { +URL realUrl = new URL(request); +// open connection 
+HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection(); +String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); +connection.setRequestProperty("Authorization", "Basic " + authEncoding); -/** - * Get http connection - * @param request - * @param user - * @param passwd - * @return - * @throws IOException - */ -private static HttpURLConnection getConnection(HttpRequestBase request, String user, String passwd) throws IOException { +connection.connect(); +re
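The GET path after this fix, reduced to its essentials (the POST path differs only in the request method and in writing a JSON body): the point of the commit is that the verb of the original HttpGet/HttpPost is preserved instead of every request degenerating into a single method. This is a sketch, not the exact RestService code:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class FeHttpGet {
    static String get(String url, String user, String passwd) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        // Basic auth header, same encoding the connector uses elsewhere
        String auth = Base64.getEncoder().encodeToString(
                (user + ":" + passwd).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.connect(); // default request method is GET; no body is written
        StringBuilder sb = new StringBuilder();
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                sb.append(line);
            }
        }
        return sb.toString();
    }
}
```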
[incubator-doris-spark-connector] 26/50: [Optimize] Spark connector supports multiple spark versions:2.1.x/2.3.x/2.4.x/3.x (#6956)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit dc9203a6582ffdc5b4b13cff877dedc607dbfc30 Author: wei zhao AuthorDate: Fri Oct 29 17:06:05 2021 +0800 [Optimize] Spark connector supports multiple spark versions:2.1.x/2.3.x/2.4.x/3.x (#6956) * Spark connector supports multiple spark versions:2.1.x/2.3.x/2.4.x/3.x Co-authored-by: wei.zhao --- .../org/apache/doris/spark/DorisStreamLoad.java| 6 +- .../doris/spark/rdd/AbstractDorisRDDIterator.scala | 12 +- .../apache/doris/spark/rdd/ScalaValueReader.scala | 2 +- .../doris/spark/sql/DorisSourceProvider.scala | 26 +++-- .../doris/spark/sql/DorisStreamLoadSink.scala | 98 + .../apache/doris/spark/sql/DorisStreamWriter.scala | 122 - .../doris/spark/sql/ScalaDorisRowValueReader.scala | 10 +- .../scala/org/apache/doris/spark/sql/Utils.scala | 38 +-- 8 files changed, 158 insertions(+), 156 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index dcf569f..ccf3a5e 100644 --- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -150,6 +150,7 @@ public class DorisStreamLoad implements Serializable{ } public void load(String value) throws StreamLoadException { +LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value); LoadResponse loadResponse = loadBatch(value); LOG.info("Streamload Response:{}",loadResponse); if(loadResponse.status != 200){ @@ -169,7 +170,7 @@ public class DorisStreamLoad implements Serializable{ private LoadResponse loadBatch(String value) { Calendar calendar = Calendar.getInstance(); -String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s", +String label = String.format("spark_streamload_%s%02d%02d_%02d%02d%02d_%s", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), UUID.randomUUID().toString().replaceAll("-", "")); @@ -194,12 +195,11 @@ public class DorisStreamLoad implements Serializable{ while ((line = br.readLine()) != null) { response.append(line); } -//log.info("AuditLoader plugin load with label: {}, response code: {}, msg: {}, content: {}",label, status, respMsg, response.toString()); return new LoadResponse(status, respMsg, response.toString()); } catch (Exception e) { e.printStackTrace(); -String err = "failed to load audit via AuditLoader plugin with label: " + label; +String err = "failed to execute spark streamload with label: " + label; LOG.warn(err, e); return new LoadResponse(-1, e.getMessage(), err); } finally { diff --git a/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala b/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala index dc39773..5b2b36f 100644 --- a/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala +++ b/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala @@ -20,15 +20,15 @@ package org.apache.doris.spark.rdd import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_VALUE_READER_CLASS import org.apache.doris.spark.cfg.Settings import org.apache.doris.spark.rest.PartitionDefinition - import org.apache.spark.util.TaskCompletionListener -import org.apache.spark.internal.Logging import org.apache.spark.{TaskContext, TaskKilledException} +import 
org.slf4j.{Logger, LoggerFactory} private[spark] abstract class AbstractDorisRDDIterator[T]( context: TaskContext, -partition: PartitionDefinition) extends Iterator[T] with Logging { +partition: PartitionDefinition) extends Iterator[T] { + private val logger: Logger = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$")) private var initialized = false private var closed = false @@ -38,7 +38,7 @@ private[spark] abstract class AbstractDorisRDDIterator[T]( val settings = partition.settings() initReader(settings) val valueReaderName = settings.getProperty(DORIS_VALUE_READER_CLASS) -logDebug(s"Use value reader '$valueReaderName'.") +logger.debug(s"Use value reader '$valueReaderName'.") val cons = Class.forName(valueReaderName).getDeclaredConstructor(classOf[PartitionDefinition], classOf[Settings]) cons.newInstance(partition, settings).asInstanceOf[ScalaValueReader] } @@ -65,7 +65,7 @@ private[spark] abstract class AbstractDorisRDDIterator[T]( } def closeIfNee
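The logging swap in this diff is what makes one artifact work across Spark lines: org.apache.spark.internal.Logging is private[spark] (and lived at org.apache.spark.Logging in 1.x), so extending it pins the connector to a specific Spark version, while plain slf4j does not. The replacement pattern, shown here in Java form rather than the connector's Scala:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConnectorComponent {
    // One logger per class, resolved by name: no dependency on Spark internals.
    private static final Logger logger =
            LoggerFactory.getLogger(ConnectorComponent.class.getName());

    void chooseValueReader(String valueReaderName) {
        // slf4j keeps the {} placeholder style the connector already uses
        logger.debug("Use value reader '{}'.", valueReaderName);
    }
}
```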
[incubator-doris-spark-connector] 33/50: [Feature] Support Flink and Spark connector support String type (#7075)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 30530a93c80af887dee5b4dd224fa0eb63fdfad4 Author: wudi <676366...@qq.com> AuthorDate: Sat Nov 13 17:10:22 2021 +0800 [Feature] Support Flink and Spark connector support String type (#7075) Support String type for Flink and Spark connector --- src/main/java/org/apache/doris/spark/serialization/RowBatch.java | 1 + src/main/thrift/doris/Types.thrift | 8 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index ad3cfe5..bcc76d5 100644 --- a/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -256,6 +256,7 @@ public class RowBatch { case "LARGEINT": case "CHAR": case "VARCHAR": +case "STRING": Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR), typeMismatchMessage(currentType, mt)); VarCharVector varCharVector = (VarCharVector) curFieldVector; diff --git a/src/main/thrift/doris/Types.thrift b/src/main/thrift/doris/Types.thrift index 2d902ba..44ce606 100644 --- a/src/main/thrift/doris/Types.thrift +++ b/src/main/thrift/doris/Types.thrift @@ -73,7 +73,13 @@ enum TPrimitiveType { VARCHAR, HLL, DECIMALV2, - TIME + TIME, + OBJECT, + ARRAY, + MAP, + STRUCT, + STRING, + ALL } enum TTypeNodeType {
[incubator-doris-spark-connector] 49/50: [fix](spark connector) fix spark connector unsupported STRING type. (#2)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit c5bf0b2fd31ff08ce1e819aabc85c744f77bb528 Author: haocean <1727386...@qq.com> AuthorDate: Thu Feb 10 22:43:09 2022 +0800 [fix](spark connector) fix spark connector unsupported STRING type. (#2) fix spark connector unsupported STRING type. --- .../src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala | 1 + .../src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index f595092..f45b9de 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -89,6 +89,7 @@ private[spark] object SchemaUtils { case "VARCHAR" => DataTypes.StringType case "DECIMALV2" => DecimalType(precision, scale) case "TIME"=> DataTypes.DoubleType + case "STRING" => DataTypes.StringType case "HLL" => throw new DorisException("Unsupported type " + dorisType) case _ => diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala index 97bbe0e..13004b4 100644 --- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala +++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala @@ -62,6 +62,7 @@ class TestSchemaUtils extends ExpectedExceptionTest { Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("VARCHAR", 0, 0)) Assert.assertEquals(DecimalType(10, 5), SchemaUtils.getCatalystType("DECIMALV2", 10, 5)) Assert.assertEquals(DataTypes.DoubleType, SchemaUtils.getCatalystType("TIME", 0, 0)) +Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("STRING", 0, 0)) thrown.expect(classOf[DorisException]) thrown.expectMessage(startsWith("Unsupported type"))
[incubator-doris-spark-connector] 12/50: [Spark on Doris] Shade and provide the thrift lib in spark-doris-connector (#3631)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit d1299981bcae2258dbef12fa71d038842f0afb70 Author: Mingyu Chen AuthorDate: Tue May 19 14:20:21 2020 +0800 [Spark on Doris] Shade and provide the thrift lib in spark-doris-connector (#3631) Main changes: 1. Shade and provide the thrift lib in spark-doris-connector 2. Add a `build.sh` for spark-doris-connector 3. Move the README.md of spark-doris-connector to `docs/` 4. Change the line delimiter of `fe/src/test/java/org/apache/doris/analysis/AggregateTest.java` --- README.md | 150 -- build.sh | 59 pom.xml | 59 +--- 3 files changed, 112 insertions(+), 156 deletions(-) diff --git a/README.md b/README.md deleted file mode 100644 index 3c41b93..000 --- a/README.md +++ /dev/null @@ -1,150 +0,0 @@ - - -# Spark-Doris-Connector - -## Features - -- The current version only supports reading data from `Doris`. -- A `Doris` table can be mapped to a `DataFrame` or an `RDD`; `DataFrame` is recommended. -- Data filtering can be done on the `Doris` side, reducing the amount of data transferred. - -## Version Compatibility - -| Connector | Spark | Doris | Java | Scala | -| - | - | -- | | - | -| 1.0.0 | 2.x | master | 8| 2.11 | - - - -## Building - -```bash -mvn clean package -``` - -After a successful build, the file `doris-spark-1.0.0-SNAPSHOT.jar` is generated in the `target` directory. Copy this file into `Spark`'s `ClassPath` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put the file into the `jars` folder; for `Spark` running in `Yarn` cluster mode, put it into the pre-deployment package. - -## QuickStart - -### SQL - -```sql -CREATE TEMPORARY VIEW spark_doris -USING doris -OPTIONS( - "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME", - "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT", - "user"="$YOUR_DORIS_USERNAME", - "password"="$YOUR_DORIS_PASSWORD" -); - -SELECT * FROM spark_doris; -``` - -### DataFrame - -```scala -val dorisSparkDF = spark.read.format("doris") - .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME") - .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") - .option("user", "$YOUR_DORIS_USERNAME") - .option("password", "$YOUR_DORIS_PASSWORD") - .load() - -dorisSparkDF.show(5) -``` - -### RDD - -```scala -import org.apache.doris.spark._ -val dorisSparkRDD = sc.dorisRDD( - tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"), - cfg = Some(Map( -"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT", -"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME", -"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD" - )) -) - -dorisSparkRDD.collect() -``` - -## Configuration - -### General - -| Key | Default Value | Comment | -| | - | | -| doris.fenodes| --| Doris FE RESTful address; multiple addresses are supported, separated by commas| -| doris.table.identifier | --| Doris table name mapped to the DataFrame/RDD | -| doris.request.retries| 3 | Number of retries for requests sent to Doris | -| doris.request.connect.timeout.ms | 3 | Connection timeout for requests sent to Doris | -| doris.request.read.timeout.ms| 3 | Read timeout for requests sent to Doris | -| doris.request.query.timeout.s| 3600 | Timeout for querying Doris; the default is 1 hour, and -1 means no limit | -| doris.request.tablet.size| Integer.MAX_VALUE | Number of Doris Tablets per RDD Partition. The smaller this value, the more partitions are generated, increasing parallelism on the Spark side but putting more pressure on Doris. | -| doris.batch.size | 1024 | Maximum number of rows read from BE at a time. Increasing it reduces the number of connections established between Spark and Doris, cutting the extra overhead caused by network latency. | -| doris.exec.mem.limit | 2147483648| Memory limit for a single query. Defaults to 2GB, in bytes | -| doris.deserialize.arrow.async| false | Whether to asynchronously convert the Arrow format into the RowBatch needed for spark-doris-connector iteration | -| doris.deserialize.queue.size | 64| Internal processing queue for asynchronous Arrow conversion; effective when doris.deserialize.arrow.async is true| - -### SQL and Dataframe Only - -| Key | Default Value | Comment | -| --- | - | | -| user| --| Username for accessing Doris | -| password| --| Password for accessing Doris | -| doris.filter.query.in.max.count | 100 | Maximum number of elements in an in-expression's value list for predicate pushdown. Beyond this limit, the in-expression filter is processed on the Spark side. | - -### RDD Only - -| Key
[incubator-doris-spark-connector] 23/50: [Feature] support spark connector sink data using sql (#6796)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 39e8483819c98d1ee2071c954085ac2c07f84904 Author: wei zhao AuthorDate: Sat Oct 9 15:47:36 2021 +0800 [Feature] support spark connector sink data using sql (#6796) Co-authored-by: wei.zhao --- .../org/apache/doris/spark/sql/DorisRelation.scala| 19 +-- .../apache/doris/spark/sql/DorisSourceProvider.scala | 7 ++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala index 2f2a252..3e3616d 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala +++ b/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala @@ -28,12 +28,12 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} private[sql] class DorisRelation( val sqlContext: SQLContext, parameters: Map[String, String]) -extends BaseRelation with TableScan with PrunedScan with PrunedFilteredScan { +extends BaseRelation with TableScan with PrunedScan with PrunedFilteredScan with InsertableRelation{ private lazy val cfg = { val conf = new SparkSettings(sqlContext.sparkContext.getConf) @@ -86,4 +86,19 @@ private[sql] class DorisRelation( new ScalaDorisRowRDD(sqlContext.sparkContext, paramWithScan.toMap, lazySchema) } + + // Insert Table + override def insert(data: DataFrame, overwrite: Boolean): Unit = { +//replace 'doris.request.auth.user' with 'user' and 'doris.request.auth.password' with 'password' +val insertCfg = cfg.copy().asProperties().asScala.map { + case (ConfigurationOptions.DORIS_REQUEST_AUTH_USER, v) => +("user", v) + case (ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD, v) => +("password", v) + case (k, v) => (k, v) +} +data.write.format(DorisSourceProvider.SHORT_NAME) + .options(insertCfg) + .save() + } } diff --git a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index 65f5250..3ac087d 100644 --- a/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ b/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -19,6 +19,7 @@ package org.apache.doris.spark.sql import org.apache.doris.spark.DorisStreamLoad import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} +import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter @@ -35,7 +36,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.util.control.Breaks private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with StreamWriteSupport with Logging { - override def shortName(): String = "doris" + override def shortName(): String = SHORT_NAME override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { new DorisRelation(sqlContext, Utils.params(parameters, log)) @@ -129,3 +130,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP new DorisStreamWriter(sparkSettings) } } + 
+object DorisSourceProvider { + val SHORT_NAME: String = "doris" +} \ No newline at end of file
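With `DorisRelation` now implementing `InsertableRelation`, a Spark SQL `INSERT INTO` against a Doris-backed view resolves to the `insert` override above, which remaps `doris.request.auth.user`/`doris.request.auth.password` to plain `user`/`password` and replays the write through `data.write.format(SHORT_NAME)`. A minimal usage sketch, assuming a reachable FE and an existing target table; the option values and `source_view` are illustrative placeholders, not taken from the commit:

```scala
// Hedged end-to-end sketch of the new SQL sink path; all values are placeholders.
import org.apache.spark.sql.SparkSession

object DorisInsertViaSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-insert-sql").getOrCreate()

    // Register a Doris table as a temporary view; "doris" is the provider's
    // short name (now the DorisSourceProvider.SHORT_NAME constant).
    spark.sql(
      """CREATE TEMPORARY VIEW doris_sink
        |USING doris
        |OPTIONS (
        |  'doris.table.identifier' = 'example_db.example_tbl',
        |  'doris.fenodes' = 'fe_host:8030',
        |  'user' = 'root',
        |  'password' = ''
        |)""".stripMargin)

    // Spark routes this through DorisRelation.insert(data, overwrite = false),
    // which rewrites the auth keys and saves via the "doris" writer.
    // source_view is assumed to exist already.
    spark.sql("INSERT INTO doris_sink SELECT id, name FROM source_view")

    spark.stop()
  }
}
```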
[incubator-doris-spark-connector] 39/50: [refactor] update parent pom version and optimize build scripts (#7548)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit ee6969ab68a3f7c7089a2d498e5ff3e904ada8d1 Author: Zhengguo Yang AuthorDate: Wed Jan 5 10:45:11 2022 +0800 [refactor] update parent pom version and optimize build scripts (#7548) --- build.sh| 49 pom.xml | 39 +-- pom_3.0.xml | 372 3 files changed, 56 insertions(+), 404 deletions(-) diff --git a/build.sh b/build.sh index 9fe907c..d747ba3 100755 --- a/build.sh +++ b/build.sh @@ -25,23 +25,38 @@ set -eo pipefail -ROOT=`dirname "$0"` -ROOT=`cd "$ROOT"; pwd` +usage() { + echo " + Usage: +$0 spark_version scala_version + e.g.: +$0 2.3.4 2.11 +$0 3.1.2 2.12 + " + exit 1 +} + +if [ $# -ne 2 ]; then +usage +fi + +ROOT=$(dirname "$0") +ROOT=$(cd "$ROOT"; pwd) export DORIS_HOME=${ROOT}/../../ export PATH=${DORIS_THIRDPARTY}/installed/bin:$PATH -. ${DORIS_HOME}/env.sh +. "${DORIS_HOME}"/env.sh # include custom environment variables if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then -. ${DORIS_HOME}/custom_env.sh +. "${DORIS_HOME}"/custom_env.sh fi # check maven MVN_CMD=mvn -if [[ ! -z ${CUSTOM_MVN} ]]; then +if [[ -n ${CUSTOM_MVN} ]]; then MVN_CMD=${CUSTOM_MVN} fi if ! ${MVN_CMD} --version; then @@ -50,29 +65,15 @@ if ! ${MVN_CMD} --version; then fi export MVN_CMD -usage() { - echo " - Eg. -$0 2build with spark 2.x -$0 3build with spark 3.x - " - exit 1 -} - -if [ $# == 0 ]; then -usage -fi - rm -rf output/ -if [ "$1"x == "3x" ] -then - ${MVN_CMD} clean package -f pom_3.0.xml +if [ -z "$1" ]; then +export SPARK_VERSION="$1" fi -if [ "$1"x == "2x" ] -then - ${MVN_CMD} clean package +if [ -z "$2" ]; then +export SCALA_VERSION="$2" fi +${MVN_CMD} clean package mkdir -p output/ cp target/doris-spark-*.jar ./output/ diff --git a/pom.xml b/pom.xml index b62f193..a4f2e9c 100644 --- a/pom.xml +++ b/pom.xml @@ -19,18 +19,18 @@ under the License. 
--> -http://maven.apache.org/POM/4.0.0"; - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +http://www.w3.org/2001/XMLSchema-instance"; + xmlns="http://maven.apache.org/POM/4.0.0"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> 4.0.0 org.apache apache -18 +23 org.apache.doris doris-spark-connector -${spark.version}-${scala.version}-SNAPSHOT +${spark.version}-${scala.version}-1.0.0-SNAPSHOT Doris Spark Connector https://doris.apache.org/ @@ -68,8 +68,8 @@ -2.11 -2.3.4 +${env.SCALA_VERSION} +${env.SPARK_VERSION} 0.13.0 5.0.0 3.8.1 @@ -103,7 +103,30 @@ - + +spark-version + +true + +!env.SPARK_VERSION + + + +2.3.4 + + + +scala-version + +true + +!env.SCALA_VERSION + + + +2.11 + + general-env @@ -338,9 +361,9 @@ maven-javadoc-plugin ${maven-javadoc-plugin.version} +true 8 false -true diff --git a/pom_3.0.xml b/pom_3.0.xml deleted file mode 100644 index 3de0873..000 --- a/pom_3.0.xml +++ /dev/null @@ -1,372 +0,0 @@ - - - - -http://maven.apache.org/POM/4.0.0"; - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> -4.0.0 - -org.apache -apache -18 - -org.apache -doris-spark-connector -${spark.version}-${scala.version}-SNAPSHOT -Doris Spark Connector -https://doris.apache.org/ - - -Apache 2.0 License -https://www.apache.org/licenses/LICENSE-2.0.html -repo - - - - scm:git:g...@github.com:apache/incubator-doris.git - scm:git:g...@github.com:apache/incubator-doris.git -scm:git:g...@github.com:apache/incubator-doris.git -HEAD - - -GitHub -https://github.com/apache/incubator-doris/issues - - - - -Dev Mailing List -d...@doris.apache.org -dev-subscr...@doris.apache.org -dev-unsubscr...@doris.apache.org -
[incubator-doris-spark-connector] 40/50: [chore][docs] add deploy spark/flink connectors to maven release repo docs (#7616)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 5a6b28f1a0909c1f4f7fa1b84b42f576ff5d3d1a Author: Zhengguo Yang AuthorDate: Thu Jan 6 23:23:33 2022 +0800 [chore][docs] add deploy spark/flink connectors to maven release repo docs (#7616) --- pom.xml | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index a4f2e9c..ed3d014 100644 --- a/pom.xml +++ b/pom.xml @@ -41,9 +41,9 @@ - scm:git:g...@github.com:apache/incubator-doris.git - scm:git:g...@github.com:apache/incubator-doris.git -scm:git:g...@github.com:apache/incubator-doris.git + scm:git:https://g...@github.com/apache/incubator-doris.git + scm:git:https://g...@github.com/apache/incubator-doris.git +scm:git:https://g...@github.com/apache/incubator-doris.git HEAD @@ -77,6 +77,7 @@ 3.2.1 UTF-8 ${env.DORIS_THIRDPARTY} +github
[incubator-doris-spark-connector] 11/50: [License] Add License to codes (#3272)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 0634c643aff867cca8d03b83b1b22989b3f94d77 Author: lichaoyong AuthorDate: Tue Apr 7 16:35:13 2020 +0800 [License] Add License to codes (#3272) --- .../org.apache.spark.sql.sources.DataSourceRegister | 19 ++- .../doris/spark/rest/TestPartitionDefinition.java | 21 - 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 15f36f6..15b2434 100644 --- a/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1 +1,18 @@ -org.apache.doris.spark.sql.DorisSourceProvider \ No newline at end of file +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +org.apache.doris.spark.sql.DorisSourceProvider diff --git a/src/test/java/org/apache/doris/spark/rest/TestPartitionDefinition.java b/src/test/java/org/apache/doris/spark/rest/TestPartitionDefinition.java index b5f477e..0bfa3aa 100644 --- a/src/test/java/org/apache/doris/spark/rest/TestPartitionDefinition.java +++ b/src/test/java/org/apache/doris/spark/rest/TestPartitionDefinition.java @@ -1,8 +1,19 @@ -/** - * Copyright (c) 2019. Baidu.com, Inc. All Rights Reserved. - * Author: zhangwenxin01 - * Date: 2019-08-07 - */ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. package org.apache.doris.spark.rest;
[incubator-doris-spark-connector] 32/50: [SparkConnector] Add thrift dir for spark connector (#7074)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 031aac040ba1c26f782c73b2325c36d1481146c4 Author: tinke <62875019+tinker...@users.noreply.github.com> AuthorDate: Sat Nov 13 17:09:52 2021 +0800 [SparkConnector] Add thrift dir for spark connector (#7074) Add thrift dir for spark connector, to fix an error when building spark-doris-connector --- pom.xml | 1 + pom_3.0.xml | 1 + 2 files changed, 2 insertions(+) diff --git a/pom.xml b/pom.xml index d756bfb..2c41bfc 100644 --- a/pom.xml +++ b/pom.xml @@ -199,6 +199,7 @@ maven-thrift-plugin 0.1.11 + ${doris.thirdparty}/installed/bin/thrift java:fullcamel diff --git a/pom_3.0.xml b/pom_3.0.xml index d09e81a..25aa355 100644 --- a/pom_3.0.xml +++ b/pom_3.0.xml @@ -198,6 +198,7 @@ maven-thrift-plugin 0.1.11 + ${doris.thirdparty}/installed/bin/thrift java:fullcamel
[incubator-doris-spark-connector] 28/50: [Revert] Revert RestService.java (#6994)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 00589ffc6e6768c98b3b6a28871065c75b1f1cc4 Author: wei zhao AuthorDate: Thu Nov 4 12:13:18 2021 +0800 [Revert] Revert RestService.java (#6994) --- .../org/apache/doris/spark/rest/RestService.java | 139 ++--- .../apache/doris/spark/rest/TestRestService.java | 20 +++ 2 files changed, 116 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/rest/RestService.java b/src/main/java/org/apache/doris/spark/rest/RestService.java index dce540c..bb91538 100644 --- a/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -31,19 +31,27 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintWriter; import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Base64; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.ArrayList; import java.util.Set; +import java.util.HashSet; import java.util.stream.Collectors; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.Settings; @@ -52,22 +60,17 @@ import org.apache.doris.spark.exception.ConnectedFailedException; import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.exception.IllegalArgumentException; import org.apache.doris.spark.exception.ShouldNeverHappenException; -import org.apache.doris.spark.rest.models.*; +import org.apache.doris.spark.rest.models.Backend; +import org.apache.doris.spark.rest.models.BackendRow; +import org.apache.doris.spark.rest.models.QueryPlan; +import org.apache.doris.spark.rest.models.Schema; +import org.apache.doris.spark.rest.models.Tablet; import org.apache.http.HttpStatus; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; @@ -83,7 +86,8 @@ public class RestService implements Serializable { private static final String API_PREFIX = "/api"; private static 
final String SCHEMA = "_schema"; private static final String QUERY_PLAN = "_query_plan"; -private static final String BACKENDS = "/api/show_proc?path=//backends"; +private static final String BACKENDS = "/rest/v1/system?path=//backends"; + /** * send request to Doris FE and get response json string. @@ -110,36 +114,37 @@ public class RestService implements Serializable { .build(); request.setConfig(requestConfig); - String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, ""); String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""); - -CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); -credentialsProvider.setCredentials( -AuthScope.ANY, -new UsernamePasswordCredentials(user, password)); -HttpClientContext context = HttpClientContext.create(); -context.setCredentialsProvider(credentialsProvider); logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user); - IOException ex = null; int statusCode = -1; for (int attempt = 0; attempt < retries; attempt++) { -CloseableHttpClient httpClient = HttpClients.createDefault(); logger.debug("Attempt {} to request {}.", attempt, request.getURI());
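The reverted `RestService` swaps Apache HttpClient's `CredentialsProvider` machinery for plain `HttpURLConnection` with a hand-built Basic auth header, and points backend discovery at the `/rest/v1/system?path=//backends` endpoint. A hedged sketch of that request pattern, not the connector's exact code; the URL and credentials below are placeholders:

```scala
// Hedged sketch of GET-with-Basic-auth over HttpURLConnection.
import java.io.InputStream
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.Base64
import scala.io.Source

def getWithBasicAuth(url: String, user: String, password: String): String = {
  val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("GET")
  // What CredentialsProvider did implicitly: a hand-built Basic auth header.
  val token = Base64.getEncoder.encodeToString(
    s"$user:$password".getBytes(StandardCharsets.UTF_8))
  conn.setRequestProperty("Authorization", s"Basic $token")
  conn.setConnectTimeout(30 * 1000)
  conn.setReadTimeout(30 * 1000)
  val in: InputStream = conn.getInputStream
  try Source.fromInputStream(in, "UTF-8").mkString
  finally {
    in.close()
    conn.disconnect()
  }
}

// e.g. getWithBasicAuth("http://fe_host:8030/rest/v1/system?path=//backends", "root", "")
```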
[incubator-doris-spark-connector] 22/50: [Feature] support spark connector sink stream data to doris (#6761)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit acf886938b70f614c1c8ad1676abf75c5719769d Author: chovy AuthorDate: Tue Sep 28 17:46:19 2021 +0800 [Feature] support spark connector sink stream data to doris (#6761) * [Feature] support spark connector sink stream data to doris * [Doc] Add spark-connector batch/stream writing instructions * add license and remove meaningless blanks code Co-authored-by: wei.zhao --- pom.xml| 6 + .../doris/spark/CachedDorisStreamLoadClient.java | 63 +++ .../org/apache/doris/spark/DorisStreamLoad.java| 37 ++- .../java/org/apache/doris/spark/cfg/Settings.java | 13 +++ .../org/apache/doris/spark/cfg/SparkSettings.java | 3 +- .../org/apache/doris/spark/rest/RestService.java | 9 +- .../doris/spark/sql/DorisSourceProvider.scala | 72 +--- .../apache/doris/spark/sql/DorisStreamWriter.scala | 122 + .../doris/spark/sql/TestStreamSinkDoris.scala | 53 + 9 files changed, 327 insertions(+), 51 deletions(-) diff --git a/pom.xml b/pom.xml index 5ba2c6e..e015f06 100644 --- a/pom.xml +++ b/pom.xml @@ -139,6 +139,12 @@ test + +org.apache.spark +spark-sql-kafka-0-10_${scala.version} +${spark.version} +test + diff --git a/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java new file mode 100644 index 000..01cada4 --- /dev/null +++ b/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import org.apache.doris.spark.cfg.SparkSettings; +import org.apache.doris.spark.exception.DorisException; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * a cached streamload client for each partition + */ +public class CachedDorisStreamLoadClient { +private static final long cacheExpireTimeout = 30 * 60; +private static LoadingCache dorisStreamLoadLoadingCache; + +static { +dorisStreamLoadLoadingCache = CacheBuilder.newBuilder() +.expireAfterWrite(cacheExpireTimeout, TimeUnit.SECONDS) +.removalListener(new RemovalListener() { +@Override +public void onRemoval(RemovalNotification removalNotification) { +//do nothing +} +}) +.build( +new CacheLoader() { +@Override +public DorisStreamLoad load(SparkSettings sparkSettings) throws IOException, DorisException { +DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(sparkSettings); +return dorisStreamLoad; +} +} +); +} + +public static DorisStreamLoad getOrCreate(SparkSettings settings) throws ExecutionException { +DorisStreamLoad dorisStreamLoad = dorisStreamLoadLoadingCache.get(settings); +return dorisStreamLoad; +} +} diff --git a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index 0de3746..dcf569f 100644 --- a/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -17,7 +17,11 @@ package org.apache.doris.spark; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.spark.cfg.C
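The point of `CachedDorisStreamLoadClient` is that each executor reuses one stream-load client per `SparkSettings` instead of constructing a fresh client for every task or micro-batch; Guava's `LoadingCache` handles lazy creation and the 30-minute expiry. The same pattern in a self-contained, hedged Scala sketch, where `ExpensiveClient` is a stand-in for `DorisStreamLoad`:

```scala
import java.util.concurrent.TimeUnit
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Stand-in for DorisStreamLoad: anything expensive to construct.
final class ExpensiveClient(val settingsKey: String)

object ClientCache {
  // One lazily-created client per key, evicted 30 minutes after creation,
  // mirroring cacheExpireTimeout in the commit above.
  private val cache: LoadingCache[String, ExpensiveClient] =
    CacheBuilder.newBuilder()
      .expireAfterWrite(30, TimeUnit.MINUTES)
      .build(new CacheLoader[String, ExpensiveClient] {
        override def load(key: String): ExpensiveClient = new ExpensiveClient(key)
      })

  // get() builds the client on first access, then returns the cached instance.
  def getOrCreate(key: String): ExpensiveClient = cache.get(key)
}
```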
[incubator-doris-spark-connector] 20/50: [Improve] The connector supports spark 3.0, flink 1.13 (#6449)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 27381fe83c496f01795f67e3ae01a3509ed85d29 Author: jiafeng.zhang AuthorDate: Wed Aug 18 15:57:50 2021 +0800 [Improve]The connector supports spark 3.0, flink 1.13 (#6449) Modify the flink/spark compilation documentation --- build.sh| 14 ++- pom_3.0.xml | 290 2 files changed, 301 insertions(+), 3 deletions(-) diff --git a/build.sh b/build.sh index c37b14e..b4ea042 100755 --- a/build.sh +++ b/build.sh @@ -28,6 +28,7 @@ set -eo pipefail ROOT=`dirname "$0"` ROOT=`cd "$ROOT"; pwd` + export DORIS_HOME=${ROOT}/../../ # include custom environment variables @@ -37,6 +38,8 @@ fi # check maven MVN_CMD=mvn + + if [[ ! -z ${CUSTOM_MVN} ]]; then MVN_CMD=${CUSTOM_MVN} fi @@ -45,9 +48,14 @@ if ! ${MVN_CMD} --version; then exit 1 fi export MVN_CMD - -${MVN_CMD} clean package - +if [ $1 == 3 ] +then + ${MVN_CMD} clean package -f pom_3.0.xml +fi +if [ $1 == 2 ] +then + ${MVN_CMD} clean package +fi mkdir -p output/ cp target/doris-spark-1.0.0-SNAPSHOT.jar ./output/ diff --git a/pom_3.0.xml b/pom_3.0.xml new file mode 100644 index 000..4973ff8 --- /dev/null +++ b/pom_3.0.xml @@ -0,0 +1,290 @@ + + + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + +org.apache +doris-spark +1.0.0-SNAPSHOT + + +2.12 +3.1.2 +0.9.3 +1.0.1 +UTF-8 + + + + + +custom-env + + +env.CUSTOM_MAVEN_REPO + + + + + +custom-nexus +${env.CUSTOM_MAVEN_REPO} + + + + + +custom-nexus +${env.CUSTOM_MAVEN_REPO} + + + + + + +general-env + + +!env.CUSTOM_MAVEN_REPO + + + + + +central +central maven repo https +https://repo.maven.apache.org/maven2 + + + + + + + +org.apache.spark +spark-core_${scala.version} +${spark.version} +provided + + +org.apache.spark +spark-sql_${scala.version} +${spark.version} +provided + + +org.apache.thrift +libthrift +${libthrift.version} + + +org.apache.arrow +arrow-vector +${arrow.version} + + + +org.hamcrest +hamcrest-core +1.3 +test + + +org.mockito +mockito-scala_${scala.version} +1.4.7 + + +hamcrest-core +org.hamcrest + + +test + + +junit +junit +4.11 + + +hamcrest-core +org.hamcrest + + +test + + +com.fasterxml.jackson.core +jackson-databind +2.10.0 + + + +com.fasterxml.jackson.core +jackson-core +2.10.0 + + +io.netty +netty-all +4.1.27.Final +provided + + + + + + + +org.apache.thrift.tools +maven-thrift-plugin +0.1.11 + + +thrift-sources +generate-sources + +compile + + + + + +net.alchim31.maven +scala-maven-plugin +3.2.1 + + +scala-compile-first +process-resources + +compile + + + +scala-test-compile +process-test-resources + +testCompile +
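This commit introduces the separate `pom_3.0.xml` so the same sources build against Spark 3.x with Scala 2.12 (the profile-based build in commit 39/50 later replaces it). From the user's side the connector behaves the same on either Spark line; a hedged spark-shell style read sketch, with FE address, table identifier, and credentials as placeholders:

```scala
// Hedged sketch of reading from Doris with the Spark 3.x build of the connector.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("doris-read").getOrCreate()

val df = spark.read
  .format("doris") // resolved through DataSourceRegister to DorisSourceProvider
  .option("doris.table.identifier", "example_db.example_tbl")
  .option("doris.fenodes", "fe_host:8030")
  .option("user", "root")
  .option("password", "")
  .load()

df.show(10)
```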
[incubator-doris-spark-connector] 16/50: [Spark-Doris-Connector][Bug-Fix] Resolve deserialize exception when Spark Doris Connector in async deserialize mode (#5336)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 293bd8f2f270842a2014c857634038581fedc8b9 Author: 924060929 <924060...@qq.com> AuthorDate: Thu Mar 4 17:48:59 2021 +0800 [Spark-Doris-Connector][Bug-Fix] Resolve deserialize exception when Spark Doris Connector in aync deserialize mode (#5336) Resolve deserialize exception when Spark Doris Connector in aync deserialize mode Co-authored-by: lanhuajian --- .../apache/doris/spark/rdd/ScalaValueReader.scala | 39 +++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index 1d22c42..f3334b9 100644 --- a/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -19,6 +19,7 @@ package org.apache.doris.spark.rdd import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent._ +import java.util.concurrent.locks.{Condition, Lock, ReentrantLock} import scala.collection.JavaConversions._ import scala.util.Try @@ -46,11 +47,14 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { protected val logger = Logger.getLogger(classOf[ScalaValueReader]) protected val client = new BackendClient(new Routing(partition.getBeAddress), settings) + protected val clientLock = +if (deserializeArrowToRowBatchAsync) new ReentrantLock() +else new NoOpLock protected var offset = 0 protected var eos: AtomicBoolean = new AtomicBoolean(false) protected var rowBatch: RowBatch = _ // flag indicate if support deserialize Arrow to RowBatch asynchronously - protected var deserializeArrowToRowBatchAsync: Boolean = Try { + protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try { settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean } getOrElse { logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)) @@ -123,7 +127,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { params } - protected val openResult: TScanOpenResult = client.openScanner(openParams) + protected val openResult: TScanOpenResult = lockClient(_.openScanner(openParams)) protected val contextId: String = openResult.getContext_id protected val schema: Schema = SchemaUtils.convertToSchema(openResult.getSelected_columns) @@ -134,7 +138,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { nextBatchParams.setContext_id(contextId) while (!eos.get) { nextBatchParams.setOffset(offset) -val nextResult = client.getNext(nextBatchParams) +val nextResult = lockClient(_.getNext(nextBatchParams)) eos.set(nextResult.isEos) if (!eos.get) { val rowBatch = new RowBatch(nextResult, schema) @@ -192,7 +196,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { val nextBatchParams = new TScanNextBatchParams nextBatchParams.setContext_id(contextId) nextBatchParams.setOffset(offset) -val nextResult = client.getNext(nextBatchParams) +val nextResult = lockClient(_.getNext(nextBatchParams)) eos.set(nextResult.isEos) if (!eos.get) { rowBatch = new RowBatch(nextResult, schema) @@ -218,6 +222,31 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { def close(): Unit = { val closeParams 
= new TScanCloseParams closeParams.context_id = contextId -client.closeScanner(closeParams) +lockClient(_.closeScanner(closeParams)) + } + + private def lockClient[T](action: BackendClient => T): T = { +clientLock.lock() +try { + action(client) +} finally { + clientLock.unlock() +} + } + + private class NoOpLock extends Lock { +override def lock(): Unit = {} + +override def lockInterruptibly(): Unit = {} + +override def tryLock(): Boolean = true + +override def tryLock(time: Long, unit: TimeUnit): Boolean = true + +override def unlock(): Unit = {} + +override def newCondition(): Condition = { + throw new UnsupportedOperationException("NoOpLock can't provide a condition") +} } }
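The interesting part of the fix is the lock selection: when async Arrow deserialization is enabled, a `ReentrantLock` serializes every Thrift call (`openScanner`, `getNext`, `closeScanner`) through `lockClient`; when it is disabled, the null-object `NoOpLock` satisfies the same `Lock` interface at zero cost, so the single-threaded path pays no synchronization overhead. The pattern generalizes; a hedged, self-contained sketch with illustrative names:

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}

// Null-object Lock: implements the interface but synchronizes nothing, so the
// non-concurrent path keeps the same call shape at zero cost.
final class NoOpLock extends Lock {
  override def lock(): Unit = {}
  override def lockInterruptibly(): Unit = {}
  override def tryLock(): Boolean = true
  override def tryLock(time: Long, unit: TimeUnit): Boolean = true
  override def unlock(): Unit = {}
  override def newCondition(): Condition =
    throw new UnsupportedOperationException("NoOpLock can't provide a condition")
}

// Wrap any resource; pick a real lock only when concurrent access is possible.
final class Guarded[A](resource: A, concurrent: Boolean) {
  private val lock: Lock = if (concurrent) new ReentrantLock() else new NoOpLock
  def apply[T](action: A => T): T = {
    lock.lock()
    try action(resource)
    finally lock.unlock()
  }
}
```

With `val client = new Guarded(backendClient, concurrent = asyncEnabled)`, every call site becomes `client(_.getNext(params))`, mirroring `lockClient` above.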
[incubator-doris-spark-connector] 19/50: [Doc] flink/spark connector: add sources/javadoc plugins (#6435)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit 3168584e2792c8b2300b7b0379a9c840c9a44f5b Author: wunan1210 AuthorDate: Mon Aug 16 22:41:24 2021 +0800 [Doc] flink/spark connector: add sources/javadoc plugins (#6435) spark-doris-connector/flink-doris-connector add plugins to generate javadoc and sources jars, so they are easy to distribute and debug. --- build.sh | 2 ++ pom.xml | 40 +++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/build.sh b/build.sh index 9119841..c37b14e 100755 --- a/build.sh +++ b/build.sh @@ -51,6 +51,8 @@ ${MVN_CMD} clean package mkdir -p output/ cp target/doris-spark-1.0.0-SNAPSHOT.jar ./output/ +cp target/doris-spark-1.0.0-SNAPSHOT-javadoc.jar ./output/ +cp target/doris-spark-1.0.0-SNAPSHOT-sources.jar ./output/ echo "*" echo "Successfully build Spark-Doris-Connector" diff --git a/pom.xml b/pom.xml index cdf1055..5ba2c6e 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,11 @@ 2.3.4 0.9.3 0.15.1 +3.8.1 +3.3.0 +3.2.1 UTF-8 +${basedir}/../../thirdparty @@ -259,12 +263,46 @@ org.apache.maven.plugins maven-compiler-plugin -3.8.1 +${maven-compiler-plugin.version} 8 8 + +org.apache.maven.plugins +maven-javadoc-plugin +${maven-javadoc-plugin.version} + +8 +false +true + + + +attach-javadocs + +jar + + + + + +org.apache.maven.plugins +maven-source-plugin +${maven-source-plugin.version} + +true + + + +compile + +jar + + + +
[incubator-doris-spark-connector] 13/50: [Spark on Doris] fix the encoding of varchar when convertArrowToRowBatch (#5202)
This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git commit ad9eff5f7ce932c71c6de78ee764ab10047b6f0e Author: HuangWei AuthorDate: Sun Jan 10 20:48:46 2021 +0800 [Spark on Doris] fix the encoding of varchar when convertArrowToRowBatch (#5202) `convertArrowToRowBatch` uses the default charset to encode String. Set it to UTF_8, because we use `arrow::utf8` on the Backends. --- .../apache/doris/spark/serialization/RowBatch.java | 22 +- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index 0781f1e..ad3cfe5 100644 --- a/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -20,6 +20,7 @@ package org.apache.doris.spark.serialization; import java.io.ByteArrayInputStream; import java.io.IOException; import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; @@ -52,10 +53,10 @@ import com.google.common.base.Preconditions; * row batch data container. */ public class RowBatch { -private static Logger logger = LoggerFactory.getLogger(RowBatch.class); +private static final Logger logger = LoggerFactory.getLogger(RowBatch.class); public static class Row { -private List cols; +private final List cols; Row(int colCount) { this.cols = new ArrayList<>(colCount); @@ -74,11 +75,10 @@ public class RowBatch { private int offsetInRowBatch = 0; private int rowCountInOneBatch = 0; private int readRowCount = 0; -private List rowBatch = new ArrayList<>(); +private final List rowBatch = new ArrayList<>(); private final ArrowStreamReader arrowStreamReader; -private final VectorSchemaRoot root; private List fieldVectors; -private RootAllocator rootAllocator; +private final RootAllocator rootAllocator; private final Schema schema; public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException { @@ -88,9 +88,8 @@ public class RowBatch { new ByteArrayInputStream(nextResult.getRows()), rootAllocator ); -this.offsetInRowBatch = 0; try { -this.root = arrowStreamReader.getVectorSchemaRoot(); +VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); while (arrowStreamReader.loadNextBatch()) { fieldVectors = root.getFieldVectors(); if (fieldVectors.size() != schema.size()) { @@ -119,10 +118,7 @@ public class RowBatch { } public boolean hasNext() { -if (offsetInRowBatch < readRowCount) { -return true; -} -return false; +return offsetInRowBatch < readRowCount; } private void addValueToRow(int rowIndex, Object obj) { @@ -268,7 +264,7 @@ public class RowBatch { addValueToRow(rowIndex, null); continue; } -String value = new String(varCharVector.get(rowIndex)); +String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); addValueToRow(rowIndex, value); } break; @@ -284,7 +280,7 @@ public class RowBatch { } } -public List next() throws DorisException { +public List next() { if (!hasNext()) { String errMsg = "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount; logger.error(errMsg);
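The root cause of the fix above: `new String(byte[])` decodes with the JVM's default charset, which matches the `arrow::utf8` bytes coming from the Backends only when the JVM happens to run with UTF-8 as `file.encoding`. A small, hedged Scala demonstration of the difference:

```scala
import java.nio.charset.{Charset, StandardCharsets}

// UTF-8 bytes for "é" (0xC3 0xA9), the form a Doris BE emits for arrow::utf8.
val utf8Bytes: Array[Byte] = Array(0xC3.toByte, 0xA9.toByte)

// Platform-dependent: decodes with Charset.defaultCharset(), so this is only
// correct when the driver/executor JVMs run with -Dfile.encoding=UTF-8.
val maybeGarbled = new String(utf8Bytes)

// Deterministic: decodes the bytes exactly as the backend encoded them.
val correct = new String(utf8Bytes, StandardCharsets.UTF_8) // "é"

println(s"default charset: ${Charset.defaultCharset()}")
println(s"explicit UTF-8: $correct, default-charset: $maybeGarbled")
```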