morningman commented on code in PR #16073:
URL: https://github.com/apache/doris/pull/16073#discussion_r1101018493


##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -43,55 +59,297 @@
 import org.apache.http.util.EntityUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class MysqlLoadManager {
     private static final Logger LOG = 
LogManager.getLogger(MysqlLoadManager.class);
 
     private final ThreadPoolExecutor mysqlLoadPool;
+    private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new 
ConcurrentHashMap<>();
+    private final ScheduledExecutorService authCleaner;
 
     public MysqlLoadManager() {
         this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, 
"Mysql Load", true);
+        this.authCleaner = Executors.newScheduledThreadPool(1);
+        this.authCleaner.scheduleAtFixedRate(() -> {
+            synchronized (mysqlAuthTokens) {
+                for (String key : mysqlAuthTokens.keySet()) {
+                    if (System.currentTimeMillis() - mysqlAuthTokens.get(key) 
>= 3600 * 1000) {

Review Comment:
   If a load runs for longer than 1 hour, will it fail because its auth token 
has expired?



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java:
##########
@@ -98,6 +98,7 @@ public class DataDescription {
     private final String tableName;
 
     private String dbName;
+    private String fullDatabaseName;

Review Comment:
   What is the difference between `dbName` and `fullDatabaseName`?
   Please add a comment to explain it, or, I think, we could merge them into one.



##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -43,55 +59,297 @@
 import org.apache.http.util.EntityUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class MysqlLoadManager {
     private static final Logger LOG = 
LogManager.getLogger(MysqlLoadManager.class);
 
     private final ThreadPoolExecutor mysqlLoadPool;
+    private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new 
ConcurrentHashMap<>();
+    private final ScheduledExecutorService authCleaner;
 
     public MysqlLoadManager() {
         this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, 
"Mysql Load", true);
+        this.authCleaner = Executors.newScheduledThreadPool(1);
+        this.authCleaner.scheduleAtFixedRate(() -> {
+            synchronized (mysqlAuthTokens) {
+                for (String key : mysqlAuthTokens.keySet()) {
+                    if (System.currentTimeMillis() - mysqlAuthTokens.get(key) 
>= 3600 * 1000) {
+                        mysqlAuthTokens.remove(key);
+                    }
+                }
+            }
+        }, 1, 1, TimeUnit.DAYS);
+    }
+
+    // this method only will be called in master node, since stream load only 
send message to master.
+    public boolean checkAuthToken(String token) {
+        return mysqlAuthTokens.containsKey(token);
+    }
+
+    // context only use in no master branch.
+    public String acquireToken(ConnectContext context) {
+        if (Env.getCurrentEnv().isMaster()) {
+            String token = UUID.randomUUID().toString();
+            long createTime = System.currentTimeMillis();
+            mysqlAuthTokens.put(token, createTime);
+            return token;
+        } else {
+            MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(context);
+            try {
+                return masterTxnExecutor.acquireToken();
+            } catch (TException e) {
+                LOG.error("acquire token error", e);

Review Comment:
   ```suggestion
                   LOG.warn("acquire token error", e);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -43,55 +59,297 @@
 import org.apache.http.util.EntityUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class MysqlLoadManager {
     private static final Logger LOG = 
LogManager.getLogger(MysqlLoadManager.class);
 
     private final ThreadPoolExecutor mysqlLoadPool;
+    private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new 
ConcurrentHashMap<>();
+    private final ScheduledExecutorService authCleaner;
 
     public MysqlLoadManager() {
         this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, 
"Mysql Load", true);
+        this.authCleaner = Executors.newScheduledThreadPool(1);
+        this.authCleaner.scheduleAtFixedRate(() -> {
+            synchronized (mysqlAuthTokens) {
+                for (String key : mysqlAuthTokens.keySet()) {
+                    if (System.currentTimeMillis() - mysqlAuthTokens.get(key) 
>= 3600 * 1000) {
+                        mysqlAuthTokens.remove(key);
+                    }
+                }
+            }
+        }, 1, 1, TimeUnit.DAYS);
+    }
+
+    // this method only will be called in master node, since stream load only 
send message to master.
+    public boolean checkAuthToken(String token) {
+        return mysqlAuthTokens.containsKey(token);
+    }
+
+    // context only use in no master branch.
+    public String acquireToken(ConnectContext context) {
+        if (Env.getCurrentEnv().isMaster()) {
+            String token = UUID.randomUUID().toString();
+            long createTime = System.currentTimeMillis();
+            mysqlAuthTokens.put(token, createTime);

Review Comment:
   If we generate a token for every load job, there may be too many tokens in 
memory (in high-concurrency load scenarios).
   Maybe we can make it simpler:
   1. On the master, keep a scheduled timer thread and a fixed-size FIFO queue.
   2. Every 12 hours (a larger interval keeps long-running jobs happy), the timer 
generates a token and pushes it into the queue. Because the queue has a fixed 
size, the oldest token is removed automatically.
   3. `acquireToken` always returns the token at the tail of the queue (the 
latest token).
   4. `checkToken` simply checks whether the token exists in the queue.
   
   Therefore, we only need to keep a few tokens, and these tokens can be shared 
by all load jobs.
   There is also no need to save a timestamp, because expired tokens are removed 
automatically; we only need to control the interval and the queue size (a token 
stays valid for roughly interval × queue size).
   And there is no need to call `releaseToken()` manually.



##########
fe/fe-core/src/main/java/org/apache/doris/qe/StreamLoadTxnExecutor.java:
##########
@@ -42,20 +42,31 @@
 
 import org.apache.thrift.TException;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-public class InsertStreamTxnExecutor {
+public class StreamLoadTxnExecutor {
     private long txnId;
     private TUniqueId loadId;
-    private TransactionEntry txnEntry;
+    private final TransactionEntry txnEntry;
+    private final TFileFormatType formatType;
 
-    public InsertStreamTxnExecutor(TransactionEntry txnEntry) {
-        this.txnEntry = txnEntry;
+    private final TFileCompressType compressType;
+
+    public StreamLoadTxnExecutor(TransactionEntry txnEntry) {
+        this(txnEntry, TFileFormatType.FORMAT_PROTO, TFileCompressType.PLAIN);
+    }
+
+    public StreamLoadTxnExecutor(TransactionEntry entry, TFileFormatType 
formatType, TFileCompressType compressType) {
+        this.txnEntry = entry;
+        this.formatType = formatType;
+        this.compressType = compressType;
     }
 
     public void beginTransaction(TStreamLoadPutRequest request) throws 
UserException, TException, TimeoutException,

Review Comment:
   How about naming it `executeTransaction`?



##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -43,55 +59,297 @@
 import org.apache.http.util.EntityUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class MysqlLoadManager {
     private static final Logger LOG = 
LogManager.getLogger(MysqlLoadManager.class);
 
     private final ThreadPoolExecutor mysqlLoadPool;
+    private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new 
ConcurrentHashMap<>();
+    private final ScheduledExecutorService authCleaner;
 
     public MysqlLoadManager() {
         this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, 
"Mysql Load", true);
+        this.authCleaner = Executors.newScheduledThreadPool(1);
+        this.authCleaner.scheduleAtFixedRate(() -> {
+            synchronized (mysqlAuthTokens) {
+                for (String key : mysqlAuthTokens.keySet()) {
+                    if (System.currentTimeMillis() - mysqlAuthTokens.get(key) 
>= 3600 * 1000) {
+                        mysqlAuthTokens.remove(key);
+                    }
+                }
+            }
+        }, 1, 1, TimeUnit.DAYS);
+    }
+
+    // this method only will be called in master node, since stream load only 
send message to master.
+    public boolean checkAuthToken(String token) {
+        return mysqlAuthTokens.containsKey(token);
+    }
+
+    // context only use in no master branch.
+    public String acquireToken(ConnectContext context) {
+        if (Env.getCurrentEnv().isMaster()) {
+            String token = UUID.randomUUID().toString();
+            long createTime = System.currentTimeMillis();
+            mysqlAuthTokens.put(token, createTime);
+            return token;
+        } else {
+            MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(context);
+            try {
+                return masterTxnExecutor.acquireToken();
+            } catch (TException e) {
+                LOG.error("acquire token error", e);
+                return null;
+            }
+        }
+    }
+
+    // context only use in no master branch.
+    public void releaseToken(ConnectContext context, String token) {
+        if (Env.getCurrentEnv().isMaster()) {
+            mysqlAuthTokens.remove(token);
+        } else {
+            MasterTxnExecutor masterTxnExecutor = new 
MasterTxnExecutor(context);
+            try {
+                masterTxnExecutor.releaseToken(token);
+            } catch (TException e) {
+                LOG.error("release token error", e);
+            }
+        }
     }
 
     public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext 
context, LoadStmt stmt)
             throws IOException, LoadException {
         LoadJobRowResult loadResult = new LoadJobRowResult();
         // Mysql data load only have one data desc
         DataDescription dataDesc = stmt.getDataDescriptions().get(0);
-        String database = dataDesc.getDbName();
-        String table = dataDesc.getTableName();
         List<String> filePaths = dataDesc.getFilePaths();
-        try (final CloseableHttpClient httpclient = 
HttpClients.createDefault()) {
-            for (String file : filePaths) {
-                InputStreamEntity entity = getInputStreamEntity(context, 
dataDesc.isClientLocal(), file);
-                HttpPut request = generateRequestForMySqlLoad(entity, 
dataDesc, database, table);
-                try (final CloseableHttpResponse response = 
httpclient.execute(request)) {
-                    JsonObject result = 
JsonParser.parseString(EntityUtils.toString(response.getEntity()))
-                            .getAsJsonObject();
-                    if 
(!result.get("Status").getAsString().equalsIgnoreCase("Success")) {
-                        LOG.warn("Execute stream load for mysql data load 
failed with message: " + request);
-                        throw new 
LoadException(result.get("Message").getAsString());
+        if (Config.use_http_mysql_load_job) {
+            String database = dataDesc.getDbName();
+            String table = dataDesc.getTableName();
+            String token = acquireToken(context);
+            if (token == null) {
+                throw new LoadException("Can't get the load token in mysql 
load");
+            }
+            try (final CloseableHttpClient httpclient = 
HttpClients.createDefault()) {
+                for (String file : filePaths) {
+                    InputStreamEntity entity = getInputStreamEntity(context, 
dataDesc.isClientLocal(), file);
+                    HttpPut request = generateRequestForMySqlLoad(entity, 
dataDesc, database, table, token);
+                    try (final CloseableHttpResponse response = 
httpclient.execute(request)) {
+                        JsonObject result = 
JsonParser.parseString(EntityUtils.toString(response.getEntity()))
+                                .getAsJsonObject();
+                        if 
(!result.get("Status").getAsString().equalsIgnoreCase("Success")) {
+                            LOG.warn("Execute stream load for mysql data load 
failed with message: " + request);
+                            throw new 
LoadException(result.get("Message").getAsString());
+                        }
+                        
loadResult.incRecords(result.get("NumberLoadedRows").getAsLong());
+                        
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
                     }
-                    
loadResult.incRecords(result.get("NumberLoadedRows").getAsLong());
-                    
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
                 }
             }
+            releaseToken(context, token);
+        } else {
+            StreamLoadTxnExecutor executor = null;
+            try {
+                String database = dataDesc.getFullDatabaseName();
+                String table = dataDesc.getTableName();
+                TransactionEntry entry = prepareTransactionEntry(database, 
table);
+                openTxn(context, entry);
+                executor = beginTxn(context, entry, dataDesc);
+                // sendData
+                for (String file : filePaths) {
+                    sendData(context, executor, file);
+                }
+                executor.commitTransaction();
+            } catch (Exception e) {
+                LOG.error("Failed to load mysql data into doris", e);
+                if (executor != null) {
+                    try {
+                        executor.abortTransaction();
+                    } catch (Exception ex) {
+                        throw new LoadException("Failed when abort the 
transaction", ex);
+                    }
+                }
+                throw new LoadException("Load failed when execute the mysql 
data load", e);
+            }
         }
         return loadResult;
     }
 
+    private TransactionEntry prepareTransactionEntry(String database, String 
table)
+            throws TException {
+        TTxnParams txnConf = new TTxnParams();
+        
txnConf.setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load)
+                .setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("");
+        Database dbObj = Env.getCurrentInternalCatalog()
+                .getDbOrException(database, s -> new TException("database is 
invalid for dbName: " + s));
+        Table tblObj = dbObj.getTableOrException(table, s -> new 
TException("table is invalid: " + s));
+        txnConf.setDbId(dbObj.getId()).setTbl(table).setDb(database);
+
+        TransactionEntry txnEntry = new TransactionEntry();
+        txnEntry.setTxnConf(txnConf);
+        txnEntry.setTable(tblObj);
+        txnEntry.setDb(dbObj);
+        String label = UUID.randomUUID().toString();
+        txnEntry.setLabel(label);
+        return txnEntry;
+    }
+
+    private void openTxn(ConnectContext context, TransactionEntry txnEntry) 
throws Exception {

Review Comment:
   The method names are confusing:
   `openTxn()` actually begins the transaction (`beginTransaction()`),
   and `beginTxn()` actually does the planning work.
   
   
   



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -716,6 +721,19 @@ struct TFetchSchemaTableDataResult {
   2: optional list<Data.TRow> data_batch;
 }
 
+struct TMySqlLoadAcquireTokenResult {
+    1: required Status.TStatus status

Review Comment:
   Use `optional` instead of `required`.



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -483,6 +483,7 @@ struct TLoadTxnBeginRequest {
     10: optional i64 timeout
     11: optional Types.TUniqueId request_id
     12: optional string auth_code_uuid
+    13: optional string token

Review Comment:
   I think we can unify `auth_code_uuid` and `token`.
   
   Currently, `auth_code_uuid` is used for transactional insert operations: it 
is generated for each TransactionState on the FE side and passed to the BE.
   I think we can replace `auth_code_uuid` with `token`. When creating a 
`TransactionState`, get a token from the TokenManager, and when checking the 
token, just check whether it is still valid.
   
   



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -716,6 +721,19 @@ struct TFetchSchemaTableDataResult {
   2: optional list<Data.TRow> data_batch;
 }
 
+struct TMySqlLoadAcquireTokenResult {
+    1: required Status.TStatus status
+    2: optional string token
+}
+
+struct TMySqlLoadReleaseTokenRequest {
+    1: optional string token
+}
+
+struct TMySqlLoadReleaseTokenResult {
+    1: required Status.TStatus status

Review Comment:
   Use `optional` instead of `required`; that is the convention here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to