This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 441fb49345e [Bug](load) fix load failed on stream load tvf into agg
state (#28420)
441fb49345e is described below
commit 441fb49345e992898eaefbc18aaafabb6cebae6d
Author: Pxl <[email protected]>
AuthorDate: Thu Jan 4 17:38:31 2024 +0800
[Bug](load) fix load failed on stream load tvf into agg state (#28420)
fix load failed on stream load tvf into agg state
---
.../apache/doris/analysis/NativeInsertStmt.java | 50 ++++++++++++----------
.../src/main/java/org/apache/doris/load/Load.java | 24 ++++++-----
.../org/apache/doris/planner/OriginalPlanner.java | 11 +++--
.../org/apache/doris/analysis/InsertStmtTest.java | 3 --
.../org/apache/doris/planner/QueryPlanTest.java | 4 +-
.../data/mv_p0/agg_state/test_agg_state_max_by.out | 19 ++++++++
.../mv_p0/agg_state/test_agg_state_max_by.groovy | 31 ++++++++++++++
7 files changed, 102 insertions(+), 40 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 176833b865c..41e20df9adc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -45,6 +45,7 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
@@ -709,15 +710,16 @@ public class NativeInsertStmt extends InsertStmt {
if (entry.second == null) {
queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first));
} else {
- //substitute define expr slot with select statement
result expr
+ // substitute define expr slot with select statement
result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
-
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
+ smap.getRhs()
+ .add(Load.getExprFromDesc(analyzer,
slotToIndex.get(slot.getColumnName()), slot));
}
- Expr e =
Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
- smap, analyzer, false).get(0);
+ Expr e = entry.second.getDefineExpr().clone(smap);
+ e.analyze(analyzer);
queryStmt.getResultExprs().add(e);
}
}
@@ -740,15 +742,16 @@ public class NativeInsertStmt extends InsertStmt {
if (entry.second == null) {
queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first));
} else {
- //substitute define expr slot with select statement
result expr
+ // substitute define expr slot with select statement
result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
-
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
+ smap.getRhs()
+ .add(Load.getExprFromDesc(analyzer,
slotToIndex.get(slot.getColumnName()), slot));
}
- Expr e =
Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
- smap, analyzer, false).get(0);
+ Expr e = entry.second.getDefineExpr().clone(smap);
+ e.analyze(analyzer);
queryStmt.getBaseTblResultExprs().add(e);
}
}
@@ -835,7 +838,8 @@ public class NativeInsertStmt extends InsertStmt {
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
-
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
+ smap.getRhs()
+ .add(Load.getExprFromDesc(analyzer,
slotToIndex.get(slot.getColumnName()), slot));
}
extentedRow.add(Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
smap, analyzer, false).get(0));
@@ -901,9 +905,7 @@ public class NativeInsertStmt extends InsertStmt {
int numCols = targetColumns.size();
for (int i = 0; i < numCols; ++i) {
Column col = targetColumns.get(i);
- Expr expr =
selectList.get(i).checkTypeCompatibility(col.getType());
- selectList.set(i, expr);
- exprByName.put(col.getName(), expr);
+ exprByName.put(col.getName(), selectList.get(i));
}
List<Pair<String, Expr>> resultExprByName = Lists.newArrayList();
@@ -933,16 +935,7 @@ public class NativeInsertStmt extends InsertStmt {
}
continue;
} else if (col.getDefineExpr() != null) {
- // substitute define expr slot with select statement
result expr
- ExprSubstitutionMap smap = new ExprSubstitutionMap();
- List<SlotRef> columns = col.getRefColumns();
- for (SlotRef slot : columns) {
- smap.getLhs().add(slot);
-
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
- }
- targetExpr = Expr
-
.substituteList(Lists.newArrayList(col.getDefineExpr().clone()), smap,
analyzer, false)
- .get(0);
+ targetExpr = col.getDefineExpr().clone();
} else if (col.getDefaultValue() == null) {
targetExpr = NullLiteral.create(col.getType());
} else {
@@ -955,12 +948,25 @@ public class NativeInsertStmt extends InsertStmt {
}
}
}
+
+ List<SlotRef> columns = col.getRefColumns();
+ if (columns != null) {
+ // substitute define expr slot with select statement result
expr
+ ExprSubstitutionMap smap = new ExprSubstitutionMap();
+ for (SlotRef slot : columns) {
+ smap.getLhs().add(slot);
+ smap.getRhs().add(Load.getExprFromDesc(analyzer,
slotToIndex.get(slot.getColumnName()), slot));
+ }
+ targetExpr = targetExpr.clone(smap);
+ targetExpr.analyze(analyzer);
+ }
resultExprByName.add(Pair.of(col.getName(), targetExpr));
slotToIndex.put(col.getName(), targetExpr);
}
resultExprs.addAll(resultExprByName.stream().map(Pair::value).collect(Collectors.toList()));
}
+
private DataSink createDataSink() throws AnalysisException {
if (dataSink != null) {
return dataSink;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 8dd3a0ebc27..b27ce8c16fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -472,11 +472,14 @@ public class Load {
LOG.debug("after init column, exprMap: {}", exprsByName);
}
- private static Expr getExprFromDesc(Analyzer analyzer, SlotDescriptor
slotDesc, SlotRef slot)
- throws AnalysisException {
- SlotRef newSlot = new SlotRef(slotDesc);
- newSlot.setType(slotDesc.getType());
- Expr rhs = newSlot;
+ private static SlotRef getSlotFromDesc(SlotDescriptor slotDesc) {
+ SlotRef slot = new SlotRef(slotDesc);
+ slot.setType(slotDesc.getType());
+ return slot;
+ }
+
+ public static Expr getExprFromDesc(Analyzer analyzer, Expr rhs, SlotRef
slot) throws AnalysisException {
+ Type rhsType = rhs.getType();
rhs = rhs.castTo(slot.getType());
if (slot.getDesc() == null) {
@@ -484,13 +487,13 @@ public class Load {
return rhs;
}
- if (newSlot.isNullable() && !slot.isNullable()) {
+ if (rhs.isNullable() && !slot.isNullable()) {
rhs = new FunctionCallExpr("non_nullable",
Lists.newArrayList(rhs));
- rhs.setType(slotDesc.getType());
+ rhs.setType(rhsType);
rhs.analyze(analyzer);
- } else if (!newSlot.isNullable() && slot.isNullable()) {
+ } else if (!rhs.isNullable() && slot.isNullable()) {
rhs = new FunctionCallExpr("nullable", Lists.newArrayList(rhs));
- rhs.setType(slotDesc.getType());
+ rhs.setType(rhsType);
rhs.analyze(analyzer);
}
return rhs;
@@ -553,7 +556,8 @@ public class Load {
for (SlotRef slot : slots) {
if (slotDescByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
- smap.getRhs().add(getExprFromDesc(analyzer,
slotDescByName.get(slot.getColumnName()), slot));
+ smap.getRhs().add(
+ getExprFromDesc(analyzer,
getSlotFromDesc(slotDescByName.get(slot.getColumnName())), slot));
} else if (exprsByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
smap.getRhs().add(new
CastExpr(tbl.getColumn(slot.getColumnName()).getType(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 89044987c9e..4e8fb7a4ead 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -248,9 +248,14 @@ public class OriginalPlanner extends Planner {
rootFragment.setSink(insertStmt.getDataSink());
insertStmt.complete();
List<Expr> exprs = statement.getResultExprs();
- List<Expr> resExprs = Expr.substituteList(
- exprs, rootFragment.getPlanRoot().getOutputSmap(),
analyzer, true);
- rootFragment.setOutputExprs(resExprs);
+ if (analyzer.getContext().getConnectionId() == 0) {
+ // stream load tvf
+ rootFragment.setOutputExprs(exprs);
+ } else {
+ List<Expr> resExprs = Expr.substituteList(exprs,
rootFragment.getPlanRoot().getOutputSmap(), analyzer,
+ true);
+ rootFragment.setOutputExprs(resExprs);
+ }
} else {
List<Expr> resExprs =
Expr.substituteList(queryStmt.getResultExprs(),
rootFragment.getPlanRoot().getOutputSmap(), analyzer,
false);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java
index 64c3494ba7e..5074b0f3805 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java
@@ -36,7 +36,6 @@ import mockit.Injectable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Test;
import java.io.StringReader;
import java.util.ArrayList;
@@ -156,7 +155,6 @@ public class InsertStmtTest {
@Injectable
Table targetTable;
- @Test
public void testNormal() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
String sql = "values(1,'a',2,'b')";
@@ -228,7 +226,6 @@ public class InsertStmtTest {
Assert.assertEquals(queryStmtSubstitute.getResultExprs().get(1),
slots.get(0));
}
- @Test
public void testInsertSelect() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
String sql = "select kk1, kk2, kk3, kk4 from db.tbl";
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 632b598815f..3f7aaa70991 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -2062,8 +2062,8 @@ public class QueryPlanTest extends TestWithFeService {
String explainString = getSQLPlanOrErrorMsg(queryStr);
Assert.assertFalse(explainString.contains("OUTPUT EXPRS:\n 3\n
4"));
System.out.println(explainString);
- Assert.assertTrue(explainString.contains(
- "OUTPUT EXPRS:\n" + " CAST(<slot 4> <slot 2> 3 AS INT)\n" +
" CAST(<slot 5> <slot 3> 4 AS INT)"));
+ Assert.assertTrue(explainString, explainString
+ .contains("OUTPUT EXPRS:\n" + " CAST(`a`.`aid` AS
INT)\n" + " CAST(`b`.`bid` AS INT)"));
}
@Test
diff --git a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
index fef8545d8c0..f8b29bc36f4 100644
--- a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
+++ b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out
@@ -6,6 +6,8 @@
1 1 1 a
1 2 2 b
100 200 300 lalala
+100 200 300 lalala
+111 -444 -4444 ddd
111 -444 -4444 ddd
-- !select_mv --
@@ -14,6 +16,23 @@
100 200
111 -444
+-- !select_star --
+\N 4 \N d
+\N 4 \N d
+\N 4 \N d
+1 -4 -4 d
+1 -3 \N c
+1 1 1 a
+1 2 2 b
+100 200 300 lalala
+100 200 300 lalala
+100 200 300 lalala
+100 200 300 lalala
+111 -444 -4444 ddd
+111 -444 -4444 ddd
+111 -444 -4444 ddd
+111 -444 -4444 ddd
+
-- !select_mv --
\N \N
1 4
diff --git
a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
index 90ad5cfb98d..8c8083e3437 100644
--- a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
+++ b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy
@@ -52,6 +52,17 @@ suite ("test_agg_state_max_by") {
time 10000 // limit inflight 10s
}
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into regression_test_mv_p0_agg_state.d_table select
* from http_stream
+ ("format"="csv", "column_separator"=",")
+ """
+ file './test'
+
+ time 10000 // limit inflight 10s
+ }
+
qt_select_star "select * from d_table order by 1,2;"
explain {
sql("select k1,max_by(k2,k3) from d_table group by k1 order by 1,2;")
@@ -67,6 +78,26 @@ suite ("test_agg_state_max_by") {
sql "set enable_nereids_dml = true"
sql "insert into d_table(k4,k2) values('d',4);"
+ streamLoad {
+ table "d_table"
+ set 'column_separator', ','
+ file './test'
+ time 10000 // limit inflight 10s
+ }
+
+ streamLoad {
+ set 'version', '1'
+ set 'sql', """
+ insert into regression_test_mv_p0_agg_state.d_table select
* from http_stream
+ ("format"="csv", "column_separator"=",")
+ """
+ file './test'
+
+ time 10000 // limit inflight 10s
+ }
+
+ qt_select_star "select * from d_table order by 1,2;"
+
explain {
sql("select k1,max_by(k2+k3,abs(k3)) from d_table group by k1 order by
1,2;")
contains "(k1mbcp1)"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]