This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6297ef10e9 [enhancement](plugin) import audit logs for slow queries
into a separate table (#14100)
6297ef10e9 is described below
commit 6297ef10e90dee1403d03b2d146acef486696a2a
Author: yongjinhou <[email protected]>
AuthorDate: Fri Nov 11 09:06:01 2022 +0800
[enhancement](plugin) import audit logs for slow queries into a separate
table (#14100)
* import audit logs for slow queries into a separate table
---
docs/en/docs/ecosystem/audit-plugin.md | 42 +++++++-
docs/zh-CN/docs/ecosystem/audit-plugin.md | 40 ++++++-
.../auditloader/src/main/assembly/plugin.conf | 10 +-
.../doris/plugin/audit/AuditLoaderPlugin.java | 118 ++++++++++++++-------
.../doris/plugin/audit/DorisStreamLoader.java | 24 +++--
5 files changed, 183 insertions(+), 51 deletions(-)
diff --git a/docs/en/docs/ecosystem/audit-plugin.md
b/docs/en/docs/ecosystem/audit-plugin.md
index 559656455c..2e533d07c7 100644
--- a/docs/en/docs/ecosystem/audit-plugin.md
+++ b/docs/en/docs/ecosystem/audit-plugin.md
@@ -52,10 +52,46 @@ You can place this file on an http download server or
copy(or unzip) it to the s
### Installation
-After deployment is complete, and before installing the plugin, you need to
create the audit database and tables previously specified in `plugin.conf`. The
table creation statement is as follows:
+After deployment is complete, and before installing the plugin, you need to
create the audit database and tables previously specified in `plugin.conf`. The
database and table creation statement is as follows:
```
-create table doris_audit_tbl__
+create database doris_audit_db__;
+
+create table doris_audit_db__.doris_audit_log_tbl__
+(
+ query_id varchar(48) comment "Unique query id",
+ `time` datetime not null comment "Query start time",
+ client_ip varchar(32) comment "Client IP",
+ user varchar(64) comment "User name",
+ db varchar(96) comment "Database of this query",
+ state varchar(8) comment "Query result state. EOF, ERR, OK",
+ query_time bigint comment "Query execution time in millisecond",
+ scan_bytes bigint comment "Total scan bytes of this query",
+ scan_rows bigint comment "Total scan rows of this query",
+ return_rows bigint comment "Returned rows of this query",
+ stmt_id int comment "An incremental id of statement",
+ is_query tinyint comment "Is this statemt a query. 1 or 0",
+ frontend_ip varchar(32) comment "Frontend ip of executing this statement",
+ cpu_time_ms bigint comment "Total scan cpu time in millisecond of this
query",
+ sql_hash varchar(48) comment "Hash value for this query",
+ sql_digest varchar(48) comment "Sql digest for this query",
+ peak_memory_bytes bigint comment "Peak memory bytes used on all backends
of this query",
+ stmt string comment "The original statement, trimed if longer than 2G "
+) engine=OLAP
+duplicate key(query_id, `time`, client_ip)
+partition by range(`time`) ()
+distributed by hash(query_id) buckets 1
+properties(
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.start" = "-30",
+ "dynamic_partition.end" = "3",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "1",
+ "dynamic_partition.enable" = "true",
+ "replication_num" = "3"
+);
+
+create table doris_audit_db__.doris_slow_log_tbl__
(
query_id varchar(48) comment "Unique query id",
`time` datetime not null comment "Query start time",
@@ -71,7 +107,7 @@ create table doris_audit_tbl__
is_query tinyint comment "Is this statemt a query. 1 or 0",
frontend_ip varchar(32) comment "Frontend ip of executing this statement",
cpu_time_ms bigint comment "Total scan cpu time in millisecond of this
query",
- sql_hash varchar(50) comment "Hash value for this query",
+ sql_hash varchar(48) comment "Hash value for this query",
sql_digest varchar(48) comment "Sql digest for this query",
peak_memory_bytes bigint comment "Peak memory bytes used on all backends
of this query",
stmt string comment "The original statement, trimed if longer than 2G"
diff --git a/docs/zh-CN/docs/ecosystem/audit-plugin.md
b/docs/zh-CN/docs/ecosystem/audit-plugin.md
index e363f50413..aa1d752428 100644
--- a/docs/zh-CN/docs/ecosystem/audit-plugin.md
+++ b/docs/zh-CN/docs/ecosystem/audit-plugin.md
@@ -52,10 +52,46 @@ auditloader
plugin的配置位于`${DORIS}/fe_plugins/auditloader/src/main/assem
### 安装
-部署完成后,安装插件前,需要创建之前在 `plugin.conf` 中指定的审计数据库和表。其中建表语句如下:
+部署完成后,安装插件前,需要创建之前在 `plugin.conf` 中指定的审计数据库和表。其中建库与建表语句如下:
```
-create table doris_audit_tbl__
+create database doris_audit_db__;
+
+create table doris_audit_db__.doris_audit_log_tbl__
+(
+ query_id varchar(48) comment "Unique query id",
+ `time` datetime not null comment "Query start time",
+ client_ip varchar(32) comment "Client IP",
+ user varchar(64) comment "User name",
+ db varchar(96) comment "Database of this query",
+ state varchar(8) comment "Query result state. EOF, ERR, OK",
+ query_time bigint comment "Query execution time in millisecond",
+ scan_bytes bigint comment "Total scan bytes of this query",
+ scan_rows bigint comment "Total scan rows of this query",
+ return_rows bigint comment "Returned rows of this query",
+ stmt_id int comment "An incremental id of statement",
+ is_query tinyint comment "Is this statemt a query. 1 or 0",
+ frontend_ip varchar(32) comment "Frontend ip of executing this statement",
+ cpu_time_ms bigint comment "Total scan cpu time in millisecond of this
query",
+ sql_hash varchar(48) comment "Hash value for this query",
+ sql_digest varchar(48) comment "Sql digest for this query",
+ peak_memory_bytes bigint comment "Peak memory bytes used on all backends
of this query",
+ stmt string comment "The original statement, trimed if longer than 2G"
+) engine=OLAP
+duplicate key(query_id, `time`, client_ip)
+partition by range(`time`) ()
+distributed by hash(query_id) buckets 1
+properties(
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.start" = "-30",
+ "dynamic_partition.end" = "3",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.buckets" = "1",
+ "dynamic_partition.enable" = "true",
+ "replication_num" = "3"
+);
+
+create table doris_audit_db__.doris_slow_log_tbl__
(
query_id varchar(48) comment "Unique query id",
`time` datetime not null comment "Query start time",
diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf
b/fe_plugins/auditloader/src/main/assembly/plugin.conf
index 14fdd32b98..31f7bd3f35 100755
--- a/fe_plugins/auditloader/src/main/assembly/plugin.conf
+++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf
@@ -36,8 +36,14 @@ frontend_host_port=127.0.0.1:8030
# Database of the audit table
database=doris_audit_db__
-# Audit table name, to save the audit data.
-table=doris_audit_tbl__
+# Audit table name, to save the audit log data.
+audit_log_table=doris_audit_log_tbl__
+
+# Audit table name, to save the slow log data.
+slow_log_table=doris_slow_log_tbl__
+
+# Whether import slow logs into a separate slow table, default is false
+enable_slow_log=false
# Doris user. This user must have LOAD_PRIV to the audit table.
user=root
diff --git
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 9497af68ee..992b35f686 100755
---
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -24,6 +24,7 @@ import org.apache.doris.plugin.Plugin;
import org.apache.doris.plugin.PluginContext;
import org.apache.doris.plugin.PluginException;
import org.apache.doris.plugin.PluginInfo;
+import org.apache.doris.common.Config;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -56,8 +57,10 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
private static final ThreadLocal<SimpleDateFormat> dateFormatContainer =
ThreadLocal.withInitial(
() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
- private StringBuilder auditBuffer = new StringBuilder();
- private long lastLoadTime = 0;
+ private StringBuilder auditLogBuffer = new StringBuilder();
+ private StringBuilder slowLogBuffer = new StringBuilder();
+ private long lastLoadTimeAuditLog = 0;
+ private long lastLoadTimeSlowLog = 0;
private BlockingQueue<AuditEvent> auditEventQueue;
private DorisStreamLoader streamLoader;
@@ -75,7 +78,8 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
if (isInit) {
return;
}
- this.lastLoadTime = System.currentTimeMillis();
+ this.lastLoadTimeAuditLog = System.currentTimeMillis();
+ this.lastLoadTimeSlowLog = System.currentTimeMillis();
loadConfig(ctx, info.getProperties());
@@ -146,28 +150,35 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
}
private void assembleAudit(AuditEvent event) {
- auditBuffer.append(event.queryId).append("\t");
- auditBuffer.append(longToTimeString(event.timestamp)).append("\t");
- auditBuffer.append(event.clientIp).append("\t");
- auditBuffer.append(event.user).append("\t");
- auditBuffer.append(event.db).append("\t");
- auditBuffer.append(event.state).append("\t");
- auditBuffer.append(event.queryTime).append("\t");
- auditBuffer.append(event.scanBytes).append("\t");
- auditBuffer.append(event.scanRows).append("\t");
- auditBuffer.append(event.returnRows).append("\t");
- auditBuffer.append(event.stmtId).append("\t");
- auditBuffer.append(event.isQuery ? 1 : 0).append("\t");
- auditBuffer.append(event.feIp).append("\t");
- auditBuffer.append(event.cpuTimeMs).append("\t");
- auditBuffer.append(event.sqlHash).append("\t");
- auditBuffer.append(event.sqlDigest).append("\t");
- auditBuffer.append(event.peakMemoryBytes).append("\t");
+ if (conf.enableSlowLog && event.queryTime > Config.qe_slow_log_ms) {
+ fillLogBuffer(event, slowLogBuffer);
+ }
+ fillLogBuffer(event, auditLogBuffer);
+ }
+
+ private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
+ logBuffer.append(event.queryId).append("\t");
+ logBuffer.append(longToTimeString(event.timestamp)).append("\t");
+ logBuffer.append(event.clientIp).append("\t");
+ logBuffer.append(event.user).append("\t");
+ logBuffer.append(event.db).append("\t");
+ logBuffer.append(event.state).append("\t");
+ logBuffer.append(event.queryTime).append("\t");
+ logBuffer.append(event.scanBytes).append("\t");
+ logBuffer.append(event.scanRows).append("\t");
+ logBuffer.append(event.returnRows).append("\t");
+ logBuffer.append(event.stmtId).append("\t");
+ logBuffer.append(event.isQuery ? 1 : 0).append("\t");
+ logBuffer.append(event.feIp).append("\t");
+ logBuffer.append(event.cpuTimeMs).append("\t");
+ logBuffer.append(event.sqlHash).append("\t");
+ logBuffer.append(event.sqlDigest).append("\t");
+ logBuffer.append(event.peakMemoryBytes).append("\t");
// trim the query to avoid too long
// use `getBytes().length` to get real byte length
String stmt = truncateByBytes(event.stmt).replace("\n", "
").replace("\t", " ");
LOG.debug("receive audit event with stmt: {}", stmt);
- auditBuffer.append(stmt).append("\n");
+ logBuffer.append(stmt).append("\n");
}
private String truncateByBytes(String str) {
@@ -186,21 +197,34 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
return new String(charBuffer.array(), 0, charBuffer.position());
}
- private void loadIfNecessary(DorisStreamLoader loader) {
- if (auditBuffer.length() < conf.maxBatchSize &&
System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) {
- return;
+ private void loadIfNecessary(DorisStreamLoader loader, boolean slowLog) {
+ StringBuilder logBuffer = slowLog ? slowLogBuffer : auditLogBuffer;
+ long lastLoadTime = slowLog ? lastLoadTimeSlowLog :
lastLoadTimeAuditLog;
+ long currentTime = System.currentTimeMillis();
+
+ if (logBuffer.length() >= conf.maxBatchSize || currentTime -
lastLoadTime >= conf.maxBatchIntervalSec * 1000) {
+ // begin to load
+ try {
+ DorisStreamLoader.LoadResponse response =
loader.loadBatch(logBuffer, slowLog);
+ LOG.debug("audit loader response: {}", response);
+ } catch (Exception e) {
+ LOG.debug("encounter exception when putting current audit
batch, discard current batch", e);
+ } finally {
+ // make a new string builder to receive following events.
+ resetLogBufferAndLastLoadTime(currentTime, slowLog);
+ }
}
- lastLoadTime = System.currentTimeMillis();
- // begin to load
- try {
- DorisStreamLoader.LoadResponse response =
loader.loadBatch(auditBuffer);
- LOG.debug("audit loader response: {}", response);
- } catch (Exception e) {
- LOG.debug("encounter exception when putting current audit batch,
discard current batch", e);
- } finally {
- // make a new string builder to receive following events.
- this.auditBuffer = new StringBuilder();
+ return;
+ }
+
+ private void resetLogBufferAndLastLoadTime(long currentTime, boolean
slowLog) {
+ if (slowLog) {
+ this.slowLogBuffer = new StringBuilder();
+ lastLoadTimeSlowLog = currentTime;
+ } else {
+ this.auditLogBuffer = new StringBuilder();
+ lastLoadTimeAuditLog = currentTime;
}
return;
@@ -215,6 +239,9 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
public static final String PROP_PASSWORD = "password";
public static final String PROP_DATABASE = "database";
public static final String PROP_TABLE = "table";
+ public static final String PROP_AUDIT_LOG_TABLE = "audit_log_table";
+ public static final String PROP_SLOW_LOG_TABLE = "slow_log_table";
+ public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log";
// the max stmt length to be loaded in audit table.
public static final String MAX_STMT_LENGTH = "max_stmt_length";
@@ -225,7 +252,9 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
public String user = "root";
public String password = "";
public String database = "doris_audit_db__";
- public String table = "doris_audit_tbl__";
+ public String auditLogTable = "doris_audit_log_tbl__";
+ public String slowLogTable = "doris_slow_log_tbl__";
+ public boolean enableSlowLog = false;
// the identity of FE which run this plugin
public String feIdentity = "";
public int max_stmt_length = 4096;
@@ -253,8 +282,18 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
if (properties.containsKey(PROP_DATABASE)) {
database = properties.get(PROP_DATABASE);
}
+ // If plugin.conf is not changed, the audit logs are imported
to previous table
if (properties.containsKey(PROP_TABLE)) {
- table = properties.get(PROP_TABLE);
+ auditLogTable = properties.get(PROP_TABLE);
+ }
+ if (properties.containsKey(PROP_AUDIT_LOG_TABLE)) {
+ auditLogTable = properties.get(PROP_AUDIT_LOG_TABLE);
+ }
+ if (properties.containsKey(PROP_SLOW_LOG_TABLE)) {
+ slowLogTable = properties.get(PROP_SLOW_LOG_TABLE);
+ }
+ if (properties.containsKey(PROP_ENABLE_SLOW_LOG)) {
+ enableSlowLog =
Boolean.valueOf(properties.get(PROP_ENABLE_SLOW_LOG));
}
if (properties.containsKey(MAX_STMT_LENGTH)) {
max_stmt_length =
Integer.parseInt(properties.get(MAX_STMT_LENGTH));
@@ -278,7 +317,12 @@ public class AuditLoaderPlugin extends Plugin implements
AuditPlugin {
AuditEvent event = auditEventQueue.poll(5,
TimeUnit.SECONDS);
if (event != null) {
assembleAudit(event);
- loadIfNecessary(loader);
+ // process slow audit logs
+ if (conf.enableSlowLog) {
+ loadIfNecessary(loader, true);
+ }
+ // process all audit logs
+ loadIfNecessary(loader, false);
}
} catch (InterruptedException ie) {
LOG.debug("encounter exception when loading current audit
batch", ie);
diff --git
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
index 28763bbdca..844ca04892 100644
---
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
+++
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
@@ -38,21 +38,25 @@ public class DorisStreamLoader {
private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
private String hostPort;
private String db;
- private String tbl;
+ private String auditLogTbl;
+ private String slowLogTbl;
private String user;
private String passwd;
- private String loadUrlStr;
+ private String auditLogLoadUrlStr;
+ private String slowLogLoadUrlStr;
private String authEncoding;
private String feIdentity;
public DorisStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) {
this.hostPort = conf.frontendHostPort;
this.db = conf.database;
- this.tbl = conf.table;
+ this.auditLogTbl = conf.auditLogTable;
+ this.slowLogTbl = conf.slowLogTable;
this.user = conf.user;
this.passwd = conf.password;
- this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
+ this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db,
auditLogTbl);
+ this.slowLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db,
slowLogTbl);
this.authEncoding =
Base64.getEncoder().encodeToString(String.format("%s:%s", user,
passwd).getBytes(StandardCharsets.UTF_8));
// currently, FE identity is FE's IP, so we replace the "." in IP to
make it suitable for label
this.feIdentity = conf.feIdentity.replaceAll("\\.", "_");
@@ -112,9 +116,9 @@ public class DorisStreamLoader {
return response.toString();
}
- public LoadResponse loadBatch(StringBuilder sb) {
+ public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
Calendar calendar = Calendar.getInstance();
- String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
+ String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1,
calendar.get(Calendar.DAY_OF_MONTH),
calendar.get(Calendar.HOUR_OF_DAY),
calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
feIdentity);
@@ -123,7 +127,13 @@ public class DorisStreamLoader {
HttpURLConnection beConn = null;
try {
// build request and send to fe
- feConn = getConnection(loadUrlStr, label);
+ if (slowLog) {
+ label = "slow" + label;
+ feConn = getConnection(slowLogLoadUrlStr, label);
+ } else {
+ label = "audit" + label;
+ feConn = getConnection(auditLogLoadUrlStr, label);
+ }
int status = feConn.getResponseCode();
// fe send back http response code TEMPORARY_REDIRECT 307 and new
be location
if (status != 307) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]