This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 924d87d75d2 [enhancement](fe-memory) support label num threshold to reduce fe memory consumption (#22889) (#29991)
924d87d75d2 is described below
commit 924d87d75d2a3e693c013fdf07b57a9f5a2d1163
Author: Siyang Tang <[email protected]>
AuthorDate: Tue Jan 16 15:30:59 2024 +0800
[enhancement](fe-memory) support label num threshold to reduce fe memory consumption (#22889) (#29991)
Co-authored-by: Yongqiang YANG <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 8 +++
.../main/java/org/apache/doris/catalog/Env.java | 1 +
.../org/apache/doris/load/loadv2/LoadManager.java | 67 +++++++++++++++++-----
.../doris/load/routineload/RoutineLoadJob.java | 2 +-
.../doris/load/routineload/RoutineLoadManager.java | 50 ++++++++++++++--
.../load/routineload/RoutineLoadScheduler.java | 17 +++---
.../org/apache/doris/load/sync/SyncChecker.java | 1 +
.../org/apache/doris/load/sync/SyncJobManager.java | 67 ++++++++++++++++++----
.../doris/transaction/DatabaseTransactionMgr.java | 27 +++++++--
.../apache/doris/load/loadv2/LoadManagerTest.java | 46 ++++++++++++++-
.../load/routineload/RoutineLoadManagerTest.java | 43 +++++++++++++-
.../apache/doris/load/sync/SyncJobManagerTest.java | 36 ++++++++++++
.../transaction/DatabaseTransactionMgrTest.java | 12 +++-
13 files changed, 328 insertions(+), 49 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0941ae047ef..adffe19d3cb 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2279,4 +2279,12 @@ public class Config extends ConfigBase {
"The max number work threads of http upload submitter."
})
public static int http_load_submitter_max_worker_threads = 2;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "load label个数阈值,超过该个数后,对于已经完成导入作业或者任务,其label会被删除,被删除的 label
可以被重用。",
+ "The threshold of load labels' number. After this number is
exceeded, "
+ + "the labels of the completed import jobs or tasks will
be deleted, "
+ + "and the deleted labels can be reused."
+ })
+ public static int label_num_threshold = 2000;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index e0844491299..3184f1de37d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -2296,6 +2296,7 @@ public class Env {
loadManager.removeOldLoadJob();
exportMgr.removeOldExportJobs();
deleteHandler.removeOldDeleteInfos();
+ loadManager.removeOverLimitLoadJob();
}
};
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index dc02a0cf145..b31d040e679 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -66,8 +66,11 @@ import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -78,6 +81,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@@ -404,26 +408,59 @@ public class LoadManager implements Writable {
**/
public void removeOldLoadJob() {
long currentTimeMs = System.currentTimeMillis();
+ removeLoadJobIf(job -> job.isExpired(currentTimeMs));
+ }
+ /**
+ * Remove completed jobs if total job num exceed Config.label_num_threshold
+ */
+ public void removeOverLimitLoadJob() {
+ if (Config.label_num_threshold < 0 || idToLoadJob.size() <=
Config.label_num_threshold) {
+ return;
+ }
+ writeLock();
+ try {
+ Deque<LoadJob> finishedJobs = idToLoadJob
+ .values()
+ .stream()
+ .filter(LoadJob::isCompleted)
+ .sorted(Comparator.comparingLong(o -> o.finishTimestamp))
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ while (!finishedJobs.isEmpty()
+ && idToLoadJob.size() > Config.label_num_threshold) {
+ LoadJob loadJob = finishedJobs.pollFirst();
+ idToLoadJob.remove(loadJob.getId());
+ jobRemovedTrigger(loadJob);
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void jobRemovedTrigger(LoadJob job) {
+ Map<String, List<LoadJob>> map =
dbIdToLabelToLoadJobs.get(job.getDbId());
+ List<LoadJob> list = map.get(job.getLabel());
+ list.remove(job);
+ if (job instanceof SparkLoadJob) {
+ ((SparkLoadJob) job).clearSparkLauncherLog();
+ }
+ if (list.isEmpty()) {
+ map.remove(job.getLabel());
+ }
+ if (map.isEmpty()) {
+ dbIdToLabelToLoadJobs.remove(job.getDbId());
+ }
+ }
+
+ private void removeLoadJobIf(Predicate<LoadJob> pred) {
writeLock();
try {
Iterator<Map.Entry<Long, LoadJob>> iter =
idToLoadJob.entrySet().iterator();
while (iter.hasNext()) {
LoadJob job = iter.next().getValue();
- if (job.isExpired(currentTimeMs)) {
+ if (pred.test(job)) {
iter.remove();
- Map<String, List<LoadJob>> map =
dbIdToLabelToLoadJobs.get(job.getDbId());
- List<LoadJob> list = map.get(job.getLabel());
- list.remove(job);
- if (job instanceof SparkLoadJob) {
- ((SparkLoadJob) job).clearSparkLauncherLog();
- }
- if (list.isEmpty()) {
- map.remove(job.getLabel());
- }
- if (map.isEmpty()) {
- dbIdToLabelToLoadJobs.remove(job.getDbId());
- }
+ jobRemovedTrigger(job);
}
}
} finally {
@@ -510,8 +547,8 @@ public class LoadManager implements Writable {
* @param accurateMatch true: filter jobs which's label is labelValue.
false: filter jobs which's label like itself.
* @param statesValue used to filter jobs which's state within the
statesValue set.
* @return The result is the list of jobInfo.
- * JobInfo is a list which includes the comparable object: jobId, label,
state etc.
- * The result is unordered.
+ * JobInfo is a list which includes the comparable object: jobId,
label, state etc.
+ * The result is unordered.
*/
public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String
labelValue, boolean accurateMatch,
Set<String> statesValue) throws AnalysisException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index fdad28bff7e..10b2d0cf734 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1633,7 +1633,7 @@ public abstract class RoutineLoadJob extends
AbstractTxnStateChangeCallback impl
abstract Map<String, String> getCustomProperties();
- public boolean needRemove() {
+ public boolean isExpired() {
if (!isFinal()) {
return false;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 24fafc6677b..dbf3d6a09a5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -62,7 +62,10 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -70,6 +73,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
public class RoutineLoadManager implements Writable {
@@ -247,7 +251,7 @@ public class RoutineLoadManager implements Writable {
if
(!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
dbFullName,
PrivPredicate.LOAD)) {
- //todo add new error code
+ // todo add new error code
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
"LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
@@ -575,7 +579,7 @@ public class RoutineLoadManager implements Writable {
else return all of result
*/
public List<RoutineLoadJob> getJob(String dbFullName, String jobName,
- boolean includeHistory, PatternMatcher
matcher)
+ boolean includeHistory, PatternMatcher matcher)
throws MetaNotFoundException {
Preconditions.checkArgument(jobName == null || matcher == null,
"jobName and matcher cannot be not null at the same time");
@@ -679,19 +683,55 @@ public class RoutineLoadManager implements Writable {
// Remove old routine load jobs from idToRoutineLoadJob
// This function is called periodically.
- // Cancelled and stopped job will be remove after
Configure.label_keep_max_second seconds
+ // Cancelled and stopped job will be removed after
Configure.label_keep_max_second seconds
public void cleanOldRoutineLoadJobs() {
LOG.debug("begin to clean old routine load jobs ");
+ clearRoutineLoadJobIf(RoutineLoadJob::isExpired);
+ }
+
+ /**
+ * Remove finished routine load jobs from idToRoutineLoadJob
+ * This function is called periodically if Config.label_num_threshold is
set.
+ * Cancelled and stopped job will be removed.
+ */
+ public void cleanOverLimitRoutineLoadJobs() {
+ if (Config.label_num_threshold < 0
+ || idToRoutineLoadJob.size() <= Config.label_num_threshold) {
+ return;
+ }
+ writeLock();
+ try {
+ LOG.debug("begin to clean routine load jobs");
+ Deque<RoutineLoadJob> finishedJobs = idToRoutineLoadJob
+ .values()
+ .stream()
+ .filter(RoutineLoadJob::isFinal)
+ .sorted(Comparator.comparingLong(o -> o.endTimestamp))
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ while (!finishedJobs.isEmpty()
+ && idToRoutineLoadJob.size() > Config.label_num_threshold)
{
+ RoutineLoadJob routineLoadJob = finishedJobs.pollFirst();
+ unprotectedRemoveJobFromDb(routineLoadJob);
+ idToRoutineLoadJob.remove(routineLoadJob.getId());
+ RoutineLoadOperation operation = new
RoutineLoadOperation(routineLoadJob.getId(),
+ routineLoadJob.getState());
+
Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(operation);
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void clearRoutineLoadJobIf(Predicate<RoutineLoadJob> pred) {
writeLock();
try {
Iterator<Map.Entry<Long, RoutineLoadJob>> iterator =
idToRoutineLoadJob.entrySet().iterator();
long currentTimestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
RoutineLoadJob routineLoadJob = iterator.next().getValue();
- if (routineLoadJob.needRemove()) {
+ if (pred.test(routineLoadJob)) {
unprotectedRemoveJobFromDb(routineLoadJob);
iterator.remove();
-
RoutineLoadOperation operation = new
RoutineLoadOperation(routineLoadJob.getId(),
routineLoadJob.getState());
Env.getCurrentEnv().getEditLog().logRemoveRoutineLoadJob(operation);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index b3b2c8baf2f..84f9548de13 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -83,9 +83,9 @@ public class RoutineLoadScheduler extends MasterDaemon {
if (desiredConcurrentTaskNum <= 0) {
// the job will be rescheduled later.
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
routineLoadJob.getId())
- .add("msg", "the current concurrent num
is less than or equal to zero, "
- + "job will be rescheduled later")
- .build());
+ .add("msg", "the current concurrent num is less
than or equal to zero, "
+ + "job will be rescheduled later")
+ .build());
continue;
}
// check state and divide job into tasks
@@ -112,10 +112,10 @@ public class RoutineLoadScheduler extends MasterDaemon {
routineLoadJob.updateState(errorJobState, reason, false);
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
routineLoadJob.getId())
- .add("current_state",
routineLoadJob.getState())
- .add("desired_state", errorJobState)
- .add("warn_msg", "failed to change state
to desired state")
- .build(), e);
+ .add("current_state", routineLoadJob.getState())
+ .add("desired_state", errorJobState)
+ .add("warn_msg", "failed to change state to
desired state")
+ .build(), e);
}
}
}
@@ -124,11 +124,12 @@ public class RoutineLoadScheduler extends MasterDaemon {
routineLoadManager.processTimeoutTasks();
routineLoadManager.cleanOldRoutineLoadJobs();
+
+ routineLoadManager.cleanOverLimitRoutineLoadJobs();
}
private List<RoutineLoadJob> getNeedScheduleRoutineJobs() throws
LoadException {
return
routineLoadManager.getRoutineLoadJobByState(Sets.newHashSet(RoutineLoadJob.JobState.NEED_SCHEDULE));
}
-
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
index c0fa55ac6ec..319589c9aa1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncChecker.java
@@ -79,5 +79,6 @@ public class SyncChecker extends MasterDaemon {
private void cleanOldSyncJobs() {
// clean up expired sync jobs
this.syncJobManager.cleanOldSyncJobs();
+ this.syncJobManager.cleanOverLimitSyncJobs();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
index c1533843ef9..5be922d6e2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/sync/SyncJobManager.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.ResumeSyncJobStmt;
import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
@@ -39,12 +40,17 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
public class SyncJobManager implements Writable {
@@ -58,7 +64,7 @@ public class SyncJobManager implements Writable {
public SyncJobManager() {
idToSyncJob = Maps.newConcurrentMap();
- dbIdToJobNameToSyncJobs = Maps.newConcurrentMap();
+ dbIdToJobNameToSyncJobs =
Collections.synchronizedMap(Maps.newLinkedHashMap());
lock = new ReentrantReadWriteLock(true);
}
@@ -283,31 +289,68 @@ public class SyncJobManager implements Writable {
// Stopped jobs will be removed after Config.label_keep_max_second.
public void cleanOldSyncJobs() {
LOG.debug("begin to clean old sync jobs ");
+ cleanFinishedSyncJobsIf(job ->
job.isExpired(System.currentTimeMillis()));
+ }
+
+ /**
+ * Remove completed jobs if total job num exceed Config.label_num_threshold
+ */
+ public void cleanOverLimitSyncJobs() {
+ if (Config.label_num_threshold < 0 || idToSyncJob.size() <=
Config.label_num_threshold) {
+ return;
+ }
+ writeLock();
+ try {
+ LOG.debug("begin to clean finished sync jobs ");
+ Deque<SyncJob> finishedJobs = idToSyncJob
+ .values()
+ .stream()
+ .filter(SyncJob::isCompleted)
+ .sorted(Comparator.comparingLong(o -> o.finishTimeMs))
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ while (!finishedJobs.isEmpty() && idToSyncJob.size() >
Config.label_num_threshold) {
+ SyncJob syncJob = finishedJobs.pollFirst();
+ if (!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) {
+ continue;
+ }
+ idToSyncJob.remove(syncJob.getId());
+ jobRemovedTrigger(syncJob);
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void jobRemovedTrigger(SyncJob syncJob) {
+ Map<String, List<SyncJob>> map =
dbIdToJobNameToSyncJobs.get(syncJob.getDbId());
+ List<SyncJob> list = map.get(syncJob.getJobName());
+ list.remove(syncJob);
+ if (list.isEmpty()) {
+ map.remove(syncJob.getJobName());
+ }
+ if (map.isEmpty()) {
+ dbIdToJobNameToSyncJobs.remove(syncJob.getDbId());
+ }
+ }
+
+ public void cleanFinishedSyncJobsIf(Predicate<SyncJob> pred) {
long currentTimeMs = System.currentTimeMillis();
writeLock();
try {
Iterator<Map.Entry<Long, SyncJob>> iterator =
idToSyncJob.entrySet().iterator();
while (iterator.hasNext()) {
SyncJob syncJob = iterator.next().getValue();
- if (syncJob.isExpired(currentTimeMs)) {
+ if (pred.test(syncJob)) {
if
(!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) {
continue;
}
- Map<String, List<SyncJob>> map =
dbIdToJobNameToSyncJobs.get(syncJob.getDbId());
- List<SyncJob> list = map.get(syncJob.getJobName());
- list.remove(syncJob);
- if (list.isEmpty()) {
- map.remove(syncJob.getJobName());
- }
- if (map.isEmpty()) {
- dbIdToJobNameToSyncJobs.remove(syncJob.getDbId());
- }
+ jobRemovedTrigger(syncJob);
iterator.remove();
LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId())
.add("finishTimeMs", syncJob.getFinishTimeMs())
.add("currentTimeMs", currentTimeMs)
.add("jobState", syncJob.getJobState())
- .add("msg", "old sync job has been cleaned")
+ .add("msg", "sync job has been cleaned")
);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 1e2a8764505..125335b382c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -165,6 +165,8 @@ public class DatabaseTransactionMgr {
private long lockReportingThresholdMs = Config.lock_reporting_threshold_ms;
+ private long maxFinalTxnsNum = Long.MAX_VALUE;
+
protected void readLock() {
this.transactionLock.readLock().lock();
}
@@ -188,6 +190,9 @@ public class DatabaseTransactionMgr {
this.env = env;
this.idGenerator = idGenerator;
this.editLog = env.getEditLog();
+ if (Config.label_num_threshold >= 0) {
+ this.maxFinalTxnsNum = Config.label_num_threshold;
+ }
}
public long getDbId() {
@@ -1534,13 +1539,13 @@ public class DatabaseTransactionMgr {
return partitionInfos;
}
- public void removeExpiredTxns(long currentMillis) {
+ public void removeUselessTxns(long currentMillis) {
// delete expired txns
writeLock();
try {
- Pair<Long, Integer> expiredTxnsInfoForShort =
unprotectedRemoveExpiredTxns(currentMillis,
+ Pair<Long, Integer> expiredTxnsInfoForShort =
unprotectedRemoveUselessTxns(currentMillis,
finalStatusTransactionStateDequeShort,
MAX_REMOVE_TXN_PER_ROUND);
- Pair<Long, Integer> expiredTxnsInfoForLong =
unprotectedRemoveExpiredTxns(currentMillis,
+ Pair<Long, Integer> expiredTxnsInfoForLong =
unprotectedRemoveUselessTxns(currentMillis,
finalStatusTransactionStateDequeLong,
MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second);
int numOfClearedTransaction = expiredTxnsInfoForShort.second +
expiredTxnsInfoForLong.second;
@@ -1557,7 +1562,7 @@ public class DatabaseTransactionMgr {
}
}
- private Pair<Long, Integer> unprotectedRemoveExpiredTxns(long
currentMillis,
+ private Pair<Long, Integer> unprotectedRemoveUselessTxns(long
currentMillis,
ArrayDeque<TransactionState> finalStatusTransactionStateDeque, int
left) {
long latestTxnId = -1;
int numOfClearedTransaction = 0;
@@ -1572,6 +1577,18 @@ public class DatabaseTransactionMgr {
break;
}
}
+ while (finalStatusTransactionStateDeque.size() > maxFinalTxnsNum
+ && numOfClearedTransaction < left) {
+ TransactionState transactionState =
finalStatusTransactionStateDeque.getFirst();
+ if (transactionState.getFinishTime() != -1) {
+ finalStatusTransactionStateDeque.pop();
+ clearTransactionState(transactionState.getTransactionId());
+ latestTxnId = transactionState.getTransactionId();
+ numOfClearedTransaction++;
+ } else {
+ break;
+ }
+ }
return Pair.of(latestTxnId, numOfClearedTransaction);
}
@@ -1922,7 +1939,7 @@ public class DatabaseTransactionMgr {
}
public void removeExpiredAndTimeoutTxns(long currentMillis) {
- removeExpiredTxns(currentMillis);
+ removeUselessTxns(currentMillis);
List<Long> timeoutTxns = getTimeoutTxns(currentMillis);
// abort timeout txns
for (Long txnId : timeoutTxns) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
index a30d25a944f..e9b3278cfd0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
@@ -40,6 +40,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.util.List;
import java.util.Map;
public class LoadManagerTest {
@@ -123,7 +124,7 @@ public class LoadManagerTest {
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L,
System.currentTimeMillis(), "", "", userInfo);
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
- //make job1 don't serialize
+ // make job1 don't serialize
Config.streaming_label_keep_max_second = 1;
Thread.sleep(2000);
@@ -135,6 +136,49 @@ public class LoadManagerTest {
Assert.assertEquals(0, newLoadJobs.size());
}
+ @Test
+ public void testCleanOverLimitJobs(@Mocked Env env,
+ @Mocked InternalCatalog catalog, @Injectable Database database,
@Injectable Table table) throws Exception {
+ new Expectations() {
+ {
+ env.getNextId();
+ returns(1L, 2L);
+ env.getInternalCatalog();
+ minTimes = 0;
+ result = catalog;
+ catalog.getDbNullable(anyLong);
+ minTimes = 0;
+ result = database;
+ database.getTableNullable(anyLong);
+ minTimes = 0;
+ result = table;
+ table.getName();
+ minTimes = 0;
+ result = "tablename";
+ Env.getCurrentEnvJournalVersion();
+ minTimes = 0;
+ result = FeMetaVersion.VERSION_CURRENT;
+ }
+ };
+
+ loadManager = new LoadManager(new LoadJobScheduler());
+ LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L,
System.currentTimeMillis(), "", "", userInfo);
+ Thread.sleep(100);
+ LoadJob job2 = new InsertLoadJob("job2", 1L, 1L, 1L,
System.currentTimeMillis(), "", "", userInfo);
+ Deencapsulation.invoke(loadManager, "addLoadJob", job2);
+ Deencapsulation.invoke(loadManager, "addLoadJob", job1);
+ Config.label_num_threshold = 1;
+ loadManager.removeOverLimitLoadJob();
+ Map<Long, LoadJob> idToJobs = Deencapsulation.getField(loadManager,
fieldName);
+ Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs =
Deencapsulation.getField(loadManager,
+ "dbIdToLabelToLoadJobs");
+ Assert.assertEquals(1, idToJobs.size());
+ Assert.assertEquals(1, dbIdToLabelToLoadJobs.size());
+ LoadJob loadJob = idToJobs.get(job2.getId());
+ Assert.assertEquals("job2", loadJob.getLabel());
+ Assert.assertNotNull(dbIdToLabelToLoadJobs.get(1L).get("job2"));
+ }
+
private File serializeToFile(LoadManager loadManager) throws Exception {
File file = new File("./loadManagerTest");
file.createNewFile();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
index 7bd776df3eb..8114b9cf835 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java
@@ -801,7 +801,7 @@ public class RoutineLoadManagerTest {
new Expectations() {
{
- routineLoadJob.needRemove();
+ routineLoadJob.isExpired();
minTimes = 0;
result = true;
routineLoadJob.getDbId();
@@ -821,6 +821,47 @@ public class RoutineLoadManagerTest {
Assert.assertEquals(0, idToRoutineLoadJob.size());
}
+ @Test
+ public void testCleanOverLimitRoutineLoadJobs(@Injectable RoutineLoadJob
routineLoadJob,
+ @Mocked Env env, @Mocked EditLog editLog) {
+ RoutineLoadManager routineLoadManager = new RoutineLoadManager();
+ Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob
= Maps.newHashMap();
+ Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob =
Maps.newHashMap();
+ List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
+ routineLoadJobList.add(routineLoadJob);
+ nameToRoutineLoadJob.put("", routineLoadJobList);
+ dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
+ Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newHashMap();
+ idToRoutineLoadJob.put(1L, routineLoadJob);
+ Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob",
idToRoutineLoadJob);
+ Deencapsulation.setField(routineLoadManager,
"dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
+
+ new Expectations() {
+ {
+ routineLoadJob.getId();
+ minTimes = 0;
+ result = 1L;
+ routineLoadJob.isFinal();
+ minTimes = 0;
+ result = true;
+ routineLoadJob.getDbId();
+ minTimes = 0;
+ result = 1L;
+ routineLoadJob.getName();
+ minTimes = 0;
+ result = "";
+ env.getEditLog();
+ minTimes = 0;
+ result = editLog;
+ }
+ };
+ Config.label_num_threshold = 0;
+
+ routineLoadManager.cleanOverLimitRoutineLoadJobs();
+ Assert.assertEquals(0, dbToNameToRoutineLoadJob.size());
+ Assert.assertEquals(0, idToRoutineLoadJob.size());
+ }
+
@Test
public void testGetBeIdConcurrentTaskMaps(@Injectable RoutineLoadJob
routineLoadJob) {
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
index d0badf19701..dc239dde597 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/SyncJobManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.ResumeSyncJobStmt;
import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
@@ -446,5 +447,40 @@ public class SyncJobManagerTest {
Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size());
}
+ @Test
+ public void testCleanOverLimitJobs() {
+ SyncJob canalSyncJob = new CanalSyncJob(jobId, jobName, dbId);
+ // change sync job state to cancelled
+ try {
+ canalSyncJob.updateState(JobState.CANCELLED, false);
+ } catch (UserException e) {
+ Assert.fail();
+ }
+ Assert.assertEquals(JobState.CANCELLED, canalSyncJob.getJobState());
+
+ SyncJobManager manager = new SyncJobManager();
+ // add a sync job to manager
+ Map<Long, SyncJob> idToSyncJob = Maps.newHashMap();
+ idToSyncJob.put(jobId, canalSyncJob);
+ Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs =
Maps.newHashMap();
+ Map<String, List<SyncJob>> jobNameToSyncJobs = Maps.newHashMap();
+ jobNameToSyncJobs.put(jobName, Lists.newArrayList(canalSyncJob));
+ dbIdToJobNameToSyncJobs.put(dbId, jobNameToSyncJobs);
+
+ Deencapsulation.setField(manager, "idToSyncJob", idToSyncJob);
+ Deencapsulation.setField(manager, "dbIdToJobNameToSyncJobs",
dbIdToJobNameToSyncJobs);
+
+ new Expectations(canalSyncJob) {
+ {
+ canalSyncJob.isCompleted();
+ result = true;
+ }
+ };
+ Config.label_num_threshold = 0;
+ manager.cleanOverLimitSyncJobs();
+
+ Assert.assertEquals(0, idToSyncJob.size());
+ Assert.assertEquals(0, dbIdToJobNameToSyncJobs.size());
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index b988650fa2f..9108570e5e4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -240,7 +240,17 @@ public class DatabaseTransactionMgrTest {
DatabaseTransactionMgr masterDbTransMgr =
masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
Config.label_keep_max_second = -1;
long currentMillis = System.currentTimeMillis();
- masterDbTransMgr.removeExpiredTxns(currentMillis);
+ masterDbTransMgr.removeUselessTxns(currentMillis);
+ Assert.assertEquals(0, masterDbTransMgr.getFinishedTxnNums());
+ Assert.assertEquals(3, masterDbTransMgr.getTransactionNum());
+
Assert.assertNull(masterDbTransMgr.unprotectedGetTxnIdsByLabel(CatalogTestUtil.testTxnLabel1));
+ }
+
+ @Test
+ public void testRemoveOverLimitTxns() throws AnalysisException {
+ DatabaseTransactionMgr masterDbTransMgr =
masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1);
+ Config.label_num_threshold = 0;
+ masterDbTransMgr.removeUselessTxns(System.currentTimeMillis());
Assert.assertEquals(0, masterDbTransMgr.getFinishedTxnNums());
Assert.assertEquals(3, masterDbTransMgr.getTransactionNum());
Assert.assertNull(masterDbTransMgr.unprotectedGetTxnIdsByLabel(CatalogTestUtil.testTxnLabel1));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]