This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new d97642e9b5e [cherry-pick](branch-21) fix tablet sink shuffle without project not match the output tuple (#40299)(#41293) (#41327) d97642e9b5e is described below commit d97642e9b5eff17d15b6f9de8f5dcaf948ff07a8 Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Tue Oct 15 00:12:23 2024 +0800 [cherry-pick](branch-21) fix tablet sink shuffle without project not match the output tuple (#40299)(#41293) (#41327) ## Proposed changes cherry-pick from master (#40299)(#41293) <!--Describe your changes.--> --- be/src/pipeline/exec/exchange_sink_operator.cpp | 33 +++++++- be/src/pipeline/exec/exchange_sink_operator.h | 5 +- .../glue/translator/PhysicalPlanTranslator.java | 3 +- .../plans/commands/insert/OlapInsertExecutor.java | 1 + .../org/apache/doris/planner/DataStreamSink.java | 10 +++ gensrc/thrift/DataSinks.thrift | 1 + .../data/nereids_p0/insert_into_table/random.out | 9 +++ .../nereids_p0/insert_into_table/random.groovy | 87 ++++++++++++++++++++++ 8 files changed, 143 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e4150b4f7ac..7584c0b0e45 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -18,6 +18,7 @@ #include "exchange_sink_operator.h" #include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Partitions_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/types.pb.h> @@ -249,6 +250,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), find_tablet_mode); _tablet_sink_tuple_desc = _state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id); _tablet_sink_row_desc = p._pool->add(new RowDescriptor(_tablet_sink_tuple_desc, false)); + _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size()); + for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i])); + } // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column // on exchange node rather than on TabletWriter _block_convertor = @@ -265,7 +270,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { .txn_id = _txn_id, .pool = p._pool.get(), .location = _location, - .vec_output_expr_ctxs = &_fake_expr_ctxs, + .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs, .schema = _schema, .caller = (void*)this, .create_partition_callback = &ExchangeSinkLocalState::empty_callback_function}); @@ -355,7 +360,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _tablet_sink_partition(sink.tablet_sink_partition), _tablet_sink_location(sink.tablet_sink_location), _tablet_sink_tuple_id(sink.tablet_sink_tuple_id), - _tablet_sink_txn_id(sink.tablet_sink_txn_id) { + _tablet_sink_txn_id(sink.tablet_sink_txn_id), + _t_tablet_sink_exprs(&sink.tablet_sink_exprs) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -367,6 +373,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED); _name = "ExchangeSinkOperatorX"; _pool = std::make_shared<ObjectPool>(); + if (sink.__isset.output_tuple_id) { + _output_tuple_id = sink.output_tuple_id; + } } Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { @@ -374,6 +383,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); } + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs, + _tablet_sink_expr_ctxs)); + } return Status::OK(); } @@ -386,6 +399,18 @@ Status ExchangeSinkOperatorX::prepare(RuntimeState* state) { Status ExchangeSinkOperatorX::open(RuntimeState* state) { DCHECK(state != nullptr); _compression_type = state->fragement_transmission_compression_type(); + if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { + if (_output_tuple_id == -1) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, + _child_x->row_desc())); + } else { + auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false)); + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc)); + } + RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state)); + } return Status::OK(); } @@ -534,8 +559,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block local_state._row_distribution._deal_batched = true; RETURN_IF_ERROR(local_state._send_new_partition_batch()); } + // the convert_block maybe different with block after execute exprs + // when send data we still use block RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, - channel2rows, convert_block.get(), eos)); + channel2rows, block, eos)); } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { { SCOPED_TIMER(local_state._split_block_hash_compute_timer); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 5a7b8bf4201..a94392b906d 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -217,7 +217,7 @@ private: // for shuffle data by partition and tablet int64_t _txn_id = -1; - vectorized::VExprContextSPtrs _fake_expr_ctxs; + vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr; std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr; std::shared_ptr<OlapTableSchemaParam> _schema = nullptr; @@ -273,6 +273,7 @@ private: const std::vector<TExpr> _texprs; const RowDescriptor& _row_desc; + TTupleId _output_tuple_id = -1; TPartitionType::type _part_type; @@ -299,6 +300,8 @@ private: const TTupleId _tablet_sink_tuple_id; int64_t _tablet_sink_txn_id = -1; std::shared_ptr<ObjectPool> _pool; + vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs; + const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr; // for external table sink random partition // Control the number of channels according to the flow, thereby controlling the number of table sink writers. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index a3d3a1885f3..c0d1aeb917f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -353,8 +353,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink(); DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get( multiCastDataSink.getDataStreamSinks().size() - 1); - TupleDescriptor tupleDescriptor = generateTupleDesc(distribute.getOutput(), null, context); - exchangeNode.updateTupleIds(tupleDescriptor); + exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc()); dataStreamSink.setExchNodeId(exchangeNode.getId()); dataStreamSink.setOutputPartition(dataPartition); parentFragment.addChild(inputFragment); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 8fc6d6cbd14..f522a956899 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -174,6 +174,7 @@ public class OlapInsertExecutor extends AbstractInsertExecutor { .createLocation(database.getId(), olapTableSink.getDstTable()); dataStreamSink.setTabletSinkLocationParam(locationParams.get(0)); dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId()); + dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs()); } } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index b9cf516bc3d..ef42190fa25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink { protected TOlapTableLocationParam tabletSinkLocationParam = null; protected TupleDescriptor tabletSinkTupleDesc = null; protected long tabletSinkTxnId = -1; + protected List<Expr> tabletSinkExprs = null; public DataStreamSink() { @@ -145,6 +146,10 @@ public class DataStreamSink extends DataSink { this.tabletSinkLocationParam = locationParam; } + public void setTabletSinkExprs(List<Expr> tabletSinkExprs) { + this.tabletSinkExprs = tabletSinkExprs; + } + public void setTabletSinkTxnId(long txnId) { this.tabletSinkTxnId = txnId; } @@ -224,6 +229,11 @@ public class DataStreamSink extends DataSink { if (tabletSinkLocationParam != null) { tStreamSink.setTabletSinkLocation(tabletSinkLocationParam); } + if (tabletSinkExprs != null) { + for (Expr expr : tabletSinkExprs) { + tStreamSink.addToTabletSinkExprs(expr.treeToThrift()); + } + } tStreamSink.setTabletSinkTxnId(tabletSinkTxnId); result.setStreamSink(tStreamSink); return result; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 6610de4b688..dfdbbcc0a9f 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -188,6 +188,7 @@ struct TDataStreamSink { 10: optional Descriptors.TOlapTableLocationParam tablet_sink_location 11: optional i64 tablet_sink_txn_id 12: optional Types.TTupleId tablet_sink_tuple_id + 13: optional list<Exprs.TExpr> tablet_sink_exprs } struct TMultiCastDataStreamSink { diff --git a/regression-test/data/nereids_p0/insert_into_table/random.out b/regression-test/data/nereids_p0/insert_into_table/random.out index d42426a991f..c774e023267 100644 --- a/regression-test/data/nereids_p0/insert_into_table/random.out +++ b/regression-test/data/nereids_p0/insert_into_table/random.out @@ -135,3 +135,12 @@ 13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634 13 12 20480.0 48640045.000000 10944010779 2012-03-12 2012-03-12T12:11:12 22.634 +-- !sql_select -- +1 11 11 + +-- !sql_select2 -- +1 + +-- !sql_select3 -- +601022201389484209 2024-04-09T20:58:49 卖卖 {"is_poi_first_order":0} + diff --git a/regression-test/suites/nereids_p0/insert_into_table/random.groovy b/regression-test/suites/nereids_p0/insert_into_table/random.groovy index 6cc5cb2b991..9edd855a9a8 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/random.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/random.groovy @@ -43,4 +43,91 @@ suite('nereids_insert_random') { sql 'set delete_without_partition=true' sql '''delete from dup_t_type_cast_rd where id is not null''' sql '''delete from dup_t_type_cast_rd where id is null''' + + sql 'set enable_strict_consistency_dml=true' + sql 'drop table if exists tbl_1' + sql 'drop table if exists tbl_4' + sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");""" + sql """INSERT INTO tbl_1 VALUES (1, 11);""" + sql 'sync' + sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, k2) DISTRIBUTED BY HASH(k1) BUCKETS 10 PROPERTIES ( "replication_num" = "1"); """ + sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;""" + sql 'sync' + qt_sql_select """ select * from tbl_4; """; + + + sql 'drop table if exists tbl_5' + sql 'drop table if exists tbl_6' + sql 'drop table if exists tbl_7' + + sql """ + CREATE TABLE `tbl_5` ( + `orderId` varchar(96) NOT NULL, + `updated_at` datetime NOT NULL, + `userLabel` varchar(255) NULL, + `userTag` variant NULL + ) ENGINE=OLAP + duplicate KEY(`orderId`, `updated_at`) + DISTRIBUTED BY HASH(`orderId`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + CREATE TABLE tbl_6 + ( + order_id VARCHAR(96) NOT NULL, + updated_at DATETIMEV2 NOT NULL + ) ENGINE=OLAP + duplicate KEY(`order_id`) + DISTRIBUTED BY HASH(`order_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ INSERT INTO `tbl_6` values('601022201389484209', '2024-04-09 20:58:49');""" + + sql """ + CREATE TABLE tbl_7 + ( + orderId VARCHAR(96) NOT NULL, + userLabel VARIANT NULL + )ENGINE=OLAP + UNIQUE KEY(`orderId`) + DISTRIBUTED BY HASH(orderId) BUCKETS AUTO + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """INSERT INTO `tbl_7` values('601022201389484209','{\"is_poi_first_order\":0}');""" + + sql 'sync' + qt_sql_select2 """ INSERT INTO + tbl_5 + SELECT + A.order_id as orderId, + A.updated_at, + CASE + WHEN LOCATE('下单1次', CAST(B.userLabel AS STRING)) > 0 + OR LOCATE('买买', CAST(B.userLabel AS STRING)) > 0 then '买买' + when B.userLabel ["is_poi_first_order"] = 1 then '买买' + else '卖卖' + end as userLabel, + B.userLabel AS `userTag` + FROM + ( + select + order_id,updated_at + from + tbl_6 + ) AS A + LEFT JOIN ( + select + orderId,userLabel + from + tbl_7 + ) AS B ON A.order_id = B.orderId; """; + qt_sql_select3 """ select * from tbl_5; """; + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org