This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 062c18bcb7b [opt](audit) add many audit info into audit log and table
(#51363)
062c18bcb7b is described below
commit 062c18bcb7bc9b7ae75e8db749bd8b758ebf2abe
Author: morrySnow <[email protected]>
AuthorDate: Thu Jun 19 17:43:14 2025 +0800
[opt](audit) add many audit info into audit log and table (#51363)
### What problem does this PR solve?
Problem Summary:
1. add the following audit info
- queried tables and views
- chosen materialized views
- parse time
- plan times
- get meta times
- schedule times
- hit sql cache
- is internal query
- is handled in fe
- spill info
- variable info
2. fix the issue that the audit table could not be altered correctly
---
.../java/org/apache/doris/analysis/SelectStmt.java | 2 +-
.../apache/doris/blockrule/SqlBlockRuleMgr.java | 4 +-
.../org/apache/doris/catalog/InternalSchema.java | 142 +++++++++++------
.../doris/catalog/InternalSchemaInitializer.java | 107 +++++++------
.../doris/cloud/CacheHotspotManagerUtils.java | 4 +-
.../doris/common/profile/SummaryProfile.java | 113 +++++++++++++
.../doris/job/extensions/insert/InsertTask.java | 1 +
.../java/org/apache/doris/mtmv/MTMVPlanUtil.java | 1 +
.../org/apache/doris/nereids/NereidsPlanner.java | 5 +
.../exploration/mv/MaterializationContext.java | 29 +++-
.../rules/expression/QueryColumnCollector.java | 2 +-
.../doris/nereids/stats/StatsCalculator.java | 8 +-
.../commands/CreateMaterializedViewCommand.java | 3 +-
.../trees/plans/commands/info/AddColumnsOp.java | 9 ++
.../java/org/apache/doris/plugin/AuditEvent.java | 160 +++++++++++++++----
.../org/apache/doris/plugin/audit/AuditLoader.java | 47 +++++-
.../apache/doris/plugin/audit/AuditLogBuilder.java | 2 +-
.../java/org/apache/doris/qe/AuditLogHelper.java | 177 +++++++++++++++++----
.../java/org/apache/doris/qe/ConnectProcessor.java | 3 +-
.../main/java/org/apache/doris/qe/QueryState.java | 12 +-
.../java/org/apache/doris/qe/SessionVariable.java | 5 -
.../java/org/apache/doris/qe/StmtExecutor.java | 10 +-
.../main/java/org/apache/doris/qe/VariableMgr.java | 9 ++
.../apache/doris/service/FrontendServiceImpl.java | 1 -
.../apache/doris/statistics/OlapAnalysisJob.java | 56 -------
.../apache/doris/statistics/StatisticsCache.java | 6 +-
.../doris/statistics/StatsRecursiveDerive.java | 2 +-
.../doris/statistics/util/StatisticsUtil.java | 4 +-
28 files changed, 665 insertions(+), 259 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index c9da8e48c8b..c64004aab71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -2422,7 +2422,7 @@ public class SelectStmt extends QueryStmt implements
NotFallbackInParser {
strBuilder.append("DISTINCT ");
}
ConnectContext ctx = ConnectContext.get();
- if (ctx == null || ctx.getSessionVariable().internalSession ||
toSQLWithSelectList || resultExprs.isEmpty()) {
+ if (ctx == null || ctx.getState().isInternal() || toSQLWithSelectList
|| resultExprs.isEmpty()) {
for (int i = 0; i < selectList.getItems().size(); i++) {
strBuilder.append(selectList.getItems().get(i).toSql());
strBuilder.append((i + 1 != selectList.getItems().size()) ? ",
" : "");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
index 123da7f9b87..9e04b3c9156 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
@@ -238,7 +238,7 @@ public class SqlBlockRuleMgr implements Writable {
return;
}
if (ConnectContext.get() != null
- && ConnectContext.get().getSessionVariable().internalSession) {
+ && ConnectContext.get().getState().isInternal()) {
return;
}
// match global rule
@@ -277,7 +277,7 @@ public class SqlBlockRuleMgr implements Writable {
**/
public void checkLimitations(Long partitionNum, Long tabletNum, Long
cardinality, String user)
throws AnalysisException {
- if (ConnectContext.get().getSessionVariable().internalSession) {
+ if (ConnectContext.get().getState().isInternal()) {
return;
}
// match global rule
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
index a571334660a..9940bedbe49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
@@ -120,55 +120,103 @@ public class InternalSchema {
HISTO_STATS_SCHEMA.add(
new ColumnDef("update_time",
TypeDef.create(PrimitiveType.DATETIME), ColumnNullableType.NOT_NULLABLE));
- // audit table
+ // audit table must all nullable because maybe remove some columns in
feature
AUDIT_SCHEMA = new ArrayList<>();
- AUDIT_SCHEMA.add(new ColumnDef("query_id", TypeDef.createVarchar(48),
ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("time", TypeDef.createDatetimeV2(3),
ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("client_ip",
TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("user", TypeDef.createVarchar(128),
ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("catalog", TypeDef.createVarchar(128),
ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("db", TypeDef.createVarchar(128),
ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("state", TypeDef.createVarchar(128),
ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("error_code",
TypeDef.create(PrimitiveType.INT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("error_message",
TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("query_time",
TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("scan_bytes",
TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("scan_rows",
TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("return_rows",
TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("shuffle_send_rows",
TypeDef.create(PrimitiveType.BIGINT),
- ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("shuffle_send_bytes",
TypeDef.create(PrimitiveType.BIGINT),
- ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("scan_bytes_from_local_storage",
TypeDef.create(PrimitiveType.BIGINT),
- ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("scan_bytes_from_remote_storage",
TypeDef.create(PrimitiveType.BIGINT),
- ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("stmt_id",
TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("stmt_type", TypeDef.createVarchar(48),
ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("is_query",
TypeDef.create(PrimitiveType.TINYINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(
- new ColumnDef("is_nereids",
TypeDef.create(PrimitiveType.TINYINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("frontend_ip",
TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA
- .add(new ColumnDef("cpu_time_ms",
TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("sql_hash", TypeDef.createVarchar(128),
ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(new ColumnDef("sql_digest",
TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(
- new ColumnDef("peak_memory_bytes",
TypeDef.create(PrimitiveType.BIGINT), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(
- new ColumnDef("workload_group",
TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
- AUDIT_SCHEMA.add(
- new ColumnDef("compute_group",
TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
+ // uuid and time
+ AUDIT_SCHEMA.add(new ColumnDef("query_id",
+ TypeDef.createVarchar(48), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("time",
+ TypeDef.createDatetimeV2(3), ColumnNullableType.NULLABLE));
+ // cs info
+ AUDIT_SCHEMA.add(new ColumnDef("client_ip",
+ TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("user",
+ TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("frontend_ip",
+ TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
+ // default ctl and db
+ AUDIT_SCHEMA.add(new ColumnDef("catalog",
+ TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("db",
+ TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
+ // query state
+ AUDIT_SCHEMA.add(new ColumnDef("state",
+ TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("error_code",
+ TypeDef.create(PrimitiveType.INT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("error_message",
+ TypeDef.create(PrimitiveType.STRING),
ColumnNullableType.NULLABLE));
+ // execution info
+ AUDIT_SCHEMA.add(new ColumnDef("query_time",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("cpu_time_ms",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("peak_memory_bytes",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("scan_bytes",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("scan_rows",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("return_rows",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_rows",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_bytes",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("spill_write_bytes_from_local_storage",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("spill_read_bytes_from_local_storage",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("scan_bytes_from_local_storage",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("scan_bytes_from_remote_storage",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ // plan info
+ AUDIT_SCHEMA.add(new ColumnDef("parse_time_ms",
+ TypeDef.create(PrimitiveType.INT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("plan_times_ms",
+ new TypeDef(new MapType(ScalarType.STRING, ScalarType.INT)),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("get_meta_times_ms",
+ new TypeDef(new MapType(ScalarType.STRING, ScalarType.INT)),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("schedule_times_ms",
+ new TypeDef(new MapType(ScalarType.STRING, ScalarType.INT)),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("hit_sql_cache",
+ TypeDef.create(PrimitiveType.TINYINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("handled_in_fe",
+ TypeDef.create(PrimitiveType.TINYINT),
ColumnNullableType.NULLABLE));
+ // queried tables, views and m-views
+ AUDIT_SCHEMA.add(new ColumnDef("queried_tables_and_views",
+ new TypeDef(new ArrayType(ScalarType.STRING)),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("chosen_m_views",
+ new TypeDef(new ArrayType(ScalarType.STRING)),
ColumnNullableType.NULLABLE));
+ // variable and configs
+ AUDIT_SCHEMA.add(new ColumnDef("changed_variables",
+ new TypeDef(new MapType(ScalarType.STRING,
ScalarType.STRING)), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("sql_mode",
+ TypeDef.create(PrimitiveType.STRING),
ColumnNullableType.NULLABLE));
+ // type and digest
+ AUDIT_SCHEMA.add(new ColumnDef("stmt_type",
+ TypeDef.createVarchar(48), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("stmt_id",
+ TypeDef.create(PrimitiveType.BIGINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("sql_hash",
+ TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("sql_digest",
+ TypeDef.createVarchar(128), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("is_query",
+ TypeDef.create(PrimitiveType.TINYINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("is_nereids",
+ TypeDef.create(PrimitiveType.TINYINT),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("is_internal",
+ TypeDef.create(PrimitiveType.TINYINT),
ColumnNullableType.NULLABLE));
+ // resource
+ AUDIT_SCHEMA.add(new ColumnDef("workload_group",
+ TypeDef.create(PrimitiveType.STRING),
ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("compute_group",
+ TypeDef.create(PrimitiveType.STRING),
ColumnNullableType.NULLABLE));
// Keep stmt as last column. So that in fe.audit.log, it will be
easier to get sql string
- AUDIT_SCHEMA.add(new ColumnDef("stmt",
TypeDef.create(PrimitiveType.STRING), ColumnNullableType.NULLABLE));
+ AUDIT_SCHEMA.add(new ColumnDef("stmt",
+ TypeDef.create(PrimitiveType.STRING),
ColumnNullableType.NULLABLE));
}
// Get copied schema for statistic table
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index a57ae3714dd..933272a5dbc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -21,7 +21,6 @@ import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.ColumnDef;
import org.apache.doris.analysis.ColumnNullableType;
-import org.apache.doris.analysis.ColumnPosition;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DbName;
import org.apache.doris.analysis.DistributionDesc;
@@ -42,7 +41,12 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.ha.FrontendNodeType;
+import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.AddColumnsOp;
+import org.apache.doris.nereids.trees.plans.commands.info.AlterTableOp;
+import org.apache.doris.nereids.trees.plans.commands.info.ReorderColumnsOp;
+import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.plugin.audit.AuditLoader;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -58,6 +62,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
public class InternalSchemaInitializer extends Thread {
@@ -333,13 +338,13 @@ public class InternalSchemaInitializer extends Thread {
return false;
}
Database db = optionalDatabase.get();
- Optional<Table> optionalStatsTbl =
db.getTable(StatisticConstants.TABLE_STATISTIC_TBL_NAME);
- if (!optionalStatsTbl.isPresent()) {
+ Optional<Table> optionalTable =
db.getTable(StatisticConstants.TABLE_STATISTIC_TBL_NAME);
+ if (!optionalTable.isPresent()) {
return false;
}
// 2. check statistic tables
- Table statsTbl = optionalStatsTbl.get();
+ Table statsTbl = optionalTable.get();
Optional<Column> optionalColumn =
statsTbl.fullSchema.stream().filter(c ->
c.getName().equals("count")).findFirst();
if (!optionalColumn.isPresent() ||
!optionalColumn.get().isAllowNull()) {
@@ -352,65 +357,69 @@ public class InternalSchemaInitializer extends Thread {
}
return false;
}
- optionalStatsTbl =
db.getTable(StatisticConstants.PARTITION_STATISTIC_TBL_NAME);
- if (!optionalStatsTbl.isPresent()) {
+ optionalTable =
db.getTable(StatisticConstants.PARTITION_STATISTIC_TBL_NAME);
+ if (!optionalTable.isPresent()) {
return false;
}
// 3. check audit table
- optionalStatsTbl = db.getTable(AuditLoader.AUDIT_LOG_TABLE);
- if (!optionalStatsTbl.isPresent()) {
+ optionalTable = db.getTable(AuditLoader.AUDIT_LOG_TABLE);
+ if (!optionalTable.isPresent()) {
return false;
}
// 4. check and update audit table schema
- OlapTable auditTable = (OlapTable) optionalStatsTbl.get();
- List<ColumnDef> expectedSchema = InternalSchema.AUDIT_SCHEMA;
+ OlapTable auditTable = (OlapTable) optionalTable.get();
// 5. check if we need to add new columns
- List<AlterClause> alterClauses = Lists.newArrayList();
- for (int i = 0; i < expectedSchema.size(); i++) {
- ColumnDef def = expectedSchema.get(i);
- if (auditTable.getColumn(def.getName()) == null) {
- // add column if it doesn't exist
- try {
- ColumnDef columnDef = new ColumnDef(def.getName(),
def.getTypeDef(), def.isAllowNull());
- // find the previous column name to determine the position
- String afterColumn = null;
- if (i > 0) {
- for (int j = i - 1; j >= 0; j--) {
- String prevColName =
expectedSchema.get(j).getName();
- if (auditTable.getColumn(prevColName) != null) {
- afterColumn = prevColName;
- break;
- }
- }
- }
- ColumnPosition position = afterColumn == null ?
ColumnPosition.FIRST :
- new ColumnPosition(afterColumn);
- ModifyColumnClause clause = new
ModifyColumnClause(columnDef, position, null,
- Maps.newHashMap());
- clause.setColumn(columnDef.toColumn());
- alterClauses.add(clause);
- } catch (Exception e) {
- LOG.warn("Failed to create alter clause for column: " +
def.getName(), e);
- return false;
- }
- }
+ return alterAuditSchemaIfNeeded(auditTable);
+ }
+
+ private boolean alterAuditSchemaIfNeeded(OlapTable auditTable) {
+ List<ColumnDef> expectedSchema = InternalSchema.AUDIT_SCHEMA;
+ List<String> expectedColumnNames = expectedSchema.stream()
+ .map(ColumnDef::getName)
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ List<Column> currentColumns = auditTable.getBaseSchema();
+ List<String> currentColumnNames = currentColumns.stream()
+ .map(Column::getName)
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ // check if all expected columns are exists and in the right order
+ if (currentColumnNames.size() >= expectedColumnNames.size()
+ && expectedColumnNames.equals(currentColumnNames.subList(0,
expectedColumnNames.size()))) {
+ return true;
}
- // apply schema changes if needed
- if (!alterClauses.isEmpty()) {
- try {
- TableName tableName = new
TableName(InternalCatalog.INTERNAL_CATALOG_NAME,
- FeConstants.INTERNAL_DB_NAME,
AuditLoader.AUDIT_LOG_TABLE);
- AlterTableStmt alterStmt = new AlterTableStmt(tableName,
alterClauses);
- Env.getCurrentEnv().alterTable(alterStmt);
- } catch (Exception e) {
- LOG.warn("Failed to alter audit table schema", e);
- return false;
+ List<AlterTableOp> alterClauses = Lists.newArrayList();
+ // add new columns
+ List<Column> addColumns = Lists.newArrayList();
+ for (ColumnDef expected : expectedSchema) {
+ if
(!currentColumnNames.contains(expected.getName().toLowerCase())) {
+ addColumns.add(new Column(expected.getName(),
expected.getType(), expected.isAllowNull()));
}
}
+ if (!addColumns.isEmpty()) {
+ AddColumnsOp addColumnsOp = new AddColumnsOp(null,
Maps.newHashMap(), addColumns);
+ alterClauses.add(addColumnsOp);
+ }
+ // reorder columns
+ List<String> removedColumnNames =
Lists.newArrayList(currentColumnNames);
+ removedColumnNames.removeAll(expectedColumnNames);
+ List<String> newColumnOrders = Lists.newArrayList(expectedColumnNames);
+ newColumnOrders.addAll(removedColumnNames);
+ ReorderColumnsOp reorderColumnsOp = new
ReorderColumnsOp(newColumnOrders, null, Maps.newHashMap());
+ alterClauses.add(reorderColumnsOp);
+ TableNameInfo auditTableName = new
TableNameInfo(InternalCatalog.INTERNAL_CATALOG_NAME,
+ FeConstants.INTERNAL_DB_NAME, AuditLoader.AUDIT_LOG_TABLE);
+ AlterTableCommand alterTableCommand = new
AlterTableCommand(auditTableName, alterClauses);
+ try {
+ Env.getCurrentEnv().alterTable(alterTableCommand);
+ } catch (Exception e) {
+ LOG.warn("Failed to alter audit table schema", e);
+ return false;
+ }
return true;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManagerUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManagerUtils.java
index 5f1efa77d20..e5b849ec338 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManagerUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManagerUtils.java
@@ -189,7 +189,6 @@ public class CacheHotspotManagerUtils {
public static void execUpdate(String sql) throws Exception {
try (AutoCloseConnectContext r = buildConnectContext()) {
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext,
sql);
- r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
}
}
@@ -210,7 +209,6 @@ public class CacheHotspotManagerUtils {
try (AutoCloseConnectContext r = buildConnectContext()) {
execCreateDatabase();
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext,
CREATE_CACHE_TABLE);
- r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
}
Database db =
Env.getCurrentInternalCatalog().getDbNullable(FeConstants.INTERNAL_DB_NAME);
@@ -227,8 +225,8 @@ public class CacheHotspotManagerUtils {
public static AutoCloseConnectContext buildConnectContext() {
ConnectContext connectContext = new ConnectContext();
+ connectContext.getState().setInternal(true);
SessionVariable sessionVariable = connectContext.getSessionVariable();
- sessionVariable.internalSession = true;
//
sessionVariable.setMaxExecMemByte(StatisticConstants.STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES);
sessionVariable.setEnableInsertStrict(true);
sessionVariable.setInsertMaxFilterRatio(1);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index baf011a48d2..83a99d97a13 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -786,6 +786,99 @@ public class SummaryProfile {
}
}
+ public int getParseSqlTimeMs() {
+ return getTimeMs(parseSqlFinishTime, parseSqlStartTime);
+ }
+
+ public int getPlanTimeMs() {
+ return getTimeMs(queryPlanFinishTime, parseSqlFinishTime);
+ }
+
+ public int getNereidsLockTableTimeMs() {
+ return getTimeMs(nereidsLockTableFinishTime, parseSqlFinishTime);
+ }
+
+ public int getNereidsAnalysisTimeMs() {
+ return getTimeMs(nereidsAnalysisFinishTime,
nereidsLockTableFinishTime);
+ }
+
+ public int getNereidsRewriteTimeMs() {
+ return getTimeMs(nereidsRewriteFinishTime, nereidsAnalysisFinishTime);
+ }
+
+ public int getNereidsCollectTablePartitionTimeMs() {
+ return getTimeMs(nereidsCollectTablePartitionFinishTime,
nereidsRewriteFinishTime)
+ + (int) nereidsCollectTablePartitionTime;
+ }
+
+ public int getNereidsOptimizeTimeMs() {
+ return getTimeMs(nereidsOptimizeFinishTime,
nereidsCollectTablePartitionFinishTime);
+ }
+
+ public int getNereidsTranslateTimeMs() {
+ return getTimeMs(nereidsTranslateFinishTime,
nereidsOptimizeFinishTime);
+ }
+
+ public int getNereidsGarbageCollectionTimeMs() {
+ return (int) (nereidsGarbageCollectionTime);
+ }
+
+ public int getNereidsBeFoldConstTimeMs() {
+ return (int) (nereidsBeFoldConstTime);
+ }
+
+ public int getNereidsDistributeTimeMs() {
+ return getTimeMs(nereidsDistributeFinishTime,
nereidsTranslateFinishTime);
+ }
+
+ public int getInitScanNodeTimeMs() {
+ return getTimeMs(initScanNodeFinishTime, initScanNodeStartTime);
+ }
+
+ public int getFinalizeScanNodeTimeMs() {
+ return getTimeMs(finalizeScanNodeFinishTime,
finalizeScanNodeStartTime);
+ }
+
+ public int getCreateScanRangeTimeMs() {
+ return getTimeMs(createScanRangeFinishTime, getSplitsFinishTime);
+ }
+
+ public int getScheduleTimeMs() {
+ return getTimeMs(queryScheduleFinishTime, queryPlanFinishTime);
+ }
+
+ public int getFragmentAssignTimsMs() {
+ return getTimeMs(assignFragmentTime, queryPlanFinishTime);
+ }
+
+ public int getFragmentSerializeTimeMs() {
+ return getTimeMs(fragmentSerializeTime, assignFragmentTime);
+ }
+
+ public int getFragmentRPCPhase1TimeMs() {
+ return getTimeMs(fragmentSendPhase1Time, fragmentSerializeTime);
+ }
+
+ public int getFragmentRPCPhase2TimeMs() {
+ return getTimeMs(fragmentSendPhase2Time, fragmentSendPhase1Time);
+ }
+
+ public double getFragmentCompressedSizeByte() {
+ return fragmentCompressedSize;
+ }
+
+ public long getFragmentRPCCount() {
+ return fragmentRpcCount;
+ }
+
+ public int getTimeMs(long end, long start) {
+ if (end == -1 || start == -1) {
+ return -1;
+ } else {
+ return (int) (end - start);
+ }
+ }
+
public String getPrettyParseSqlTime() {
return getPrettyTime(parseSqlFinishTime, parseSqlStartTime,
TUnit.TIME_MS);
}
@@ -864,6 +957,26 @@ public class SummaryProfile {
return RuntimeProfile.printCounter(getTableVersionCount, TUnit.UNIT);
}
+ public long getGetPartitionVersionTime() {
+ return getPartitionVersionTime;
+ }
+
+ public long getGetPartitionVersionCount() {
+ return getPartitionVersionCount;
+ }
+
+ public long getGetPartitionVersionByHasDataCount() {
+ return getPartitionVersionByHasDataCount;
+ }
+
+ public long getGetTableVersionTime() {
+ return getTableVersionTime;
+ }
+
+ public long getGetTableVersionCount() {
+ return getTableVersionCount;
+ }
+
private String getPrettyTime(long end, long start, TUnit unit) {
if (start == -1 || end == -1) {
return "N/A";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 7790a6f6198..3f2e211be05 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -151,6 +151,7 @@ public class InsertTask extends AbstractTask {
ctx.setQualifiedUser(userIdentity.getQualifiedUser());
ctx.setCurrentUserIdentity(userIdentity);
ctx.getState().reset();
+ ctx.getState().setInternal(true);
ctx.setThreadLocalInfo();
TUniqueId queryId = generateQueryId();
ctx.setQueryId(queryId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
index 2139d2937a3..61e953fa87c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
@@ -89,6 +89,7 @@ public class MTMVPlanUtil {
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
ctx.getState().reset();
+ ctx.getState().setInternal(true);
ctx.setThreadLocalInfo();
// Debug session variable should be disabled when refreshed
ctx.getSessionVariable().skipDeletePredicate = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index c3594e1e3cc..083f79fe597 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -781,6 +781,7 @@ public class NereidsPlanner extends Planner {
+ parsedPlan.treeString() + "\n\n"
+ "========== LOCK TABLE "
+
getTimeMetricString(SummaryProfile::getPrettyNereidsLockTableTime) + "
==========\n"
+ + "\n\n"
+ "========== ANALYZED PLAN "
+
getTimeMetricString(SummaryProfile::getPrettyNereidsAnalysisTime) + "
==========\n"
+ analyzedPlan.treeString() + "\n\n"
@@ -883,6 +884,10 @@ public class NereidsPlanner extends Planner {
return cascadesContext.getConnectContext();
}
+ public StatementContext getStatementContext() {
+ return statementContext;
+ }
+
public static PhysicalProperties buildInitRequireProperties() {
return PhysicalProperties.GATHER;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
index 46acd9861a4..1ffa3829cad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
@@ -367,13 +367,14 @@ public abstract class MaterializationContext {
}
/**
- * ToSummaryString, this contains only summary info.
+ * get qualifiers for all mvs rewrite success and chosen by current query.
+ *
+ * @param materializationContexts all mv candidates context for current
query
+ * @param physicalPlan the chosen plan for current query
+ * @return chosen mvs' qualifier set
*/
- public static String toSummaryString(List<MaterializationContext>
materializationContexts,
- Plan physicalPlan) {
- if (materializationContexts.isEmpty()) {
- return "";
- }
+ public static Set<List<String>> getChosenMvsQualifiers(
+ List<MaterializationContext> materializationContexts, Plan
physicalPlan) {
Set<MaterializationContext> rewrittenSuccessMaterializationSet =
materializationContexts.stream()
.filter(MaterializationContext::isSuccess)
.collect(Collectors.toSet());
@@ -389,6 +390,19 @@ public abstract class MaterializationContext {
return null;
}
}, null);
+ return chosenMaterializationQualifiers;
+ }
+
+ /**
+ * ToSummaryString, this contains only summary info.
+ */
+ public static String toSummaryString(List<MaterializationContext>
materializationContexts,
+ Plan physicalPlan) {
+ if (materializationContexts.isEmpty()) {
+ return "";
+ }
+ Set<List<String>> chosenMaterializationQualifiers =
getChosenMvsQualifiers(
+ materializationContexts, physicalPlan);
StringBuilder builder = new StringBuilder();
builder.append("\nMaterializedView");
@@ -403,7 +417,8 @@ public abstract class MaterializationContext {
}
// rewrite success but not chosen
builder.append("\nMaterializedViewRewriteSuccessButNotChose:\n");
- Set<List<String>> rewriteSuccessButNotChoseQualifiers =
rewrittenSuccessMaterializationSet.stream()
+ Set<List<String>> rewriteSuccessButNotChoseQualifiers =
materializationContexts.stream()
+ .filter(MaterializationContext::isSuccess)
.map(MaterializationContext::generateMaterializationIdentifier)
.filter(materializationQualifier ->
!chosenMaterializationQualifiers.contains(materializationQualifier))
.collect(Collectors.toSet());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/QueryColumnCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/QueryColumnCollector.java
index bc1670221a7..e2457759a5d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/QueryColumnCollector.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/QueryColumnCollector.java
@@ -58,7 +58,7 @@ public class QueryColumnCollector extends
DefaultPlanRewriter<CollectorContext>
@Override
public Plan rewriteRoot(Plan plan, JobContext jobContext) {
ConnectContext connectContext = ConnectContext.get();
- if (connectContext != null &&
connectContext.getSessionVariable().internalSession) {
+ if (connectContext != null && connectContext.getState().isInternal()) {
return plan;
}
CollectorContext context = new CollectorContext();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index 05d98054a4b..d5dc53492c3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -547,7 +547,7 @@ public class StatsCalculator extends
DefaultPlanVisitor<Statistics, Void> {
// for system table or FeUt, use ColumnStatistic.UNKNOWN
if (StatisticConstants.isSystemTable(olapTable) ||
!FeConstants.enableInternalSchemaDb
|| ConnectContext.get() == null
- || ConnectContext.get().getSessionVariable().internalSession) {
+ || ConnectContext.get().getState().isInternal()) {
for (Slot slot : ((Plan) olapScan).getOutput()) {
builder.putColumnStatistics(slot, ColumnStatistic.UNKNOWN);
}
@@ -1217,7 +1217,7 @@ public class StatsCalculator extends
DefaultPlanVisitor<Statistics, Void> {
private ColumnStatistic getColumnStatistic(TableIf table, String colName,
long idxId) {
ConnectContext connectContext = ConnectContext.get();
- if (connectContext != null &&
connectContext.getSessionVariable().internalSession) {
+ if (connectContext != null && connectContext.getState().isInternal()) {
return ColumnStatistic.UNKNOWN;
}
long catalogId;
@@ -1248,7 +1248,7 @@ public class StatsCalculator extends
DefaultPlanVisitor<Statistics, Void> {
private ColumnStatistic getColumnStatistic(TableIf table, String colName,
long idxId, List<String> partitionNames) {
ConnectContext connectContext = ConnectContext.get();
- if (connectContext != null &&
connectContext.getSessionVariable().internalSession) {
+ if (connectContext != null && connectContext.getState().isInternal()) {
return ColumnStatistic.UNKNOWN;
}
long catalogId;
@@ -1318,7 +1318,7 @@ public class StatsCalculator extends
DefaultPlanVisitor<Statistics, Void> {
// for FeUt, use ColumnStatistic.UNKNOWN
if (!FeConstants.enableInternalSchemaDb
|| ConnectContext.get() == null
- || ConnectContext.get().getSessionVariable().internalSession) {
+ || ConnectContext.get().getState().isInternal()) {
builder.setRowCount(Math.max(1, tableRowCount));
for (Slot slot : catalogRelation.getOutput()) {
builder.putColumnStatistics(slot, ColumnStatistic.UNKNOWN);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
index 92cf266f584..503d691e481 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
@@ -242,7 +242,8 @@ public class CreateMaterializedViewCommand extends Command
implements ForwardWit
@Override
public Plan visit(Plan plan, ValidateContext context) {
- throw new AnalysisException(String.format("%s is not supported",
plan.getClass().getSimpleName()));
+ throw new AnalysisException(String.format("%s is not supported in
sync materialized view",
+ plan.getClass().getSimpleName()));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnsOp.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnsOp.java
index 20cb2e98a94..b064fb3d29f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnsOp.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnsOp.java
@@ -30,6 +30,7 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -51,6 +52,14 @@ public class AddColumnsOp extends AlterTableOp {
this.properties = properties;
}
+ public AddColumnsOp(String rollupName, Map<String, String> properties,
List<Column> columns) {
+ super(AlterOpType.SCHEMA_CHANGE);
+ this.columnDefs = Collections.emptyList();
+ this.rollupName = rollupName;
+ this.properties = properties;
+ this.columns = columns;
+ }
+
public List<Column> getColumns() {
return columns;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 84d58b28d48..49e0cd6ecec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -50,73 +50,116 @@ public class AuditEvent {
// all fields which is about to be audit should be annotated by
"@AuditField"
// make them all "public" so that easy to visit.
+
+ // uuid and time
+ @AuditField(value = "QueryId")
+ public String queryId = "";
@AuditField(value = "Timestamp")
public long timestamp = -1;
+
+ // cs info
@AuditField(value = "Client")
public String clientIp = "";
@AuditField(value = "User")
public String user = "";
+ @AuditField(value = "FeIp")
+ public String feIp = "";
+
+ // default ctl and db
@AuditField(value = "Ctl")
public String ctl = "";
@AuditField(value = "Db")
public String db = "";
- @AuditField(value = "CommandType")
- public String commandType = "";
+
+ // query state
@AuditField(value = "State")
public String state = "";
@AuditField(value = "ErrorCode")
public int errorCode = 0;
@AuditField(value = "ErrorMessage")
public String errorMessage = "";
+
+ // execution info
@AuditField(value = "Time(ms)")
public long queryTime = -1;
+ @AuditField(value = "CpuTimeMS")
+ public long cpuTimeMs = -1;
+ @AuditField(value = "PeakMemoryBytes")
+ public long peakMemoryBytes = -1;
@AuditField(value = "ScanBytes")
public long scanBytes = -1;
@AuditField(value = "ScanRows")
public long scanRows = -1;
@AuditField(value = "ReturnRows")
public long returnRows = -1;
- @AuditField(value = "StmtId")
- public long stmtId = -1;
- @AuditField(value = "QueryId")
- public String queryId = "";
- @AuditField(value = "IsQuery")
- public boolean isQuery = false;
- @AuditField(value = "IsNereids")
- public boolean isNereids = false;
- @AuditField(value = "FeIp")
- public String feIp = "";
- @AuditField(value = "StmtType")
- public String stmtType = "";
- @AuditField(value = "Stmt")
- public String stmt = "";
- @AuditField(value = "CpuTimeMS")
- public long cpuTimeMs = -1;
@AuditField(value = "ShuffleSendBytes")
public long shuffleSendBytes = -1;
@AuditField(value = "ShuffleSendRows")
public long shuffleSendRows = -1;
+ @AuditField(value = "SpillWriteBytesToLocalStorage")
+ public long spillWriteBytesToLocalStorage = -1;
+ @AuditField(value = "SpillReadBytesFromLocalStorage")
+ public long spillReadBytesFromLocalStorage = -1;
+ @AuditField(value = "ScanBytesFromLocalStorage")
+ public long scanBytesFromLocalStorage = -1;
+ @AuditField(value = "ScanBytesFromRemoteStorage")
+ public long scanBytesFromRemoteStorage = -1;
+
+ // plan info
+ @AuditField(value = "ParseTimeMs")
+ public int parseTimeMs = -1;
+ @AuditField(value = "PlanTimesMs")
+ public String planTimesMs = "";
+ @AuditField(value = "GetMetaTimesMs")
+ public String getMetaTimesMs = "";
+ @AuditField(value = "ScheduleTimesMs")
+ public String scheduleTimesMs = "";
+ @AuditField(value = "HitSqlCache")
+ public boolean hitSqlCache = false;
+ @AuditField(value = "isHandledInFe")
+ public boolean isHandledInFe = false;
+
+ // table, view, m-view
+ @AuditField(value = "queriedTablesAndViews")
+ public String queriedTablesAndViews = "";
+ @AuditField(value = "chosenMViews")
+ public String chosenMViews = "";
+
+ // variable and configs
+ @AuditField(value = "ChangedVariables")
+ public String changedVariables = "";
+ @AuditField(value = "FuzzyVariables")
+ public String fuzzyVariables = "";
+ @AuditField(value = "SqlMode")
+ public String sqlMode = "";
+
+ // type and digest
+ @AuditField(value = "CommandType")
+ public String commandType = "";
+ @AuditField(value = "StmtType")
+ public String stmtType = "";
+ @AuditField(value = "StmtId")
+ public long stmtId = -1;
@AuditField(value = "SqlHash")
public String sqlHash = "";
- @AuditField(value = "PeakMemoryBytes")
- public long peakMemoryBytes = -1;
@AuditField(value = "SqlDigest")
public String sqlDigest = "";
+ @AuditField(value = "IsQuery")
+ public boolean isQuery = false;
+ @AuditField(value = "IsNereids")
+ public boolean isNereids = false;
+ @AuditField(value = "IsInternal")
+ public boolean isInternal = false;
+
+ // resource
@AuditField(value = "ComputeGroupName")
public String cloudClusterName = "";
@AuditField(value = "WorkloadGroup")
public String workloadGroup = "";
- // note: newly added fields should be always before fuzzyVariables
- @AuditField(value = "FuzzyVariables")
- public String fuzzyVariables = "";
- @AuditField(value = "ScanBytesFromLocalStorage")
- public long scanBytesFromLocalStorage = -1;
- @AuditField(value = "ScanBytesFromRemoteStorage")
- public long scanBytesFromRemoteStorage = -1;
- @AuditField(value = "SpillWriteBytesToLocalStorage")
- public long spillWriteBytesToLocalStorage = -1;
- @AuditField(value = "SpillReadBytesFromLocalStorage")
- public long spillReadBytesFromLocalStorage = -1;
+
+ // stmt should be last one
+ @AuditField(value = "Stmt")
+ public String stmt = "";
public long pushToAuditLogQueueTime;
@@ -231,6 +274,11 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setisInternal(boolean isInternal) {
+ auditEvent.isInternal = isInternal;
+ return this;
+ }
+
public AuditEventBuilder setFeIp(String feIp) {
auditEvent.feIp = feIp;
return this;
@@ -291,6 +339,56 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setParseTimeMs(int parseTimeMs) {
+ auditEvent.parseTimeMs = parseTimeMs;
+ return this;
+ }
+
+ public AuditEventBuilder setPlanTimesMs(String planTimesMs) {
+ auditEvent.planTimesMs = planTimesMs;
+ return this;
+ }
+
+ public AuditEventBuilder setGetMetaTimeMs(String getMetaTimeMs) {
+ auditEvent.getMetaTimesMs = getMetaTimeMs;
+ return this;
+ }
+
+ public AuditEventBuilder setScheduleTimeMs(String scheduleTimeMs) {
+ auditEvent.scheduleTimesMs = scheduleTimeMs;
+ return this;
+ }
+
+ public AuditEventBuilder setHitSqlCache(boolean hitSqlCache) {
+ auditEvent.hitSqlCache = hitSqlCache;
+ return this;
+ }
+
+ public AuditEventBuilder setHandledInFe(boolean handledInFe) {
+ auditEvent.isHandledInFe = handledInFe;
+ return this;
+ }
+
+ public AuditEventBuilder setChangedVariables(String changedVariables) {
+ auditEvent.changedVariables = changedVariables;
+ return this;
+ }
+
+ public AuditEventBuilder setSqlMode(String sqlMode) {
+ auditEvent.sqlMode = sqlMode;
+ return this;
+ }
+
+ public AuditEventBuilder setQueriedTablesAndViews(String
queriedTablesAndViews) {
+ auditEvent.queriedTablesAndViews = queriedTablesAndViews;
+ return this;
+ }
+
+ public AuditEventBuilder setChosenMViews(String chosenMViews) {
+ auditEvent.chosenMViews = chosenMViews;
+ return this;
+ }
+
public AuditEvent build() {
return this.auditEvent;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 722ab48669b..c7a1642fa2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -139,34 +139,69 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
// should be same order as InternalSchema.AUDIT_SCHEMA
+
+ // uuid and time
logBuffer.append(event.queryId).append("\t");
logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t");
+
+ // cs info
logBuffer.append(event.clientIp).append("\t");
logBuffer.append(event.user).append("\t");
+ logBuffer.append(event.feIp).append("\t");
+
+ // default ctl and db
logBuffer.append(event.ctl).append("\t");
logBuffer.append(event.db).append("\t");
+
+ // query state
logBuffer.append(event.state).append("\t");
logBuffer.append(event.errorCode).append("\t");
logBuffer.append(event.errorMessage).append("\t");
+
+ // execution info
logBuffer.append(event.queryTime).append("\t");
+ logBuffer.append(event.cpuTimeMs).append("\t");
+ logBuffer.append(event.peakMemoryBytes).append("\t");
logBuffer.append(event.scanBytes).append("\t");
logBuffer.append(event.scanRows).append("\t");
logBuffer.append(event.returnRows).append("\t");
logBuffer.append(event.shuffleSendRows).append("\t");
logBuffer.append(event.shuffleSendBytes).append("\t");
+ logBuffer.append(event.spillWriteBytesToLocalStorage).append("\t");
+ logBuffer.append(event.spillReadBytesFromLocalStorage).append("\t");
logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
- logBuffer.append(event.stmtId).append("\t");
+
+ // plan info
+ logBuffer.append(event.parseTimeMs).append("\t");
+ logBuffer.append(event.planTimesMs).append("\t");
+ logBuffer.append(event.getMetaTimesMs).append("\t");
+ logBuffer.append(event.scheduleTimesMs).append("\t");
+ logBuffer.append(event.hitSqlCache ? 1 : 0).append("\t");
+ logBuffer.append(event.isHandledInFe ? 1 : 0).append("\t");
+
+ // queried tables, views and m-views
+ logBuffer.append(event.queriedTablesAndViews).append("\t");
+ logBuffer.append(event.chosenMViews).append("\t");
+
+ // variable and configs
+ logBuffer.append(event.changedVariables).append("\t");
+ logBuffer.append(event.sqlMode).append("\t");
+
+
+ // type and digest
logBuffer.append(event.stmtType).append("\t");
- logBuffer.append(event.isQuery ? 1 : 0).append("\t");
- logBuffer.append(event.isNereids ? 1 : 0).append("\t");
- logBuffer.append(event.feIp).append("\t");
- logBuffer.append(event.cpuTimeMs).append("\t");
+ logBuffer.append(event.stmtId).append("\t");
logBuffer.append(event.sqlHash).append("\t");
logBuffer.append(event.sqlDigest).append("\t");
- logBuffer.append(event.peakMemoryBytes).append("\t");
+ logBuffer.append(event.isQuery ? 1 : 0).append("\t");
+ logBuffer.append(event.isNereids ? 1 : 0).append("\t");
+ logBuffer.append(event.isInternal ? 1 : 0).append("\t");
+
+ // resource
logBuffer.append(event.workloadGroup).append("\t");
logBuffer.append(event.cloudClusterName).append("\t");
+
// already trim the query in
org.apache.doris.qe.AuditLogHelper#logAuditLog
String stmt = event.stmt;
if (LOG.isDebugEnabled()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
index 4208d5def2e..a6b9062bbea 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
@@ -131,7 +131,7 @@ public class AuditLogBuilder extends Plugin implements
AuditPlugin {
String auditLog = getAuditLogString(event);
AuditLog.getQueryAudit().log(auditLog);
// slow query
- if (event != null && event.queryTime > Config.qe_slow_log_ms) {
+ if (event != null && !event.isInternal && event.queryTime >
Config.qe_slow_log_ms) {
AuditLog.getSlowAudit().log(auditLog);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 27b7c1ba880..e17de8e10ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -28,14 +28,17 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
+import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlCommand;
+import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.rules.exploration.mv.MaterializationContext;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.InlineTable;
import org.apache.doris.nereids.trees.plans.commands.NeedAuditEncryption;
@@ -49,6 +52,7 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.FrontendOptions;
import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -60,10 +64,20 @@ import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
public class AuditLogHelper {
private static final Logger LOG =
LogManager.getLogger(AuditLogHelper.class);
+ private static final Set<String> LOG_PLAN_INFO_TYPES =
Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
+
+ static {
+ LOG_PLAN_INFO_TYPES.add("SELECT");
+ LOG_PLAN_INFO_TYPES.add("INSERT");
+ LOG_PLAN_INFO_TYPES.add("UPDATE");
+ LOG_PLAN_INFO_TYPES.add("DELETE");
+ }
/**
* Add a new method to wrap original logAuditLog to catch all exceptions.
Because write audit
@@ -197,16 +211,18 @@ public class AuditLogHelper {
LOG.warn("Failed to get cloud cluster", e);
}
String cluster = Config.isCloudMode() ? cloudCluster : "";
+ String stmtType = getStmtType(parsedStmt);
AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder();
// ATTN: MUST reset, otherwise, the same AuditEventBuilder instance
will be used in the next query.
auditEventBuilder.reset();
auditEventBuilder
+ .setEventType(EventType.AFTER_QUERY)
+ .setQueryId(ctx.queryId() == null ? "NaN" :
DebugUtil.printId(ctx.queryId()))
.setTimestamp(ctx.getStartTime())
.setClientIp(ctx.getClientIP())
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
- .setSqlHash(ctx.getSqlHash())
- .setEventType(EventType.AFTER_QUERY)
+ .setFeIp(FrontendOptions.getLocalHostAddress())
.setCtl(catalog == null ?
InternalCatalog.INTERNAL_CATALOG_NAME : catalog.getName())
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
.setState(ctx.getState().toString())
@@ -214,25 +230,130 @@ public class AuditLogHelper {
.setErrorMessage((ctx.getState().getErrorMessage() == null ?
"" :
ctx.getState().getErrorMessage().replace("\n", "
").replace("\t", " ")))
.setQueryTime(elapseMs)
+ .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
+ .setPeakMemoryBytes(statistics == null ? 0 :
statistics.getMaxPeakMemoryBytes())
.setScanBytes(statistics == null ? 0 :
statistics.getScanBytes())
.setScanRows(statistics == null ? 0 : statistics.getScanRows())
+ .setReturnRows(ctx.getReturnRows())
.setSpillWriteBytesToLocalStorage(statistics == null ? 0 :
statistics.getSpillWriteBytesToLocalStorage())
.setSpillReadBytesFromLocalStorage(statistics == null ? 0 :
statistics.getSpillReadBytesFromLocalStorage())
- .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
- .setPeakMemoryBytes(statistics == null ? 0 :
statistics.getMaxPeakMemoryBytes())
- .setReturnRows(ctx.getReturnRows())
+ .setScanBytesFromLocalStorage(statistics == null ? 0 :
+ statistics.getScanBytesFromLocalStorage())
+ .setScanBytesFromRemoteStorage(statistics == null ? 0 :
+ statistics.getScanBytesFromRemoteStorage())
+ .setFuzzyVariables(!printFuzzyVariables ? "" :
ctx.getSessionVariable().printFuzzyVariables())
+ .setCommandType(ctx.getCommand().toString())
+ .setStmtType(stmtType)
.setStmtId(ctx.getStmtId())
- .setQueryId(ctx.queryId() == null ? "NaN" :
DebugUtil.printId(ctx.queryId()))
+ .setSqlHash(ctx.getSqlHash())
+ .setIsQuery(ctx.getState().isQuery())
+ .setIsNereids(ctx.getState().isNereids())
+ .setisInternal(ctx.getState().isInternal())
.setCloudCluster(Strings.isNullOrEmpty(cluster) ? "UNKNOWN" :
cluster)
- .setWorkloadGroup(ctx.getWorkloadGroupName())
- .setFuzzyVariables(!printFuzzyVariables ? "" :
ctx.getSessionVariable().printFuzzyVariables())
- .setCommandType(ctx.getCommand().toString());
+ .setWorkloadGroup(ctx.getWorkloadGroupName());
+
+ // sql mode
+ if (ctx.sessionVariable != null) {
+ try {
+
auditEventBuilder.setSqlMode(SqlModeHelper.decode(ctx.sessionVariable.getSqlMode()));
+ } catch (Exception e) {
+ LOG.warn("decode sql mode {} failed.",
ctx.sessionVariable.getSqlMode(), e);
+ }
+ }
+
+ // TODO only for slow query?
+ if (ctx.getExecutor() != null &&
LOG_PLAN_INFO_TYPES.contains(stmtType)) {
+ auditEventBuilder.setHitSqlCache(ctx.getExecutor().isCached());
+
auditEventBuilder.setHandledInFe(ctx.getExecutor().isHandleQueryInFe());
+
+ SummaryProfile summaryProfile =
ctx.getExecutor().getSummaryProfile();
+ // parse time
+
auditEventBuilder.setParseTimeMs(summaryProfile.getParseSqlTimeMs());
+ // plan time
+ String planTimesMs = "{"
+ + "\"plan\"" + ":" + summaryProfile.getPlanTimeMs() + ","
+ + "\"garbage_collect\"" + ":" +
summaryProfile.getNereidsGarbageCollectionTimeMs() + ","
+ + "\"lock_tables\"" + ":" +
summaryProfile.getNereidsLockTableTimeMs() + ","
+ + "\"analyze\"" + ":" +
summaryProfile.getNereidsAnalysisTimeMs() + ","
+ + "\"rewrite\"" + ":" +
summaryProfile.getNereidsRewriteTimeMs() + ","
+ + "\"fold_const_by_be\"" + ":" +
summaryProfile.getNereidsBeFoldConstTimeMs() + ","
+ + "\"collect_partitions\"" + ":" +
summaryProfile.getNereidsCollectTablePartitionTimeMs() + ","
+ + "\"optimize\"" + ":" +
summaryProfile.getNereidsOptimizeTimeMs() + ","
+ + "\"translate\"" + ":" +
summaryProfile.getNereidsTranslateTimeMs() + ","
+ + "\"init_scan_node\"" + ":" +
summaryProfile.getInitScanNodeTimeMs() + ","
+ + "\"finalize_scan_node\"" + ":" +
summaryProfile.getFinalizeScanNodeTimeMs() + ","
+ + "\"create_scan_range\"" + ":" +
summaryProfile.getCreateScanRangeTimeMs() + ","
+ + "\"distribute\"" + ":" +
summaryProfile.getNereidsDistributeTimeMs()
+ + "}";
+ auditEventBuilder.setPlanTimesMs(planTimesMs);
+ // get meta time
+ String metaTimeMs = "{"
+ + "\"get_partition_version_time_ms\"" + ":" +
summaryProfile.getGetPartitionVersionTime() + ","
+ + "\"get_partition_version_count_has_data\""
+ + ":" +
summaryProfile.getGetPartitionVersionByHasDataCount() + ","
+ + "\"get_partition_version_count\"" + ":" +
summaryProfile.getGetPartitionVersionCount() + ","
+ + "\"get_table_version_time_ms\"" + ":" +
summaryProfile.getGetTableVersionTime() + ","
+ + "\"get_table_version_count\"" + ":" +
summaryProfile.getGetTableVersionCount()
+ + "}";
+ auditEventBuilder.setGetMetaTimeMs(metaTimeMs);
+ // schedule time
+ String scheduleTimeMs = "{"
+ + "\"schedule_time_ms\"" + ":" +
summaryProfile.getScheduleTimeMs() + ","
+ + "\"fragment_assign_time_ms\"" + ":" +
summaryProfile.getFragmentAssignTimsMs() + ","
+ + "\"fragment_serialize_time_ms\"" + ":" +
summaryProfile.getFragmentSerializeTimeMs() + ","
+ + "\"fragment_rpc_phase_1_time_ms\"" + ":" +
summaryProfile.getFragmentRPCPhase1TimeMs() + ","
+ + "\"fragment_rpc_phase_2_time_ms\"" + ":" +
summaryProfile.getFragmentRPCPhase2TimeMs() + ","
+ + "\"fragment_compressed_size_byte\"" + ":" +
summaryProfile.getFragmentCompressedSizeByte() + ","
+ + "\"fragment_rpc_count\"" + ":" +
summaryProfile.getFragmentRPCCount()
+ + "}";
+ auditEventBuilder.setScheduleTimeMs(scheduleTimeMs);
+ // changed variables
+ if (ctx.sessionVariable != null) {
+ List<List<String>> changedVars =
VariableMgr.dumpChangedVars(ctx.sessionVariable);
+ StringBuilder changedVarsStr = new StringBuilder();
+ changedVarsStr.append("{");
+ for (int i = 0; i < changedVars.size(); i++) {
+ if (i > 0) {
+ changedVarsStr.append(",");
+ }
+
changedVarsStr.append("\"").append(changedVars.get(i).get(0)).append("\"")
+
.append(":").append("\"").append(changedVars.get(i).get(1)).append("\"");
+ }
+ changedVarsStr.append("}");
+
auditEventBuilder.setChangedVariables(changedVarsStr.toString());
+ }
+
+ if (ctx.getExecutor() != null && ctx.getExecutor().planner() !=
null
+ && ctx.getExecutor().planner() instanceof NereidsPlanner) {
+ // queried tables and views list, in audit log schema, its
data type is array<string>
+ NereidsPlanner nereidsPlanner = (NereidsPlanner)
ctx.getExecutor().planner();
+ String tables = "[" + nereidsPlanner.getStatementContext()
+ .getTables().keySet().stream()
+ .map(list -> "\"" + String.join(".", list) + "\"")
+ .collect(Collectors.joining(",")) + "]";
+ auditEventBuilder.setQueriedTablesAndViews(tables);
+
+ if (nereidsPlanner.getCascadesContext() != null
+ &&
nereidsPlanner.getCascadesContext().getMaterializationContexts() != null
+ && nereidsPlanner.getPhysicalPlan() != null) {
+ Set<List<String>> chosenMvCtx = MaterializationContext
+
.getChosenMvsQualifiers(nereidsPlanner.getCascadesContext()
+ .getMaterializationContexts(),
nereidsPlanner.getPhysicalPlan());
+ String chosenMvsStr = "["
+ + chosenMvCtx.stream()
+ .map(list -> "\"" + String.join(".", list) + "\"")
+ .collect(Collectors.joining(","))
+ + "]";
+ auditEventBuilder.setChosenMViews(chosenMvsStr);
+ }
+ }
+ }
if (ctx.getState().isQuery()) {
if (MetricRepo.isInit) {
- if (!ctx.getSessionVariable().internalSession) {
+ if (!ctx.getState().isInternal()) {
MetricRepo.COUNTER_QUERY_ALL.increase(1L);
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
}
@@ -245,41 +366,32 @@ public class AuditLogHelper {
return;
}
MetricRepo.increaseClusterQueryAll(cloudCluster);
- if (ctx.getState().getStateType() == MysqlStateType.ERR
- && ctx.getState().getErrType() !=
QueryState.ErrType.ANALYSIS_ERR) {
- // err query
- if (!ctx.getSessionVariable().internalSession) {
+ if (!ctx.getState().isInternal()) {
+ if (ctx.getState().getStateType() == MysqlStateType.ERR
+ && ctx.getState().getErrType() !=
QueryState.ErrType.ANALYSIS_ERR) {
+ // err query
MetricRepo.COUNTER_QUERY_ERR.increase(1L);
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L);
MetricRepo.increaseClusterQueryErr(cloudCluster);
- }
- } else if (ctx.getState().getStateType() == MysqlStateType.OK
- || ctx.getState().getStateType() ==
MysqlStateType.EOF) {
- // ok query
- if (!ctx.getSessionVariable().internalSession) {
+ } else if (ctx.getState().getStateType() ==
MysqlStateType.OK
+ || ctx.getState().getStateType() ==
MysqlStateType.EOF) {
+ // ok query
MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs);
MetricRepo.updateClusterQueryLatency(cloudCluster,
elapseMs);
- }
-
- if (elapseMs > Config.qe_slow_log_ms) {
- String sqlDigest = DigestUtils.md5Hex(((Queriable)
parsedStmt).toDigest());
- auditEventBuilder.setSqlDigest(sqlDigest);
- MetricRepo.COUNTER_QUERY_SLOW.increase(1L);
+ if (elapseMs > Config.qe_slow_log_ms) {
+ String sqlDigest = DigestUtils.md5Hex(((Queriable)
parsedStmt).toDigest());
+ auditEventBuilder.setSqlDigest(sqlDigest);
+ MetricRepo.COUNTER_QUERY_SLOW.increase(1L);
+ }
}
}
}
- auditEventBuilder.setIsQuery(true)
- .setScanBytesFromLocalStorage(
+ auditEventBuilder.setScanBytesFromLocalStorage(
statistics == null ? 0 :
statistics.getScanBytesFromLocalStorage())
.setScanBytesFromRemoteStorage(
statistics == null ? 0 :
statistics.getScanBytesFromRemoteStorage());
- } else {
- auditEventBuilder.setIsQuery(false);
}
- auditEventBuilder.setIsNereids(ctx.getState().isNereids);
-
- auditEventBuilder.setFeIp(FrontendOptions.getLocalHostAddress());
boolean isAnalysisErr = ctx.getState().getStateType() ==
MysqlStateType.ERR
&& ctx.getState().getErrType() ==
QueryState.ErrType.ANALYSIS_ERR;
@@ -296,7 +408,6 @@ public class AuditLogHelper {
}
}
auditEventBuilder.setStmt(handleStmt(encryptSql, parsedStmt));
- auditEventBuilder.setStmtType(getStmtType(parsedStmt));
if (!Env.getCurrentEnv().isMaster()) {
if (ctx.executor != null && ctx.executor.isForwardToMaster()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 8a7f9a0adfa..244fdadc8e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -257,7 +257,7 @@ public abstract class ConnectProcessor {
}
public void executeQuery(String originStmt) throws Exception {
- if (MetricRepo.isInit && !ctx.getSessionVariable().internalSession) {
+ if (MetricRepo.isInit && !ctx.getState().isInternal()) {
MetricRepo.COUNTER_REQUEST_ALL.increase(1L);
if (Config.isCloudMode()) {
try {
@@ -630,7 +630,6 @@ public abstract class ConnectProcessor {
// 0 for compatibility.
int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0;
executor = new StmtExecutor(ctx, new
OriginStatement(request.getSql(), idx), true);
- ctx.setExecutor(executor);
// Set default catalog only if the catalog exists.
if (request.isSetDefaultCatalog()) {
CatalogIf catalog =
ctx.getEnv().getCatalogMgr().getCatalog(request.getDefaultCatalog());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
index ea8e9510340..f3b9cacb026 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
@@ -49,7 +49,8 @@ public class QueryState {
private int warningRows = 0;
// make it public for easy to use
public int serverStatus = 0;
- public boolean isNereids = false;
+ private boolean isNereids = false;
+ private boolean isInternal = false;
private ShowResultSet rs = null;
public QueryState() {
@@ -152,6 +153,15 @@ public class QueryState {
return isNereids;
}
+ public boolean isInternal() {
+
+ return isInternal;
+ }
+
+ public void setInternal(boolean internal) {
+ isInternal = internal;
+ }
+
public void setResultSet(ShowResultSet rs) {
this.rs = rs;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 9f8e9bd6917..8f1ff789d5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -411,8 +411,6 @@ public class SessionVariable implements Serializable,
Writable {
public static final String NEREIDS_TRACE_EVENT_MODE =
"nereids_trace_event_mode";
- public static final String INTERNAL_SESSION = "internal_session";
-
public static final String PARTITION_PRUNING_EXPAND_THRESHOLD =
"partition_pruning_expand_threshold";
public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN
@@ -1726,9 +1724,6 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_ELIMINATE_SORT_NODE)
public boolean enableEliminateSortNode = true;
- @VariableMgr.VarAttr(name = INTERNAL_SESSION)
- public boolean internalSession = false;
-
@VariableMgr.VarAttr(name = PARTITION_PRUNING_EXPAND_THRESHOLD, fuzzy =
true)
public int partitionPruningExpandThreshold = 10;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 60b1b152afd..c6f8968b7e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -305,6 +305,9 @@ public class StmtExecutor {
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.MYSQL));
this.context = context;
+ if (context != null) {
+ context.setExecutor(this);
+ }
this.originStmt = originStmt;
this.serializer = context.getMysqlChannel().getSerializer();
this.isProxy = isProxy;
@@ -414,7 +417,7 @@ public class StmtExecutor {
.collect(Collectors.joining(",")));
builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
builder.traceId(context.getSessionVariable().getTraceId());
- builder.isNereids(context.getState().isNereids ? "Yes" : "No");
+ builder.isNereids(context.getState().isNereids() ? "Yes" : "No");
return builder.build();
}
@@ -546,6 +549,10 @@ public class StmtExecutor {
return isHandleQueryInFe;
}
+ public boolean isCached() {
+ return isCached;
+ }
+
// query with a random sql
public void execute() throws Exception {
UUID uuid = UUID.randomUUID();
@@ -3451,6 +3458,7 @@ public class StmtExecutor {
+ " but parsedStmt is " +
parsedStmt.getClass().getName());
context.getState().setNereids(true);
context.getState().setIsQuery(true);
+ context.getState().setInternal(true);
planner = new NereidsPlanner(statementContext);
planner.plan(parsedStmt, context.getSessionVariable().toThrift());
} catch (Exception e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
index b49133a6b7f..2edf541282a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java
@@ -802,6 +802,15 @@ public class VariableMgr {
try {
for (Map.Entry<String, VarContext> entry : ctxByDisplayVarName.entrySet()) {
VarContext ctx = entry.getValue();
+ VarAttr varAttr = ctx.getField().getAnnotation(VarAttr.class);
+ // not show removed variables
+ if (VariableAnnotation.REMOVED.equals(varAttr.varType())) {
+ continue;
+ }
+ // not show invisible variables
+ if ((VariableMgr.INVISIBLE & varAttr.flag()) != 0) {
+ continue;
+ }
List<String> row = Lists.newArrayList();
String varName = entry.getKey();
String curValue = getValue(sessionVar, ctx.getField());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e9948bc9496..1f03e5a84fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2133,7 +2133,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
LOG.info("block initHttpStreamPlan");
}
StmtExecutor executor = new StmtExecutor(ctx, originStmt);
- ctx.setExecutor(executor);
httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId());
Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisJob.java
deleted file mode 100644
index 877a4f5bd09..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisJob.java
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import java.util.List;
-
-public class OlapAnalysisJob {
-
-
-
- private List<String> columns;
-
- private static String collectPartionStatsSQLTemplate =
- " SELECT "
- + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
- + "${catalogId} AS catalog_id, "
- + "${dbId} AS db_id, "
- + "${tblId} AS tbl_id, "
- + "${idxId} AS idx_id, "
- + "'${colId}' AS col_id, "
- + "${partId} AS part_id, "
- + "COUNT(1) AS row_count, "
- + "NDV(`${colName}`) AS ndv, "
- + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
- + "MIN(`${colName}`) AS min, "
- + "MAX(`${colName}`) AS max, "
- + "${dataSizeFunction} AS data_size, "
- + "NOW() ";
-
-
- protected void beforeExecution() {
- }
-
- public void execute() {
- }
-
- protected void afterExecution() {
-
- }
-
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 88d67f0d447..568cceed5a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -90,7 +90,7 @@ public class StatisticsCache {
public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, long idxId, String colName) {
ConnectContext ctx = ConnectContext.get();
- if (ctx != null && ctx.getSessionVariable().internalSession) {
+ if (ctx != null && ctx.getState().isInternal()) {
return ColumnStatistic.UNKNOWN;
}
// Need to change base index id to -1 for OlapTable.
@@ -114,7 +114,7 @@ public class StatisticsCache {
public PartitionColumnStatistic getPartitionColumnStatistics(long catalogId, long dbId, long tblId, long idxId,
String partName, String colName) {
ConnectContext ctx = ConnectContext.get();
- if (ctx != null && ctx.getSessionVariable().internalSession) {
+ if (ctx != null && ctx.getState().isInternal()) {
return PartitionColumnStatistic.UNKNOWN;
}
// Need to change base index id to -1 for OlapTable.
@@ -157,7 +157,7 @@ public class StatisticsCache {
private Optional<Histogram> getHistogram(long ctlId, long dbId, long tblId, long idxId, String colName) {
ConnectContext ctx = ConnectContext.get();
- if (ctx != null && ctx.getSessionVariable().internalSession) {
+ if (ctx != null && ctx.getState().isInternal()) {
return Optional.empty();
}
StatisticsCacheKey k = new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsRecursiveDerive.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsRecursiveDerive.java
index 0dee7932192..bdeeaf610dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsRecursiveDerive.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsRecursiveDerive.java
@@ -39,7 +39,7 @@ public class StatsRecursiveDerive {
* which will store the derivation result of statistical information in the corresponding node
*/
public void statsRecursiveDerive(PlanNode node) throws UserException {
- if (ConnectContext.get().getSessionVariable().internalSession) {
+ if (ConnectContext.get().getState().isInternal()) {
node.setStatsDeriveResult(new StatsDeriveResult(0));
return;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index f8d8daec8b1..b0e5c96b19a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -165,7 +165,6 @@ public class StatisticsUtil {
}
}
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
- r.connectContext.setExecutor(stmtExecutor);
return stmtExecutor.executeInternalQuery();
}
}
@@ -175,7 +174,6 @@ public class StatisticsUtil {
AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false);
try {
stmtExecutor = new StmtExecutor(r.connectContext, sql);
- r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
QueryState state = r.connectContext.getState();
if (state.getStateType().equals(QueryState.MysqlStateType.ERR)) {
@@ -212,8 +210,8 @@ public class StatisticsUtil {
public static AutoCloseConnectContext buildConnectContext(boolean useFileCacheForStat) {
ConnectContext connectContext = new ConnectContext();
+ connectContext.getState().setInternal(true);
SessionVariable sessionVariable = connectContext.getSessionVariable();
- sessionVariable.internalSession = true;
sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
sessionVariable.cpuResourceLimit = Config.cpu_resource_limit_per_analyze_task;
sessionVariable.setEnableInsertStrict(true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]