This is an automated email from the ASF dual-hosted git repository. caiconghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bb334de00f [enhancement](load) Change transaction limit from global level to db level (#15830) bb334de00f is described below commit bb334de00ff79cfed2834401fc5b3e2000289605 Author: Henry2SS <45096548+henry...@users.noreply.github.com> AuthorDate: Wed Feb 8 18:04:26 2023 +0800 [enhancement](load) Change transaction limit from global level to db level (#15830) Add transaction size quota for database Co-authored-by: wuhangze <wuhan...@jd.com> --- docs/en/docs/admin-manual/config/fe-config.md | 22 +++++++++++++++ docs/zh-CN/docs/admin-manual/config/fe-config.md | 22 +++++++++++++++ .../main/java/org/apache/doris/common/Config.java | 6 +++++ fe/fe-core/src/main/cup/sql_parser.cup | 4 +++ .../doris/analysis/AlterDatabaseQuotaStmt.java | 7 +++-- .../org/apache/doris/analysis/ShowDataStmt.java | 6 +++++ .../java/org/apache/doris/catalog/Database.java | 31 ++++++++++++++++++++++ .../org/apache/doris/common/proc/DbsProcDir.java | 3 +++ .../org/apache/doris/common/util/ParseUtil.java | 13 +++++++++ .../apache/doris/datasource/InternalCatalog.java | 4 +++ .../doris/load/sync/canal/CanalSyncChannel.java | 5 ++-- .../doris/transaction/DatabaseTransactionMgr.java | 9 +++---- .../doris/analysis/AlterDatabaseQuotaStmtTest.java | 23 ++++++++++++++++ .../apache/doris/common/proc/DbsProcDirTest.java | 8 +++--- 14 files changed, 150 insertions(+), 13 deletions(-) diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index c23db57284..8586269220 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -2593,3 +2593,25 @@ MasterOnly:true Maximum number of error tablet showed in broker load. +#### `default_db_max_running_txn_num` + +Default:-1 + +IsMutable:true + +MasterOnly:true + +Used to set the default database transaction quota size. 
+ +The default value of -1 means that `max_running_txn_num_per_db` is used instead of `default_db_max_running_txn_num`. + +To set the quota size of a single database, you can use: + +``` +Set the database transaction quota +ALTER DATABASE db_name SET TRANSACTION QUOTA quota; +View configuration +show data (Detail:HELP SHOW DATA) +``` + + diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index 59a5dcd031..19b3a64e89 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -2592,3 +2592,25 @@ SmallFileMgr 中存储的最大文件数 是否为 Master FE 节点独有的配置项:true broker load job 保存的失败tablet 信息的最大数量 + +#### `default_db_max_running_txn_num` + +默认值:-1 + +是否可以动态配置:true + +是否为 Master FE 节点独有的配置项:true + +用于设置默认数据库事务配额大小。 + +默认值设置为 -1 意味着使用 `max_running_txn_num_per_db` 而不是 `default_db_max_running_txn_num`。 + +设置单个数据库的配额大小可以使用: + +``` +设置数据库事务量配额 +ALTER DATABASE db_name SET TRANSACTION QUOTA quota; +查看配置 +show data (其他用法:HELP SHOW DATA) +``` + diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 5ea73a3ddb..6edbc5a160 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1985,5 +1985,11 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true) public static int pull_request_id = 0; + + /** + * Used to set default db transaction quota num. 
+ */ + @ConfField(mutable = true, masterOnly = true) + public static long default_db_max_running_txn_num = -1; } diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 9e873edca4..5f727df186 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -1271,6 +1271,10 @@ alter_stmt ::= {: RESULT = new AlterDatabaseQuotaStmt(dbName, QuotaType.REPLICA, String.valueOf(number)); :} + | KW_ALTER KW_DATABASE ident:dbName KW_SET KW_TRANSACTION KW_QUOTA INTEGER_LITERAL:number + {: + RESULT = new AlterDatabaseQuotaStmt(dbName, QuotaType.TRANSACTION, String.valueOf(number)); + :} | KW_ALTER KW_DATABASE ident:dbName KW_RENAME ident:newDbName {: RESULT = new AlterDatabaseRename(dbName, newDbName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java index c2cb0fbb0a..90c01016e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java @@ -37,7 +37,8 @@ public class AlterDatabaseQuotaStmt extends DdlStmt { public enum QuotaType { NONE, DATA, - REPLICA + REPLICA, + TRANSACTION } public AlterDatabaseQuotaStmt(String dbName, QuotaType quotaType, String quotaValue) { @@ -75,6 +76,8 @@ public class AlterDatabaseQuotaStmt extends DdlStmt { quota = ParseUtil.analyzeDataVolumn(quotaValue); } else if (quotaType == QuotaType.REPLICA) { quota = ParseUtil.analyzeReplicaNumber(quotaValue); + } else if (quotaType == QuotaType.TRANSACTION) { + quota = ParseUtil.analyzeTransactionNumber(quotaValue); } } @@ -82,7 +85,7 @@ public class AlterDatabaseQuotaStmt extends DdlStmt { @Override public String toSql() { return "ALTER DATABASE " + dbName + " SET " - + (quotaType == QuotaType.DATA ? 
"DATA" : "REPLICA") + + quotaType.name() + " QUOTA " + quotaValue; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java index ce19a53135..53b2a1fc1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowDataStmt.java @@ -212,6 +212,12 @@ public class ShowDataStmt extends ShowStmt { + leftPair.second; List<String> leftRow = Arrays.asList("Left", readableLeft, String.valueOf(replicaCountLeft)); totalRows.add(leftRow); + + // txn quota + long txnQuota = db.getTransactionQuotaSize(); + List<String> transactionQuotaList = Arrays.asList("Transaction Quota", + String.valueOf(txnQuota), String.valueOf(txnQuota)); + totalRows.add(transactionQuotaList); } finally { db.readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index cf7ac4e4ad..1fc88c34ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -71,6 +71,8 @@ import java.util.stream.Collectors; public class Database extends MetaObject implements Writable, DatabaseIf<Table> { private static final Logger LOG = LogManager.getLogger(Database.class); + private static final String TRANSACTION_QUOTA_SIZE = "transactionQuotaSize"; + private long id; private volatile String fullQualifiedName; private String clusterName; @@ -91,6 +93,8 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table> private volatile long replicaQuotaSize; + private volatile long transactionQuotaSize; + private volatile boolean isDropped; public enum DbState { @@ -118,6 +122,9 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table> this.lowerCaseToTableName = Maps.newConcurrentMap(); this.dataQuotaBytes = 
Config.default_db_data_quota_bytes; this.replicaQuotaSize = Config.default_db_replica_quota_size; + this.transactionQuotaSize = Config.default_db_max_running_txn_num == -1L + ? Config.max_running_txn_num_per_db + : Config.default_db_max_running_txn_num; this.dbState = DbState.NORMAL; this.attachDbName = ""; this.clusterName = ""; @@ -213,6 +220,19 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table> this.replicaQuotaSize = newQuota; } + public void setTransactionQuotaSize(long newQuota) { + writeLock(); + try { + Preconditions.checkArgument(newQuota >= 0L); + LOG.info("database[{}] try to set transaction quota from {} to {}", + fullQualifiedName, transactionQuotaSize, newQuota); + this.transactionQuotaSize = newQuota; + this.dbProperties.put(TRANSACTION_QUOTA_SIZE, String.valueOf(transactionQuotaSize)); + } finally { + writeUnlock(); + } + } + public long getDataQuota() { return dataQuotaBytes; } @@ -221,6 +241,10 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table> return replicaQuotaSize; } + public long getTransactionQuotaSize() { + return transactionQuotaSize; + } + public DatabaseProperty getDbProperties() { return dbProperties; } @@ -603,6 +627,13 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table> if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_105) { dbProperties = DatabaseProperty.read(in); + String txnQuotaStr = dbProperties.getOrDefault(TRANSACTION_QUOTA_SIZE, + String.valueOf(Config.max_running_txn_num_per_db)); + transactionQuotaSize = Long.parseLong(txnQuotaStr); + } else { + transactionQuotaSize = Config.default_db_max_running_txn_num == -1L + ? 
Config.max_running_txn_num_per_db + : Config.default_db_max_running_txn_num; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DbsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DbsProcDir.java index 47029c9e92..431c618a6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DbsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DbsProcDir.java @@ -43,6 +43,7 @@ public class DbsProcDir implements ProcDirInterface { public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() .add("DbId").add("DbName").add("TableNum").add("Size").add("Quota") .add("LastConsistencyCheckTime").add("ReplicaCount").add("ReplicaQuota") + .add("TransactionQuota") .build(); private Env env; @@ -114,11 +115,13 @@ public class DbsProcDir implements ProcDirInterface { ((Database) db).getLastCheckTime()) : FeConstants.null_string; long replicaCount = (db instanceof Database) ? ((Database) db).getReplicaCountWithLock() : 0; long replicaQuota = (db instanceof Database) ? ((Database) db).getReplicaQuota() : 0; + long transactionQuota = (db instanceof Database) ? 
((Database) db).getTransactionQuotaSize() : 0; dbInfo.add(readableUsedQuota); dbInfo.add(readableQuota); dbInfo.add(lastCheckTime); dbInfo.add(replicaCount); dbInfo.add(replicaQuota); + dbInfo.add(transactionQuota); } finally { db.readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java index ed3c4947fe..c47753d2d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java @@ -83,4 +83,17 @@ public class ParseUtil { return replicaNumber; } + public static long analyzeTransactionNumber(String transactionNumberStr) throws AnalysisException { + long transactionNumber = 0; + try { + transactionNumber = Long.parseLong(transactionNumberStr); + } catch (NumberFormatException nfe) { + throw new AnalysisException("invalid data volumn:" + transactionNumberStr); + } + if (transactionNumber <= 0L) { + throw new AnalysisException("Transaction quota size must larger than 0"); + } + return transactionNumber; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 31455d814c..6a9392ffee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -775,6 +775,8 @@ public class InternalCatalog implements CatalogIf<Database> { db.setDataQuota(stmt.getQuota()); } else if (quotaType == QuotaType.REPLICA) { db.setReplicaQuota(stmt.getQuota()); + } else if (quotaType == QuotaType.TRANSACTION) { + db.setTransactionQuotaSize(stmt.getQuota()); } long quota = stmt.getQuota(); DatabaseInfo dbInfo = new DatabaseInfo(dbName, "", quota, quotaType); @@ -792,6 +794,8 @@ public class InternalCatalog implements CatalogIf<Database> { db.setDataQuota(quota); } else if 
(quotaType == QuotaType.REPLICA) { db.setReplicaQuota(quota); + } else if (quotaType == QuotaType.TRANSACTION) { + db.setTransactionQuotaSize(quota); } } finally { db.writeUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java index 46e4f8df13..5ad6919551 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java @@ -124,7 +124,8 @@ public class CanalSyncChannel extends SyncChannel { String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN; GlobalTransactionMgr globalTransactionMgr = Env.getCurrentGlobalTransactionMgr(); DatabaseTransactionMgr databaseTransactionMgr = globalTransactionMgr.getDatabaseTransactionMgr(db.getId()); - if (databaseTransactionMgr.getRunningTxnNums() < Config.max_running_txn_num_per_db) { + long txnLimit = db.getTransactionQuotaSize(); + if (databaseTransactionMgr.getRunningTxnNums() < txnLimit) { TransactionEntry txnEntry = txnExecutor.getTxnEntry(); TTxnParams txnConf = txnEntry.getTxnConf(); TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING; @@ -185,7 +186,7 @@ public class CanalSyncChannel extends SyncChannel { } else { String failMsg = "current running txns on db " + db.getId() + " is " + databaseTransactionMgr.getRunningTxnNums() - + ", larger than limit " + Config.max_running_txn_num_per_db; + + ", larger than limit " + txnLimit; LOG.warn(failMsg); throw new BeginTransactionException(failMsg); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index dec2a4298f..e250602214 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -31,7 +31,6 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -126,7 +125,6 @@ public class DatabaseTransactionMgr { // it must exists in dbIdToTxnLabels, and vice versa private final Map<String, Set<Long>> labelToTxnIds = Maps.newHashMap(); - // count the number of running txns of database, except for the routine load txn private volatile int runningTxnNums = 0; private volatile int runningTxnReplicaNums = 0; @@ -1523,7 +1521,7 @@ public class DatabaseTransactionMgr { } protected void checkRunningTxnExceedLimit(TransactionState.LoadJobSourceType sourceType) - throws BeginTransactionException { + throws BeginTransactionException, MetaNotFoundException { switch (sourceType) { case ROUTINE_LOAD_TASK: // no need to check limit for routine load task: @@ -1532,9 +1530,10 @@ public class DatabaseTransactionMgr { // load, and other txn may not be able to submitted. 
break; default: - if (runningTxnNums >= Config.max_running_txn_num_per_db) { + long txnQuota = env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize(); + if (runningTxnNums >= txnQuota) { throw new BeginTransactionException("current running txns on db " + dbId + " is " - + runningTxnNums + ", larger than limit " + Config.max_running_txn_num_per_db); + + runningTxnNums + ", larger than limit " + txnQuota); } break; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterDatabaseQuotaStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterDatabaseQuotaStmtTest.java index 73a7ed6cce..19b2365e25 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterDatabaseQuotaStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterDatabaseQuotaStmtTest.java @@ -154,4 +154,27 @@ public class AlterDatabaseQuotaStmtTest { Assert.fail("No exception throws."); } + @Test + public void testNormalAlterDatabaseTransactionQuotaStmt() throws AnalysisException, UserException { + long quotaSize = 10; + AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb", QuotaType.TRANSACTION, String.valueOf(quotaSize)); + stmt.analyze(analyzer); + String expectedSql = "ALTER DATABASE testCluster:testDb SET TRANSACTION QUOTA 10"; + Assert.assertEquals(expectedSql, stmt.toSql()); + Assert.assertEquals(quotaSize, stmt.getQuota()); + } + + @Test(expected = AnalysisException.class) + public void testTransactionMinusQuota() throws AnalysisException, UserException { + AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb", QuotaType.TRANSACTION, "-100"); + stmt.analyze(analyzer); + Assert.fail("No exception throws."); + } + + @Test(expected = AnalysisException.class) + public void testtransactionInvalidQuantity() throws AnalysisException, UserException { + AlterDatabaseQuotaStmt stmt = new AlterDatabaseQuotaStmt("testDb", QuotaType.TRANSACTION, "invalid_100_quota"); + stmt.analyze(analyzer); + 
Assert.fail("No exception throws."); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/proc/DbsProcDirTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/proc/DbsProcDirTest.java index 40dc802e63..6532d36c33 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/proc/DbsProcDirTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/proc/DbsProcDirTest.java @@ -193,11 +193,11 @@ public class DbsProcDirTest { Assert.assertTrue(result instanceof BaseProcResult); Assert.assertEquals(Lists.newArrayList("DbId", "DbName", "TableNum", "Size", "Quota", - "LastConsistencyCheckTime", "ReplicaCount", "ReplicaQuota"), + "LastConsistencyCheckTime", "ReplicaCount", "ReplicaQuota", "TransactionQuota"), result.getColumnNames()); List<List<String>> rows = Lists.newArrayList(); - rows.add(Arrays.asList(String.valueOf(db1.getId()), db1.getFullName(), "0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824")); - rows.add(Arrays.asList(String.valueOf(db2.getId()), db2.getFullName(), "0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824")); + rows.add(Arrays.asList(String.valueOf(db1.getId()), db1.getFullName(), "0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824", "100")); + rows.add(Arrays.asList(String.valueOf(db2.getId()), db2.getFullName(), "0", "0.000 ", "1024.000 TB", FeConstants.null_string, "0", "1073741824", "100")); Assert.assertEquals(rows, result.getRows()); } @@ -228,7 +228,7 @@ public class DbsProcDirTest { dir = new DbsProcDir(env, catalog); result = dir.fetchResult(); Assert.assertEquals(Lists.newArrayList("DbId", "DbName", "TableNum", "Size", "Quota", - "LastConsistencyCheckTime", "ReplicaCount", "ReplicaQuota"), + "LastConsistencyCheckTime", "ReplicaCount", "ReplicaQuota", "TransactionQuota"), result.getColumnNames()); List<List<String>> rows = Lists.newArrayList(); Assert.assertEquals(rows, result.getRows()); 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org