This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new d97642e9b5e [cherry-pick](branch-21) fix tablet sink shuffle without project not match the output tuple (#40299)(#41293) (#41327)
d97642e9b5e is described below

commit d97642e9b5eff17d15b6f9de8f5dcaf948ff07a8
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Tue Oct 15 00:12:23 2024 +0800

    [cherry-pick](branch-21) fix tablet sink shuffle without project not match the output tuple (#40299)(#41293) (#41327)
    
    ## Proposed changes
    
    cherry-pick from master  (#40299)(#41293)
    
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 33 +++++++-
 be/src/pipeline/exec/exchange_sink_operator.h      |  5 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  3 +-
 .../plans/commands/insert/OlapInsertExecutor.java  |  1 +
 .../org/apache/doris/planner/DataStreamSink.java   | 10 +++
 gensrc/thrift/DataSinks.thrift                     |  1 +
 .../data/nereids_p0/insert_into_table/random.out   |  9 +++
 .../nereids_p0/insert_into_table/random.groovy     | 87 ++++++++++++++++++++++
 8 files changed, 143 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e4150b4f7ac..7584c0b0e45 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "exchange_sink_operator.h"
 
 #include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Partitions_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/types.pb.h>
 
@@ -249,6 +250,10 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                 
std::make_unique<vectorized::OlapTabletFinder>(_vpartition.get(), 
find_tablet_mode);
         _tablet_sink_tuple_desc = 
_state->desc_tbl().get_tuple_descriptor(p._tablet_sink_tuple_id);
         _tablet_sink_row_desc = p._pool->add(new 
RowDescriptor(_tablet_sink_tuple_desc, false));
+        _tablet_sink_expr_ctxs.resize(p._tablet_sink_expr_ctxs.size());
+        for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
+            RETURN_IF_ERROR(p._tablet_sink_expr_ctxs[i]->clone(state, 
_tablet_sink_expr_ctxs[i]));
+        }
         // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, 
we handle the processing of auto_increment column
         // on exchange node rather than on TabletWriter
         _block_convertor =
@@ -265,7 +270,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                  .txn_id = _txn_id,
                  .pool = p._pool.get(),
                  .location = _location,
-                 .vec_output_expr_ctxs = &_fake_expr_ctxs,
+                 .vec_output_expr_ctxs = &_tablet_sink_expr_ctxs,
                  .schema = _schema,
                  .caller = (void*)this,
                  .create_partition_callback = 
&ExchangeSinkLocalState::empty_callback_function});
@@ -355,7 +360,8 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
           _tablet_sink_partition(sink.tablet_sink_partition),
           _tablet_sink_location(sink.tablet_sink_location),
           _tablet_sink_tuple_id(sink.tablet_sink_tuple_id),
-          _tablet_sink_txn_id(sink.tablet_sink_txn_id) {
+          _tablet_sink_txn_id(sink.tablet_sink_txn_id),
+          _t_tablet_sink_exprs(&sink.tablet_sink_exprs) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -367,6 +373,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
            sink.output_partition.type == 
TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
     _name = "ExchangeSinkOperatorX";
     _pool = std::make_shared<ObjectPool>();
+    if (sink.__isset.output_tuple_id) {
+        _output_tuple_id = sink.output_tuple_id;
+    }
 }
 
 Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
@@ -374,6 +383,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) 
{
     if (_part_type == TPartitionType::RANGE_PARTITIONED) {
         return Status::InternalError("TPartitionType::RANGE_PARTITIONED should 
not be used");
     }
+    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs,
+                                                             
_tablet_sink_expr_ctxs));
+    }
     return Status::OK();
 }
 
@@ -386,6 +399,18 @@ Status ExchangeSinkOperatorX::prepare(RuntimeState* state) 
{
 Status ExchangeSinkOperatorX::open(RuntimeState* state) {
     DCHECK(state != nullptr);
     _compression_type = state->fragement_transmission_compression_type();
+    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        if (_output_tuple_id == -1) {
+            RETURN_IF_ERROR(vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, 
state,
+                                                       _child_x->row_desc()));
+        } else {
+            auto* output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+            auto* output_row_desc = _pool->add(new 
RowDescriptor(output_tuple_desc, false));
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, 
*output_row_desc));
+        }
+        RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, 
state));
+    }
     return Status::OK();
 }
 
@@ -534,8 +559,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
             local_state._row_distribution._deal_batched = true;
             RETURN_IF_ERROR(local_state._send_new_partition_batch());
         }
+        // convert_block may differ from block once the output exprs have been executed;
+        // the original block is still the one sent to the channels
         RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, 
num_channels,
-                                                  channel2rows, 
convert_block.get(), eos));
+                                                  channel2rows, block, eos));
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
         {
             SCOPED_TIMER(local_state._split_block_hash_compute_timer);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 5a7b8bf4201..a94392b906d 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -217,7 +217,7 @@ private:
 
     // for shuffle data by partition and tablet
     int64_t _txn_id = -1;
-    vectorized::VExprContextSPtrs _fake_expr_ctxs;
+    vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
     std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
     std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr;
     std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
@@ -273,6 +273,7 @@ private:
     const std::vector<TExpr> _texprs;
 
     const RowDescriptor& _row_desc;
+    TTupleId _output_tuple_id = -1;
 
     TPartitionType::type _part_type;
 
@@ -299,6 +300,8 @@ private:
     const TTupleId _tablet_sink_tuple_id;
     int64_t _tablet_sink_txn_id = -1;
     std::shared_ptr<ObjectPool> _pool;
+    vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
+    const std::vector<TExpr>* _t_tablet_sink_exprs = nullptr;
 
     // for external table sink random partition
     // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index a3d3a1885f3..c0d1aeb917f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -353,8 +353,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
inputFragment.getSink();
             DataStreamSink dataStreamSink = 
multiCastDataSink.getDataStreamSinks().get(
                     multiCastDataSink.getDataStreamSinks().size() - 1);
-            TupleDescriptor tupleDescriptor = 
generateTupleDesc(distribute.getOutput(), null, context);
-            exchangeNode.updateTupleIds(tupleDescriptor);
+            exchangeNode.updateTupleIds(dataStreamSink.getOutputTupleDesc());
             dataStreamSink.setExchNodeId(exchangeNode.getId());
             dataStreamSink.setOutputPartition(dataPartition);
             parentFragment.addChild(inputFragment);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 8fc6d6cbd14..f522a956899 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -174,6 +174,7 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
                         .createLocation(database.getId(), 
olapTableSink.getDstTable());
                 
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
                 dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
+                dataStreamSink.setTabletSinkExprs(fragment.getOutputExprs());
             }
         } catch (Exception e) {
             throw new AnalysisException(e.getMessage(), e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index b9cf516bc3d..ef42190fa25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -62,6 +62,7 @@ public class DataStreamSink extends DataSink {
     protected TOlapTableLocationParam tabletSinkLocationParam = null;
     protected TupleDescriptor tabletSinkTupleDesc = null;
     protected long tabletSinkTxnId = -1;
+    protected List<Expr> tabletSinkExprs = null;
 
     public DataStreamSink() {
 
@@ -145,6 +146,10 @@ public class DataStreamSink extends DataSink {
         this.tabletSinkLocationParam = locationParam;
     }
 
+    public void setTabletSinkExprs(List<Expr> tabletSinkExprs) {
+        this.tabletSinkExprs = tabletSinkExprs;
+    }
+
     public void setTabletSinkTxnId(long txnId) {
         this.tabletSinkTxnId = txnId;
     }
@@ -224,6 +229,11 @@ public class DataStreamSink extends DataSink {
         if (tabletSinkLocationParam != null) {
             tStreamSink.setTabletSinkLocation(tabletSinkLocationParam);
         }
+        if (tabletSinkExprs != null) {
+            for (Expr expr : tabletSinkExprs) {
+                tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
+            }
+        }
         tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
         result.setStreamSink(tStreamSink);
         return result;
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 6610de4b688..dfdbbcc0a9f 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -188,6 +188,7 @@ struct TDataStreamSink {
   10: optional Descriptors.TOlapTableLocationParam tablet_sink_location
   11: optional i64 tablet_sink_txn_id
   12: optional Types.TTupleId tablet_sink_tuple_id
+  13: optional list<Exprs.TExpr> tablet_sink_exprs
 }
 
 struct TMultiCastDataStreamSink {
diff --git a/regression-test/data/nereids_p0/insert_into_table/random.out 
b/regression-test/data/nereids_p0/insert_into_table/random.out
index d42426a991f..c774e023267 100644
--- a/regression-test/data/nereids_p0/insert_into_table/random.out
+++ b/regression-test/data/nereids_p0/insert_into_table/random.out
@@ -135,3 +135,12 @@
 13     12      20480.0 48640045.000000 10944010779     2012-03-12      
2012-03-12T12:11:12     22.634
 13     12      20480.0 48640045.000000 10944010779     2012-03-12      
2012-03-12T12:11:12     22.634
 
+-- !sql_select --
+1      11      11
+
+-- !sql_select2 --
+1
+
+-- !sql_select3 --
+601022201389484209     2024-04-09T20:58:49     卖卖      {"is_poi_first_order":0}
+
diff --git a/regression-test/suites/nereids_p0/insert_into_table/random.groovy 
b/regression-test/suites/nereids_p0/insert_into_table/random.groovy
index 6cc5cb2b991..9edd855a9a8 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/random.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/random.groovy
@@ -43,4 +43,91 @@ suite('nereids_insert_random') {
     sql 'set delete_without_partition=true'
     sql '''delete from dup_t_type_cast_rd where id is not null'''
     sql '''delete from dup_t_type_cast_rd where id is null'''
+
+    sql 'set enable_strict_consistency_dml=true'
+    sql 'drop table if exists tbl_1'
+    sql 'drop table if exists tbl_4'
+    sql """CREATE TABLE tbl_1 (k1 INT, k2 INT) DISTRIBUTED BY HASH(k1) BUCKETS 
10 PROPERTIES ( "light_schema_change" = "false", "replication_num" = "1");"""
+    sql """INSERT INTO tbl_1 VALUES (1, 11);"""
+    sql 'sync'
+    sql """CREATE TABLE tbl_4 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, 
k2) DISTRIBUTED BY HASH(k1) BUCKETS 10  PROPERTIES ( "replication_num" = "1"); 
""" 
+    sql """INSERT INTO tbl_4 SELECT k1, k2, k2 FROM tbl_1;"""
+    sql 'sync'
+    qt_sql_select """ select * from tbl_4; """;
+
+
+    sql 'drop table if exists tbl_5'
+    sql 'drop table if exists tbl_6'
+    sql 'drop table if exists tbl_7'
+
+    sql """    
+        CREATE TABLE `tbl_5` (
+        `orderId` varchar(96) NOT NULL,
+        `updated_at` datetime NOT NULL,
+        `userLabel` varchar(255) NULL,
+        `userTag` variant NULL
+        ) ENGINE=OLAP
+        duplicate KEY(`orderId`, `updated_at`)
+        DISTRIBUTED BY HASH(`orderId`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+     """  
+     sql """   
+        CREATE TABLE tbl_6 
+        (
+        order_id VARCHAR(96) NOT NULL,
+        updated_at DATETIMEV2  NOT NULL
+        ) ENGINE=OLAP
+        duplicate KEY(`order_id`)
+        DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+        """ 
+
+        sql """ INSERT INTO `tbl_6` values('601022201389484209', '2024-04-09 
20:58:49');""" 
+
+        sql """
+        CREATE TABLE tbl_7 
+        (
+        orderId VARCHAR(96) NOT NULL,
+        userLabel VARIANT NULL
+        )ENGINE=OLAP
+        UNIQUE KEY(`orderId`)
+        DISTRIBUTED BY HASH(orderId) BUCKETS AUTO
+        PROPERTIES (
+        "replication_num" = "1"
+        );
+        """ 
+        sql """INSERT INTO `tbl_7` 
values('601022201389484209','{\"is_poi_first_order\":0}');""" 
+
+        sql 'sync'
+        qt_sql_select2 """ INSERT INTO
+                        tbl_5
+                        SELECT
+                        A.order_id as orderId,
+                        A.updated_at,
+                        CASE
+                                WHEN LOCATE('下单1次', CAST(B.userLabel AS 
STRING)) > 0
+                                OR LOCATE('买买', CAST(B.userLabel AS STRING)) > 
0 then '买买'
+                                when B.userLabel ["is_poi_first_order"] = 1 
then '买买'
+                                else '卖卖'
+                        end as userLabel,
+                        B.userLabel AS `userTag`
+                        FROM
+                        (
+                                select
+                                order_id,updated_at
+                                from
+                                tbl_6
+                        ) AS A
+                        LEFT JOIN (
+                                select
+                                orderId,userLabel
+                                from
+                                tbl_7
+                        ) AS B ON A.order_id = B.orderId; """;
+        qt_sql_select3 """ select * from tbl_5; """;
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to