This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9960b0960e7  branch-2.1: [bug](auto partition) Fix be crash with single replica insert (#48929)
9960b0960e7 is described below

commit 9960b0960e718769c04fb6a4879983e49d8e99e1
Author: xy720 <22125576+xy...@users.noreply.github.com>
AuthorDate: Fri Apr 11 15:23:51 2025 +0800

    branch-2.1: [bug](auto partition) Fix be crash with single replica insert (#48929)

    Something may have gone wrong in #48536, and it was reverted in #48926.
    We need to re-run the pipeline and see what was wrong.
---
 be/src/runtime/tablets_channel.cpp                 | 14 ++--
 be/src/vec/sink/vrow_distribution.cpp              |  3 +
 be/src/vec/sink/vrow_distribution.h                |  3 +
 be/src/vec/sink/writer/vtablet_writer.cpp          |  9 ++-
 .../apache/doris/service/FrontendServiceImpl.java  | 35 ++++++++-
 gensrc/thrift/FrontendService.thrift               |  4 ++
 ...t_auto_partition_with_single_replica_insert.csv | 78 +++++++++++++++++++++
 ...t_auto_partition_with_single_replica_insert.out | Bin 0 -> 7985 bytes
 ...uto_partition_with_single_replica_insert.groovy | 78 +++++++++++++++++++++
 9 files changed, 214 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index db3d558a1b9..4c7dce313f6 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -357,8 +357,6 @@ Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockReq
 
     // 5. commit all writers
     for (auto* writer : need_wait_writers) {
-        PSlaveTabletNodes slave_nodes;
-
         // close may return failed, but no need to handle it here.
         // tablet_vec will only contains success tablet, and then let FE judge it.
         _commit_txn(writer, req, res);
@@ -395,9 +393,15 @@ Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockReq
 
 void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req,
                                  PTabletWriterAddBlockResult* res) {
-    Status st = writer->commit_txn(_write_single_replica
-                                           ? req.slave_tablet_nodes().at(writer->tablet_id())
-                                           : PSlaveTabletNodes {});
+    PSlaveTabletNodes slave_nodes;
+    if (_write_single_replica) {
+        auto& nodes_map = req.slave_tablet_nodes();
+        auto it = nodes_map.find(writer->tablet_id());
+        if (it != nodes_map.end()) {
+            slave_nodes = it->second;
+        }
+    }
+    Status st = writer->commit_txn(slave_nodes);
     if (st.ok()) [[likely]] {
         auto* tablet_vec = res->mutable_tablet_vec();
         PTabletInfo* tablet_info = tablet_vec->Add();
diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp
index e4f809988e7..c322b364b2e 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -94,6 +94,7 @@ Status VRowDistribution::automatic_create_partition() {
     request.__set_db_id(_vpartition->db_id());
     request.__set_table_id(_vpartition->table_id());
     request.__set_partitionValues(_partitions_need_create);
+    request.__set_write_single_replica(_write_single_replica);
 
     VLOG_NOTICE << "automatic partition rpc begin request " << request;
     TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
@@ -127,6 +128,7 @@ static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg
     result.nodes = std::move(arg.nodes);
     result.partitions = std::move(arg.partitions);
     result.tablets = std::move(arg.tablets);
+    result.slave_tablets = std::move(arg.slave_tablets);
     return result;
 }
 
@@ -138,6 +140,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
     request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
     request.__set_db_id(_vpartition->db_id());
     request.__set_table_id(_vpartition->table_id());
+    request.__set_write_single_replica(_write_single_replica);
 
     // only request for partitions not recorded for replacement
     std::set<int64_t> id_deduper;
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index 9e4cce6b528..429b67bb068 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -79,6 +79,7 @@ public:
         const VExprContextSPtrs* vec_output_expr_ctxs = nullptr;
         std::shared_ptr<OlapTableSchemaParam> schema;
         void* caller = nullptr;
+        bool write_single_replica = false;
         CreatePartitionCallback create_partition_callback;
     };
     friend class VTabletWriter;
@@ -100,6 +101,7 @@ public:
         _vec_output_expr_ctxs = ctx.vec_output_expr_ctxs;
         _schema = ctx.schema;
         _caller = ctx.caller;
+        _write_single_replica = ctx.write_single_replica;
         _create_partition_callback = ctx.create_partition_callback;
     }
 
@@ -219,6 +221,7 @@ private:
     CreatePartitionCallback _create_partition_callback = nullptr;
     void* _caller = nullptr;
     std::shared_ptr<OlapTableSchemaParam> _schema;
+    bool _write_single_replica = false;
 
     // reuse for find_tablet. save partitions found by find_tablets
     std::vector<VOlapTablePartition*> _partitions;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 6ac88b5c963..56c35570fc8 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -125,7 +125,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
                 _has_inc_node = true;
             }
             LOG(INFO) << "init new node for instance " << _parent->_sender_id
-                      << ", incremantal:" << incremental;
+                      << ", node id:" << replica_node_id << ", incremantal:" << incremental;
         } else {
             channel = it->second;
         }
@@ -973,7 +973,8 @@ void VNodeChannel::mark_close(bool hang_wait) {
         DCHECK(_pending_blocks.back().second->eos());
         _close_time_ms = UnixMillis();
         LOG(INFO) << channel_info()
-                  << " mark closed, left pending batch size: " << _pending_blocks.size();
+                  << " mark closed, left pending batch size: " << _pending_blocks.size()
+                  << " hang_wait: " << hang_wait;
     }
 
     _eos_is_produced = true;
@@ -1101,7 +1102,8 @@ Status VTabletWriter::on_partitions_created(TCreatePartitionResult* result) {
     auto* new_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets));
     _location->add_locations(*new_locations);
     if (_write_single_replica) {
-        _slave_location->add_locations(*new_locations);
+        auto* slave_locations = _pool->add(new std::vector<TTabletLocation>(result->slave_tablets));
+        _slave_location->add_locations(*slave_locations);
     }
 
     // update new node info
@@ -1129,6 +1131,7 @@ Status VTabletWriter::_init_row_distribution() {
             .vec_output_expr_ctxs = &_vec_output_expr_ctxs,
             .schema = _schema,
             .caller = this,
+            .write_single_replica = _write_single_replica,
             .create_partition_callback = &vectorized::on_partitions_created});
 
     return _row_distribution.open(_output_row_desc);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ccba47396f9..699bdbf13d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -261,6 +261,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
@@ -268,6 +269,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -275,6 +277,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -3704,6 +3707,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // build partition & tablets
         List<TOlapTablePartition> partitions = Lists.newArrayList();
         List<TTabletLocation> tablets = Lists.newArrayList();
+        List<TTabletLocation> slaveTablets = new ArrayList<>();
         for (String partitionName : addPartitionClauseMap.keySet()) {
             Partition partition = table.getPartition(partitionName);
             TOlapTablePartition tPartition = new TOlapTablePartition();
@@ -3736,12 +3740,25 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     if (bePathsMap.keySet().size() < quorum) {
                         LOG.warn("auto go quorum exception");
                     }
-                    tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
+                    if (request.isSetWriteSingleReplica() && request.isWriteSingleReplica()) {
+                        Long[] nodes = bePathsMap.keySet().toArray(new Long[0]);
+                        Random random = new SecureRandom();
+                        Long masterNode = nodes[random.nextInt(nodes.length)];
+                        Multimap<Long, Long> slaveBePathsMap = bePathsMap;
+                        slaveBePathsMap.removeAll(masterNode);
+                        tablets.add(new TTabletLocation(tablet.getId(),
+                                Lists.newArrayList(Sets.newHashSet(masterNode))));
+                        slaveTablets.add(new TTabletLocation(tablet.getId(),
+                                Lists.newArrayList(slaveBePathsMap.keySet())));
+                    } else {
+                        tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
+                    }
                 }
             }
         }
         result.setPartitions(partitions);
         result.setTablets(tablets);
+        result.setSlaveTablets(slaveTablets);
 
         // build nodes
         List<TNodeInfo> nodeInfos = Lists.newArrayList();
@@ -3897,6 +3914,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // so they won't be changed again. if other transaction changing it. just let it fail.
         List<TOlapTablePartition> partitions = new ArrayList<>();
         List<TTabletLocation> tablets = new ArrayList<>();
+        List<TTabletLocation> slaveTablets = new ArrayList<>();
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
         for (long partitionId : resultPartitionIds) {
             Partition partition = olapTable.getPartition(partitionId);
@@ -3932,12 +3950,25 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                     if (bePathsMap.keySet().size() < quorum) {
                         LOG.warn("auto go quorum exception");
                     }
-                    tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
+                    if (request.isSetWriteSingleReplica() && request.isWriteSingleReplica()) {
+                        Long[] nodes = bePathsMap.keySet().toArray(new Long[0]);
+                        Random random = new SecureRandom();
+                        Long masterNode = nodes[random.nextInt(nodes.length)];
+                        Multimap<Long, Long> slaveBePathsMap = bePathsMap;
+                        slaveBePathsMap.removeAll(masterNode);
+                        tablets.add(new TTabletLocation(tablet.getId(),
+                                Lists.newArrayList(Sets.newHashSet(masterNode))));
+                        slaveTablets.add(new TTabletLocation(tablet.getId(),
+                                Lists.newArrayList(slaveBePathsMap.keySet())));
+                    } else {
+                        tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
+                    }
                 }
             }
         }
         result.setPartitions(partitions);
         result.setTablets(tablets);
+        result.setSlaveTablets(slaveTablets);
 
         // build nodes
         List<TNodeInfo> nodeInfos = Lists.newArrayList();
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 210f1ada0a9..4d13612989b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1487,6 +1487,7 @@ struct TCreatePartitionRequest {
     3: optional i64 table_id
     // for each partition column's partition values. [missing_rows, partition_keys]->Left bound(for range) or Point(for list)
     4: optional list<list<Exprs.TNullableStringLiteral>> partitionValues
+    5: optional bool write_single_replica = false
 }
 
 struct TCreatePartitionResult {
@@ -1494,6 +1495,7 @@ struct TCreatePartitionResult {
     2: optional list<Descriptors.TOlapTablePartition> partitions
     3: optional list<Descriptors.TTabletLocation> tablets
     4: optional list<Descriptors.TNodeInfo> nodes
+    5: optional list<Descriptors.TTabletLocation> slave_tablets
 }
 
 // these two for auto detect replacing partition
@@ -1502,6 +1504,7 @@ struct TReplacePartitionRequest {
     2: optional i64 db_id
     3: optional i64 table_id
     4: optional list<i64> partition_ids // partition to replace.
+    5: optional bool write_single_replica = false
 }
 
 struct TReplacePartitionResult {
@@ -1509,6 +1512,7 @@ struct TReplacePartitionResult {
     2: optional list<Descriptors.TOlapTablePartition> partitions
     3: optional list<Descriptors.TTabletLocation> tablets
     4: optional list<Descriptors.TNodeInfo> nodes
+    5: optional list<Descriptors.TTabletLocation> slave_tablets
 }
 
 struct TGetMetaReplica {
diff --git a/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.csv b/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.csv
new file mode 100644
index 00000000000..747ac6788be
--- /dev/null
+++ b/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.csv
@@ -0,0 +1,78 @@
+-3590935922607536626,-,2025-02-16,星辰医疗科技有限公司
+-3590935906895636626,-,2025-02-16,未来健康产业
+123,-,2025-02-16,蓝海生物有限公司
+100000048812501,-,2025-02-16,阳光医疗集团
+1000000076784258,-,2025-02-16,华夏健康科技
+1000022060522735,-,2025-02-16,瑞丰生物医药
+1000022193719484,-,2025-02-16,盛世医疗服务
+1000031422678924,-,2025-02-16,康宁健康有限公司
+1000085651028900,-,2025-02-16,史前生物科技
+1000093620518989,-,2025-02-16,健康之路公司
+1000103471774704,-,2025-02-16,安康医疗科技
+1000138777615262,-,2025-02-16,福瑞堂生物
+1000156823071129,-,2025-02-16,优质生活科技
+1000191711015262,-,2025-02-16,健康未来企业
+1000633041475486,-,2025-02-16,天使医疗集团
+1000681518627336,-,2025-02-16,百年健康公司
+1002458253925730,-,2025-02-16,康乐园生物科技
+1008126191610424,-,2025-02-16,华康医药有限公司
+1071904784424147,-,2025-02-16,金桥健康产业
+1076564522324147,-,2025-02-16,乐活医疗科技
+1202217708798485,-,2025-02-16,健康家园公司
+1224474148903456,-,2025-02-16,康泰生物科技
+2043829367811999,-,2025-02-16,未来医疗集团
+2191851926844270,-,2025-02-16,健康之源公司
+2232379824950609,-,2025-02-16,安宁医药有限公司
+2350341369782152,-,2025-02-16,和谐生物科技
+2548383917911403,-,2025-02-16,康健医疗服务
+2640774381717600,-,2025-02-16,瑞康医药有限公司
+2754625269782961,-,2025-02-16,乐享健康产业
+3064667398063809,-,2025-02-16,健康先锋公司
+3102689636972458,-,2025-02-16,安康生物科技
+3291916164371209,-,2025-02-16,未来之星医疗
+3946909802002976,-,2025-02-16,健康梦想公司
+3965513055005942,-,2025-02-16,康乐生物科技
+4143117309325214,-,2025-02-16,安宁健康产业
+4175970196426577,-,2025-02-16,乐活医疗集团
+4294566787233969,-,2025-02-16,健康之道公司
+4610682351457207,-,2025-02-16,瑞丰医疗科技
+4674640812462217,-,2025-02-16,未来健康企业
+4676494238858307,-,2025-02-16,安康医药有限公司
+4937264996861701,-,2025-02-16,乐享健康公司
+4947288173569190,-,2025-02-16,康宁医疗集团
+5115179098054305,-,2025-02-16,健康家园科技
+10000000430024147,-,2025-02-16,阳光医疗有限公司
+10000021073673208,-,2025-02-16,未来生物科技
+10000130032642122,-,2025-02-16,和谐健康产业
+10000365660973707,-,2025-02-16,安宁医药公司
+10000453096993544,-,2025-02-16,傻乐生物科技
+10000789012345678,-,2025-02-16,星辉医疗科技
+10000890123456789,-,2025-02-16,未来健康服务
+10000901234567890,-,2025-02-16,蓝天生物科技
+10001012345678901,-,2025-02-16,阳光健康产业
+10001123456789012,-,2025-02-16,华康医疗集团
+10001234567890123,-,2025-02-16,易丰生物医药
+10001345678901234,-,2025-02-16,盛世健康科技
+10001456789012345,-,2025-02-16,康宁医疗服务
+10001567890123456,-,2025-02-16,和谐生物公司
+10001678901234567,-,2025-02-16,健康之路科技
+10001789012345678,-,2025-02-16,安康生物产业
+10001890123456789,-,2025-02-16,福瑞堂医疗
+10001901234567890,-,2025-02-16,未来生活科技
+10002012345678901,-,2025-02-16,美好未来企业
+10002123456789012,-,2025-02-16,金地医疗集团
+10002234567890123,-,2025-02-16,老头健康公司
+10002345678901234,-,2025-02-16,平安园生物科技
+10002456789012345,-,2025-02-16,闪电医药有限公司
+10002567890123456,-,2025-02-16,铜桥健康产业
+10002678901234567,-,2025-02-16,乐天医疗科技
+10002789012345678,-,2025-02-16,健康成长公司
+10002890123456789,-,2025-02-16,尖端生物科技
+10002901234567890,-,2025-02-16,保护伞医疗集团
+10003012345678901,-,2025-02-16,青春之源公司
+10003123456789012,-,2025-02-16,大森林医药有限公司
+10003234567890123,-,2025-02-16,毒蛇生物科技
+10003345678901234,-,2025-02-16,金地医疗服务
+10003456789012345,-,2025-02-16,瑞丰医药有限公司
+10003567890123456,-,2025-02-16,乐游娱乐产业
+10003678901234567,-,2025-02-16,康岩先锋公司
diff --git a/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.out b/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.out
new file mode 100644
index 00000000000..e565de71e73
Binary files /dev/null and b/regression-test/data/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.out differ
diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.groovy
new file mode 100644
index 00000000000..510b24602ac
--- /dev/null
+++ b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_with_single_replica_insert.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_auto_partition_with_single_replica_insert") {
+    def tableName1 = "test_auto_partition_with_single_replica_insert_1"
+    def tableName2 = "test_auto_partition_with_single_replica_insert_2"
+    sql "drop table if exists ${tableName1}"
+    sql """
+        CREATE TABLE `${tableName1}` (
+            `user_id` varchar(100) NULL,
+            `goods_id` varchar(100) NULL,
+            `dates` date NULL,
+            `chain_name` varchar(100) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`user_id`, `goods_id`, `dates`)
+        COMMENT 'OLAP'
+        AUTO PARTITION BY LIST (`chain_name`)
+        (PARTITION pchain5fname10 VALUES IN ("chain_name"),
+        PARTITION p4e0995e85ce1534e4e3a5 VALUES IN ("星辰医疗科技有限公司"))
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+    streamLoad {
+        table "${tableName1}"
+        set 'column_separator', ','
+        file "test_auto_partition_with_single_replica_insert.csv"
+        time 20000
+    }
+    sql " sync "
+    qt_select1 "select * from ${tableName1} order by user_id"
+    def result1 = sql "show partitions from ${tableName1}"
+    logger.info("${result1}")
+    assertEquals(result1.size(), 79)
+
+    sql "drop table if exists ${tableName2}"
+    sql """
+        CREATE TABLE `${tableName2}` (
+            `user_id` varchar(100) NULL,
+            `goods_id` varchar(100) NULL,
+            `dates` date NULL,
+            `chain_name` varchar(100) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`user_id`, `goods_id`, `dates`)
+        COMMENT 'OLAP'
+        AUTO PARTITION BY LIST (`chain_name`)
+        (PARTITION pchain5fname10 VALUES IN ("chain_name"),
+        PARTITION p4e0995e85ce1534e4e3a5 VALUES IN ("星辰医疗科技有限公司"))
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+    """
+    sql """set experimental_enable_nereids_planner = true"""
+    sql """set enable_memtable_on_sink_node = false"""
+    sql """set experimental_enable_single_replica_insert = true"""
+    sql "insert into ${tableName2} select user_id, goods_id, dates, chain_name from ${tableName1}"
+    sql " sync "
+    qt_select2 "select * from ${tableName2} order by user_id"
+    def result2 = sql "show partitions from ${tableName1}"
+    logger.info("${result2}")
+    assertEquals(result1.size(), 79)
+}
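For readers tracing the crash itself: the removed BE code called `.at()` on `req.slave_tablet_nodes()`, a map keyed by tablet id, without checking whether the key exists. When a tablet belongs to a partition that auto partition created mid-load and no slave entry was reported for it, a missing-key `.at()` either throws or hits a check failure and takes the whole BE down. The patch switches to a find-and-fallback lookup. Below is a minimal, self-contained sketch of the two patterns; it uses `std::map` and a dummy `PSlaveTabletNodes` struct as stand-ins for the generated protobuf types, so the names are illustrative rather than the actual Doris API.

```cpp
#include <iostream>
#include <map>
#include <string>

// Stand-in for the protobuf-generated PSlaveTabletNodes message (assumption:
// the real type lives in gensrc/proto and is not reproduced here).
struct PSlaveTabletNodes {
    std::string debug_string() const { return "PSlaveTabletNodes{}"; }
};

int main() {
    // Stand-in for req.slave_tablet_nodes(): tablet_id -> slave node list.
    std::map<int64_t, PSlaveTabletNodes> slave_tablet_nodes;
    slave_tablet_nodes[1001] = PSlaveTabletNodes{};

    int64_t tablet_id = 2002;  // a tablet the FE never reported slaves for

    // Old pattern: .at() raises std::out_of_range for a missing key; a caller
    // that never catches it (as in the BE) would simply crash.
    try {
        auto nodes = slave_tablet_nodes.at(tablet_id);
        std::cout << "found: " << nodes.debug_string() << '\n';
    } catch (const std::out_of_range&) {
        std::cout << "at() on a missing key would have crashed the caller\n";
    }

    // New pattern from the patch: look the key up first and fall back to an
    // empty message, so commit_txn() simply sees no slave replicas.
    PSlaveTabletNodes slave_nodes;
    auto it = slave_tablet_nodes.find(tablet_id);
    if (it != slave_tablet_nodes.end()) {
        slave_nodes = it->second;
    }
    std::cout << "commit with: " << slave_nodes.debug_string() << '\n';
    return 0;
}
```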
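On the FE side, when the new `write_single_replica` flag is set on a create/replace-partition request, each tablet of the freshly created partition gets one randomly chosen backend as its master location (returned in `tablets`) and the remaining replicas are reported through the new `slave_tablets` field. The sketch below only illustrates that selection step; it is written in C++ for brevity even though the actual implementation in `FrontendServiceImpl.java` above is Java, and the backend ids and container types are placeholders.

```cpp
#include <algorithm>
#include <iostream>
#include <iterator>
#include <random>
#include <vector>

int main() {
    // Placeholder backend ids for the replicas of one tablet.
    std::vector<int64_t> replica_backends = {10001, 10002, 10003};

    // Pick one backend at random as the master location.
    std::random_device rd;
    std::mt19937_64 gen(rd());
    std::uniform_int_distribution<size_t> pick(0, replica_backends.size() - 1);
    int64_t master = replica_backends[pick(gen)];

    // Everything else becomes a slave location.
    std::vector<int64_t> slaves;
    std::copy_if(replica_backends.begin(), replica_backends.end(),
                 std::back_inserter(slaves),
                 [master](int64_t be) { return be != master; });

    std::cout << "tablets entry gets master backend " << master << '\n';
    std::cout << "slave_tablets entry gets " << slaves.size() << " backends\n";
    return 0;
}
```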