morningman commented on code in PR #16073: URL: https://github.com/apache/doris/pull/16073#discussion_r1101018493
########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -43,55 +59,297 @@ import org.apache.http.util.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class MysqlLoadManager { private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); private final ThreadPoolExecutor mysqlLoadPool; + private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new ConcurrentHashMap<>(); + private final ScheduledExecutorService authCleaner; public MysqlLoadManager() { this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, "Mysql Load", true); + this.authCleaner = Executors.newScheduledThreadPool(1); + this.authCleaner.scheduleAtFixedRate(() -> { + synchronized (mysqlAuthTokens) { + for (String key : mysqlAuthTokens.keySet()) { + if (System.currentTimeMillis() - mysqlAuthTokens.get(key) >= 3600 * 1000) { Review Comment: If the load time is longer than 1 hour, the load will fail because the auth code is expired? ########## fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java: ########## @@ -98,6 +98,7 @@ public class DataDescription { private final String tableName; private String dbName; + private String fullDatabaseName; Review Comment: what is different between `dbName` and `fullDatabaseName`? 
Add comment to explain, or I think we can merge them into one ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -43,55 +59,297 @@ import org.apache.http.util.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class MysqlLoadManager { private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); private final ThreadPoolExecutor mysqlLoadPool; + private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new ConcurrentHashMap<>(); + private final ScheduledExecutorService authCleaner; public MysqlLoadManager() { this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, "Mysql Load", true); + this.authCleaner = Executors.newScheduledThreadPool(1); + this.authCleaner.scheduleAtFixedRate(() -> { + synchronized (mysqlAuthTokens) { + for (String key : mysqlAuthTokens.keySet()) { + if (System.currentTimeMillis() - mysqlAuthTokens.get(key) >= 3600 * 1000) { + mysqlAuthTokens.remove(key); + } + } + } + }, 1, 1, TimeUnit.DAYS); + } + + // this method only will be called in master node, since stream load only send message to master. + public boolean checkAuthToken(String token) { + return mysqlAuthTokens.containsKey(token); + } + + // context only use in no master branch. 
+ public String acquireToken(ConnectContext context) { + if (Env.getCurrentEnv().isMaster()) { + String token = UUID.randomUUID().toString(); + long createTime = System.currentTimeMillis(); + mysqlAuthTokens.put(token, createTime); + return token; + } else { + MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context); + try { + return masterTxnExecutor.acquireToken(); + } catch (TException e) { + LOG.error("acquire token error", e); Review Comment: ```suggestion LOG.warn("acquire token error", e); ``` ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -43,55 +59,297 @@ import org.apache.http.util.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class MysqlLoadManager { private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); private final ThreadPoolExecutor mysqlLoadPool; + private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new ConcurrentHashMap<>(); + private final ScheduledExecutorService authCleaner; public MysqlLoadManager() { this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, "Mysql Load", true); + this.authCleaner = Executors.newScheduledThreadPool(1); + this.authCleaner.scheduleAtFixedRate(() -> { + synchronized (mysqlAuthTokens) { + for (String key : mysqlAuthTokens.keySet()) { + if (System.currentTimeMillis() - 
mysqlAuthTokens.get(key) >= 3600 * 1000) { + mysqlAuthTokens.remove(key); + } + } + } + }, 1, 1, TimeUnit.DAYS); + } + + // this method only will be called in master node, since stream load only send message to master. + public boolean checkAuthToken(String token) { + return mysqlAuthTokens.containsKey(token); + } + + // context only use in no master branch. + public String acquireToken(ConnectContext context) { + if (Env.getCurrentEnv().isMaster()) { + String token = UUID.randomUUID().toString(); + long createTime = System.currentTimeMillis(); + mysqlAuthTokens.put(token, createTime); Review Comment: If we generate a token for every load job, there may be too many tokens in memory (in high-concurrency load scenarios). Maybe we can make it simpler: 1. On the master, there is a scheduled timer thread and a fixed-size FIFO queue. 2. Every 12 hours (a larger interval keeps long-running jobs happy), the timer generates a token and pushes it to the queue. Because the queue has a fixed size, the oldest token is removed automatically. 3. When calling `acquireToken`, it always returns the token at the end of the queue (the latest token). 4. When calling `checkToken`, it can simply check whether the token exists in the queue. Therefore, we only need to keep a few tokens, and these tokens can be shared by all load jobs. There is also no need to save the timestamp: expired tokens are removed automatically, so we only need to control the interval and the queue size.
And there is no need to call `releaseToken()` manually. ########## fe/fe-core/src/main/java/org/apache/doris/qe/StreamLoadTxnExecutor.java: ########## @@ -42,20 +42,31 @@ import org.apache.thrift.TException; +import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class InsertStreamTxnExecutor { +public class StreamLoadTxnExecutor { private long txnId; private TUniqueId loadId; - private TransactionEntry txnEntry; + private final TransactionEntry txnEntry; + private final TFileFormatType formatType; - public InsertStreamTxnExecutor(TransactionEntry txnEntry) { - this.txnEntry = txnEntry; + private final TFileCompressType compressType; + + public StreamLoadTxnExecutor(TransactionEntry txnEntry) { + this(txnEntry, TFileFormatType.FORMAT_PROTO, TFileCompressType.PLAIN); + } + + public StreamLoadTxnExecutor(TransactionEntry entry, TFileFormatType formatType, TFileCompressType compressType) { + this.txnEntry = entry; + this.formatType = formatType; + this.compressType = compressType; } public void beginTransaction(TStreamLoadPutRequest request) throws UserException, TException, TimeoutException, Review Comment: How about naming it `executeTransaction`?
########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -43,55 +59,297 @@ import org.apache.http.util.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class MysqlLoadManager { private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); private final ThreadPoolExecutor mysqlLoadPool; + private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new ConcurrentHashMap<>(); + private final ScheduledExecutorService authCleaner; public MysqlLoadManager() { this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, "Mysql Load", true); + this.authCleaner = Executors.newScheduledThreadPool(1); + this.authCleaner.scheduleAtFixedRate(() -> { + synchronized (mysqlAuthTokens) { + for (String key : mysqlAuthTokens.keySet()) { + if (System.currentTimeMillis() - mysqlAuthTokens.get(key) >= 3600 * 1000) { + mysqlAuthTokens.remove(key); + } + } + } + }, 1, 1, TimeUnit.DAYS); + } + + // this method only will be called in master node, since stream load only send message to master. + public boolean checkAuthToken(String token) { + return mysqlAuthTokens.containsKey(token); + } + + // context only use in no master branch. 
+ public String acquireToken(ConnectContext context) { + if (Env.getCurrentEnv().isMaster()) { + String token = UUID.randomUUID().toString(); + long createTime = System.currentTimeMillis(); + mysqlAuthTokens.put(token, createTime); + return token; + } else { + MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context); + try { + return masterTxnExecutor.acquireToken(); + } catch (TException e) { + LOG.error("acquire token error", e); + return null; + } + } + } + + // context only use in no master branch. + public void releaseToken(ConnectContext context, String token) { + if (Env.getCurrentEnv().isMaster()) { + mysqlAuthTokens.remove(token); + } else { + MasterTxnExecutor masterTxnExecutor = new MasterTxnExecutor(context); + try { + masterTxnExecutor.releaseToken(token); + } catch (TException e) { + LOG.error("release token error", e); + } + } } public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt) throws IOException, LoadException { LoadJobRowResult loadResult = new LoadJobRowResult(); // Mysql data load only have one data desc DataDescription dataDesc = stmt.getDataDescriptions().get(0); - String database = dataDesc.getDbName(); - String table = dataDesc.getTableName(); List<String> filePaths = dataDesc.getFilePaths(); - try (final CloseableHttpClient httpclient = HttpClients.createDefault()) { - for (String file : filePaths) { - InputStreamEntity entity = getInputStreamEntity(context, dataDesc.isClientLocal(), file); - HttpPut request = generateRequestForMySqlLoad(entity, dataDesc, database, table); - try (final CloseableHttpResponse response = httpclient.execute(request)) { - JsonObject result = JsonParser.parseString(EntityUtils.toString(response.getEntity())) - .getAsJsonObject(); - if (!result.get("Status").getAsString().equalsIgnoreCase("Success")) { - LOG.warn("Execute stream load for mysql data load failed with message: " + request); - throw new LoadException(result.get("Message").getAsString()); + if 
(Config.use_http_mysql_load_job) { + String database = dataDesc.getDbName(); + String table = dataDesc.getTableName(); + String token = acquireToken(context); + if (token == null) { + throw new LoadException("Can't get the load token in mysql load"); + } + try (final CloseableHttpClient httpclient = HttpClients.createDefault()) { + for (String file : filePaths) { + InputStreamEntity entity = getInputStreamEntity(context, dataDesc.isClientLocal(), file); + HttpPut request = generateRequestForMySqlLoad(entity, dataDesc, database, table, token); + try (final CloseableHttpResponse response = httpclient.execute(request)) { + JsonObject result = JsonParser.parseString(EntityUtils.toString(response.getEntity())) + .getAsJsonObject(); + if (!result.get("Status").getAsString().equalsIgnoreCase("Success")) { + LOG.warn("Execute stream load for mysql data load failed with message: " + request); + throw new LoadException(result.get("Message").getAsString()); + } + loadResult.incRecords(result.get("NumberLoadedRows").getAsLong()); + loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt()); } - loadResult.incRecords(result.get("NumberLoadedRows").getAsLong()); - loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt()); } } + releaseToken(context, token); + } else { + StreamLoadTxnExecutor executor = null; + try { + String database = dataDesc.getFullDatabaseName(); + String table = dataDesc.getTableName(); + TransactionEntry entry = prepareTransactionEntry(database, table); + openTxn(context, entry); + executor = beginTxn(context, entry, dataDesc); + // sendData + for (String file : filePaths) { + sendData(context, executor, file); + } + executor.commitTransaction(); + } catch (Exception e) { + LOG.error("Failed to load mysql data into doris", e); + if (executor != null) { + try { + executor.abortTransaction(); + } catch (Exception ex) { + throw new LoadException("Failed when abort the transaction", ex); + } + } + throw new LoadException("Load failed when 
execute the mysql data load", e); + } } return loadResult; } + private TransactionEntry prepareTransactionEntry(String database, String table) + throws TException { + TTxnParams txnConf = new TTxnParams(); + txnConf.setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load) + .setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl(""); + Database dbObj = Env.getCurrentInternalCatalog() + .getDbOrException(database, s -> new TException("database is invalid for dbName: " + s)); + Table tblObj = dbObj.getTableOrException(table, s -> new TException("table is invalid: " + s)); + txnConf.setDbId(dbObj.getId()).setTbl(table).setDb(database); + + TransactionEntry txnEntry = new TransactionEntry(); + txnEntry.setTxnConf(txnConf); + txnEntry.setTable(tblObj); + txnEntry.setDb(dbObj); + String label = UUID.randomUUID().toString(); + txnEntry.setLabel(label); + return txnEntry; + } + + private void openTxn(ConnectContext context, TransactionEntry txnEntry) throws Exception { Review Comment: The method names are confusing: `openTxn()` actually calls `beginTransaction()`, and `beginTxn()` actually does the planning. ########## gensrc/thrift/FrontendService.thrift: ########## @@ -716,6 +721,19 @@ struct TFetchSchemaTableDataResult { 2: optional list<Data.TRow> data_batch; } +struct TMySqlLoadAcquireTokenResult { + 1: required Status.TStatus status Review Comment: use `optional` ########## gensrc/thrift/FrontendService.thrift: ########## @@ -483,6 +483,7 @@ struct TLoadTxnBeginRequest { 10: optional i64 timeout 11: optional Types.TUniqueId request_id 12: optional string auth_code_uuid + 13: optional string token Review Comment: I think we can unify `auth_code_uuid` and `token`. Currently, `auth_code_uuid` is used for transactional insert operations; it is generated for each TransactionState on the FE side and passed to the BE. I think we can replace `auth_code_uuid` with `token`.
When creating a `TransactionState`, get a token from the TokenManager, and when checking a token, just check whether it is still valid. ########## gensrc/thrift/FrontendService.thrift: ########## @@ -716,6 +721,19 @@ struct TFetchSchemaTableDataResult { 2: optional list<Data.TRow> data_batch; } +struct TMySqlLoadAcquireTokenResult { + 1: required Status.TStatus status + 2: optional string token +} + +struct TMySqlLoadReleaseTokenRequest { + 1: optional string token +} + +struct TMySqlLoadReleaseTokenResult { + 1: required Status.TStatus status Review Comment: use `optional`; it is the convention. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org