This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e67b3a  [Bug] Fix bug that replayCreateLoadJob will cause fe memory 
leak in non master node because InsertLoadJob cannot be removed from 
TxnStateCallbackFactory (#6795)
9e67b3a is described below

commit 9e67b3a3924d8916e44cd2a61ec19215ca15458a
Author: caiconghui <[email protected]>
AuthorDate: Fri Oct 8 13:17:22 2021 +0800

    [Bug] Fix bug that replayCreateLoadJob will cause fe memory leak in non 
master node because InsertLoadJob cannot be removed from 
TxnStateCallbackFactory (#6795)
---
 .../main/java/org/apache/doris/load/loadv2/InsertLoadJob.java  |  3 ++-
 .../main/java/org/apache/doris/load/loadv2/LoadManager.java    | 10 ++++++----
 fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java |  3 ++-
 .../java/org/apache/doris/load/loadv2/InsertLoadJobTest.java   |  2 +-
 .../java/org/apache/doris/load/loadv2/LoadManagerTest.java     |  4 ++--
 5 files changed, 13 insertions(+), 9 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
index 8c2bdce..e11868e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -48,10 +48,11 @@ public class InsertLoadJob extends LoadJob {
         super(EtlJobType.INSERT);
     }
 
-    public InsertLoadJob(String label, long dbId, long tableId, long 
createTimestamp, String failMsg,
+    public InsertLoadJob(String label, long transactionId, long dbId, long 
tableId, long createTimestamp, String failMsg,
             String trackingUrl) throws MetaNotFoundException {
         super(EtlJobType.INSERT, dbId, label);
         this.tableId = tableId;
+        this.transactionId = transactionId;
         this.createTimestamp = createTimestamp;
         this.loadStartTimestamp = createTimestamp;
         this.finishTimestamp = System.currentTimeMillis();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 2715e2c..0c71559 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -260,9 +260,11 @@ public class LoadManager implements Writable{
             return;
         }
         addLoadJob(loadJob);
-        // add callback before txn created, because callback will be performed 
on replay without txn begin
+        // add callback before txn if load job is uncompleted, because 
callback will be performed on replay without txn begin
         // register txn state listener
-        
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
+        if (!loadJob.isCompleted()) {
+            
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
+        }
     }
 
     private void addLoadJob(LoadJob loadJob) {
@@ -278,7 +280,7 @@ public class LoadManager implements Writable{
         labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
     }
 
-    public void recordFinishedLoadJob(String label, String dbName, long 
tableId, EtlJobType jobType,
+    public void recordFinishedLoadJob(String label, long transactionId, String 
dbName, long tableId, EtlJobType jobType,
             long createTimestamp, String failMsg, String trackingUrl) throws 
MetaNotFoundException {
 
         // get db id
@@ -287,7 +289,7 @@ public class LoadManager implements Writable{
         LoadJob loadJob;
         switch (jobType) {
             case INSERT:
-                loadJob = new InsertLoadJob(label, db.getId(), tableId, 
createTimestamp, failMsg, trackingUrl);
+                loadJob = new InsertLoadJob(label, transactionId, db.getId(), 
tableId, createTimestamp, failMsg, trackingUrl);
                 break;
             default:
                 return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index d581e70..f1e8e5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1288,9 +1288,11 @@ public class StmtExecutor implements ProfileWriter {
                 message = throwable.getMessage();
             }
 
+            txnId = insertStmt.getTransactionId();
             try {
                 context.getCatalog().getLoadManager().recordFinishedLoadJob(
                         label,
+                        txnId,
                         insertStmt.getDb(),
                         insertStmt.getTargetTable().getId(),
                         EtlJobType.INSERT,
@@ -1301,7 +1303,6 @@ public class StmtExecutor implements ProfileWriter {
                 LOG.warn("Record info of insert load with error {}", 
e.getMessage(), e);
                 errMsg = "Record info of insert load with error " + 
e.getMessage();
             }
-            txnId = insertStmt.getTransactionId();
         }
 
         // {'label':'my_label1', 'status':'visible', 'txnId':'123'}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java
index cba2f71..a7f2bb3 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/InsertLoadJobTest.java
@@ -39,7 +39,7 @@ public class InsertLoadJobTest {
     public void testGetTableNames(@Mocked Catalog catalog,
                                   @Injectable Database database,
                                   @Injectable Table table) throws 
MetaNotFoundException {
-        InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, 
"", "");
+        InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1L, 
1000, "", "");
         String tableName = "table1";
         new Expectations() {
             {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
index d33f310..f014411 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
@@ -133,7 +133,7 @@ public class LoadManagerTest {
         };
 
         loadManager = new LoadManager(new LoadJobScheduler());
-        LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 
System.currentTimeMillis(), "", "");
+        LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, 
System.currentTimeMillis(), "", "");
         Deencapsulation.invoke(loadManager, "addLoadJob", job1);
 
         File file = serializeToFile(loadManager);
@@ -168,7 +168,7 @@ public class LoadManagerTest {
         };
 
         loadManager = new LoadManager(new LoadJobScheduler());
-        LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 
System.currentTimeMillis(), "", "");
+        LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, 
System.currentTimeMillis(), "", "");
         Deencapsulation.invoke(loadManager, "addLoadJob", job1);
 
         //make job1 don't serialize

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to