This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9e67b3a [Bug] Fix bug that replayCreateLoadJob will cause fe memory
leak in non master node because InsertLoadJob cannot be removed from
TxnStateCallbackFactory (#6795)
9e67b3a is described below
commit 9e67b3a3924d8916e44cd2a61ec19215ca15458a
Author: caiconghui <[email protected]>
AuthorDate: Fri Oct 8 13:17:22 2021 +0800
[Bug] Fix bug that replayCreateLoadJob will cause fe memory leak in non
master node because InsertLoadJob cannot be removed from
TxnStateCallbackFactory (#6795)
---
.../main/java/org/apache/doris/load/loadv2/InsertLoadJob.java | 3 ++-
.../main/java/org/apache/doris/load/loadv2/LoadManager.java | 10 ++++++----
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java | 3 ++-
.../java/org/apache/doris/load/loadv2/InsertLoadJobTest.java | 2 +-
.../java/org/apache/doris/load/loadv2/LoadManagerTest.java | 4 ++--
5 files changed, 13 insertions(+), 9 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
index 8c2bdce..e11868e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -48,10 +48,11 @@ public class InsertLoadJob extends LoadJob {
super(EtlJobType.INSERT);
}
- public InsertLoadJob(String label, long dbId, long tableId, long
createTimestamp, String failMsg,
+ public InsertLoadJob(String label, long transactionId, long dbId, long
tableId, long createTimestamp, String failMsg,
String trackingUrl) throws MetaNotFoundException {
super(EtlJobType.INSERT, dbId, label);
this.tableId = tableId;
+ this.transactionId = transactionId;
this.createTimestamp = createTimestamp;
this.loadStartTimestamp = createTimestamp;
this.finishTimestamp = System.currentTimeMillis();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 2715e2c..0c71559 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -260,9 +260,11 @@ public class LoadManager implements Writable{
return;
}
addLoadJob(loadJob);
- // add callback before txn created, because callback will be performed
on replay without txn begin
+ // add callback before txn if load job is uncompleted, because
callback will be performed on replay without txn begin
// register txn state listener
-
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
+ if (!loadJob.isCompleted()) {
+
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
+ }
}
private void addLoadJob(LoadJob loadJob) {
@@ -278,7 +280,7 @@ public class LoadManager implements Writable{
labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
}
- public void recordFinishedLoadJob(String label, String dbName, long
tableId, EtlJobType jobType,
+ public void recordFinishedLoadJob(String label, long transactionId, String
dbName, long tableId, EtlJobType jobType,
long createTimestamp, String failMsg, String trackingUrl) throws
MetaNotFoundException {
// get db id
@@ -287,7 +289,7 @@ public class LoadManager implements Writable{
LoadJob loadJob;
switch (jobType) {
case INSERT:
- loadJob = new InsertLoadJob(label, db.getId(), tableId,
createTimestamp, failMsg, trackingUrl);
+ loadJob = new InsertLoadJob(label, transactionId, db.getId(),
tableId, createTimestamp, failMsg, trackingUrl);
break;
default:
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index d581e70..f1e8e5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1288,9 +1288,11 @@ public class StmtExecutor implements ProfileWriter {
message = throwable.getMessage();
}
+ txnId = insertStmt.getTransactionId();
try {
context.getCatalog().getLoadManager().recordFinishedLoadJob(
label,
+ txnId,
insertStmt.getDb(),
insertStmt.getTargetTable().getId(),
EtlJobType.INSERT,
@@ -1301,7 +1303,6 @@ public class StmtExecutor implements ProfileWriter {
LOG.warn("Record info of insert load with error {}",
e.getMessage(), e);
errMsg = "Record info of insert load with error " +
e.getMessage();
}
- txnId = insertStmt.getTransactionId();
}
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java
index cba2f71..a7f2bb3 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java
@@ -39,7 +39,7 @@ public class InsertLoadJobTest {
public void testGetTableNames(@Mocked Catalog catalog,
@Injectable Database database,
@Injectable Table table) throws
MetaNotFoundException {
- InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000,
"", "");
+ InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1L,
1000, "", "");
String tableName = "table1";
new Expectations() {
{
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
index d33f310..f014411 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
@@ -133,7 +133,7 @@ public class LoadManagerTest {
};
loadManager = new LoadManager(new LoadJobScheduler());
- LoadJob job1 = new InsertLoadJob("job1", 1L, 1L,
System.currentTimeMillis(), "", "");
+ LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L,
System.currentTimeMillis(), "", "");
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
File file = serializeToFile(loadManager);
@@ -168,7 +168,7 @@ public class LoadManagerTest {
};
loadManager = new LoadManager(new LoadJobScheduler());
- LoadJob job1 = new InsertLoadJob("job1", 1L, 1L,
System.currentTimeMillis(), "", "");
+ LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L,
System.currentTimeMillis(), "", "");
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
//make job1 don't serialize
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]