This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d36b927fdb [improvement](fe-ut) use local journal to make FE ut run fast (#11038) d36b927fdb is described below commit d36b927fdbc44492f45c28747bbdc7cfd473f23b Author: Mingyu Chen <morningman....@gmail.com> AuthorDate: Thu Jul 21 09:12:21 2022 +0800 [improvement](fe-ut) use local journal to make FE ut run fast (#11038) * [improvement](fe-ut) use local journal to make FE ut run fast --- .../src/main/java/org/apache/doris/PaloFe.java | 43 ++-- .../java/org/apache/doris/catalog/Catalog.java | 37 +-- .../main/java/org/apache/doris/common/Config.java | 4 +- .../org/apache/doris/common/util/MasterDaemon.java | 2 +- .../org/apache/doris/journal/JournalEntity.java | 5 + .../apache/doris/journal/local/LocalJournal.java | 12 +- .../doris/journal/local/LocalJournalCursor.java | 279 +++++---------------- .../org/apache/doris/metric/SystemMetrics.java | 4 +- .../java/org/apache/doris/mysql/MysqlChannel.java | 2 +- .../java/org/apache/doris/persist/EditLog.java | 10 +- .../doris/persist/EditLogFileOutputStream.java | 4 +- .../org/apache/doris/persist/OperationType.java | 3 +- .../java/org/apache/doris/system/Frontend.java | 3 +- .../org/apache/doris/utframe/MockedFrontend.java | 25 +- .../apache/doris/utframe/TestWithFeService.java | 49 ++-- .../org/apache/doris/utframe/UtFrameUtils.java | 38 ++- 16 files changed, 228 insertions(+), 292 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java index a692789cdf..817c641982 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java +++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java @@ -61,11 +61,14 @@ public class PaloFe { public static final String PID_DIR = System.getenv("PID_DIR"); public static void main(String[] args) { - start(DORIS_HOME_DIR, PID_DIR, args); + StartupOptions options = new StartupOptions(); + options.enableHttpServer = true; + options.enableQeService = true; + start(DORIS_HOME_DIR, PID_DIR, args, options); } // entrance for doris frontend - public static void start(String dorisHomeDir, String pidDir, String[] args) { + public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) { if (System.getenv("DORIS_LOG_TO_STDERR") != null) { Log4jConfig.foreground = true; } @@ -135,24 +138,27 @@ public class PaloFe { // 1. HttpServer for HTTP Server // 2. FeServer for Thrift Server // 3. QeService for MySQL Server - QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled, - ExecuteEnv.getInstance().getScheduler()); FeServer feServer = new FeServer(Config.rpc_port); - feServer.start(); - HttpServer httpServer = new HttpServer(); - httpServer.setPort(Config.http_port); - httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size); - httpServer.setAcceptors(Config.jetty_server_acceptors); - httpServer.setSelectors(Config.jetty_server_selectors); - httpServer.setWorkers(Config.jetty_server_workers); - httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads); - httpServer.setMinThreads(Config.jetty_threadPool_minThreads); - httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size); - httpServer.start(); + if (options.enableHttpServer) { + HttpServer httpServer = new HttpServer(); + httpServer.setPort(Config.http_port); + httpServer.setMaxHttpPostSize(Config.jetty_server_max_http_post_size); + httpServer.setAcceptors(Config.jetty_server_acceptors); + httpServer.setSelectors(Config.jetty_server_selectors); + httpServer.setWorkers(Config.jetty_server_workers); + httpServer.setMaxThreads(Config.jetty_threadPool_maxThreads); + httpServer.setMinThreads(Config.jetty_threadPool_minThreads); + httpServer.setMaxHttpHeaderSize(Config.jetty_server_max_http_header_size); + httpServer.start(); + } - qeService.start(); + if (options.enableQeService) { + QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled, + ExecuteEnv.getInstance().getScheduler()); + qeService.start(); + } ThreadPoolManager.registerAllThreadPoolMetric(); @@ -372,4 +378,9 @@ public class PaloFe { throw e; } } + + public static class StartupOptions { + public boolean enableHttpServer = true; + public boolean enableQeService = true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index d865769094..40a93b1510 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -270,7 +270,7 @@ public class Catalog { private static final int STATE_CHANGE_CHECK_INTERVAL_MS = 100; private static final int REPLAY_INTERVAL_MS = 1; private static final String BDB_DIR = "/bdb"; - private static final String IMAGE_DIR = "/image"; + public static final String IMAGE_DIR = "/image"; private String metaDir; private String bdbDir; @@ -802,13 +802,10 @@ public class Catalog { if (!bdbDir.exists()) { bdbDir.mkdirs(); } - - File imageDir = new File(this.imageDir); - if (!imageDir.exists()) { - imageDir.mkdirs(); - } - } else { - throw new Exception("Invalid edit log type: " + Config.edit_log_type); + } + File imageDir = new File(this.imageDir); + if (!imageDir.exists()) { + imageDir.mkdirs(); } // init plugin manager @@ -834,18 +831,27 @@ public class Catalog { // 6. start state listener thread createStateListener(); listener.start(); + + if (!Config.edit_log_type.equalsIgnoreCase("bdb")) { + // If not using bdb, we need to notify the FE type transfer manually. + notifyNewFETypeTransfer(FrontendNodeType.MASTER); + } } // wait until FE is ready. public void waitForReady() throws InterruptedException { + long counter = 0; while (true) { if (isReady()) { LOG.info("catalog is ready. FE type: {}", feType); break; } - Thread.sleep(2000); - LOG.info("wait catalog to be ready. FE type: {}. is ready: {}", feType, isReady.get()); + Thread.sleep(100); + if (counter++ % 20 == 0) { + LOG.info("wait catalog to be ready. FE type: {}. is ready: {}, counter: {}", feType, isReady.get(), + counter); + } } } @@ -1224,9 +1230,11 @@ public class Catalog { editLog.open(); - if (!haProtocol.fencing()) { - LOG.error("fencing failed. will exit."); - System.exit(-1); + if (Config.edit_log_type.equalsIgnoreCase("bdb")) { + if (!haProtocol.fencing()) { + LOG.error("fencing failed. will exit."); + System.exit(-1); + } } long replayStartTime = System.currentTimeMillis(); @@ -1285,7 +1293,6 @@ public class Catalog { canRead.set(true); isReady.set(true); - checkLowerCaseTableNames(); String msg = "master finished to replay journal, can write now."; @@ -1404,7 +1411,7 @@ public class Catalog { // transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER // add helper sockets - if (Config.edit_log_type.equalsIgnoreCase("BDB")) { + if (Config.edit_log_type.equalsIgnoreCase("bdb")) { for (Frontend fe : frontends.values()) { if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) { ((BDBHA) getHaProtocol()).addHelperSocket(fe.getHost(), fe.getEditLogPort()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 7367f97e8f..289c4d24bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -188,10 +188,10 @@ public class Config extends ConfigBase { /** * Edit log type. * BDB: write log to bdbje - * LOCAL: deprecated. + * LOCAL: use local file to save edit log, only used for unit test */ @ConfField - public static String edit_log_type = "BDB"; + public static String edit_log_type = "bdb"; /** * bdbje port diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java index 103880ef99..8d1aa8ebdb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MasterDaemon.java @@ -49,7 +49,7 @@ public class MasterDaemon extends Daemon { try { // not return, but sleep a while. to avoid some thread with large running interval will // wait for a long time to start again. - Thread.sleep(10 * 1000); + Thread.sleep(100); } catch (InterruptedException e) { LOG.warn("interrupted exception. thread: {}", getName(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 0788662622..943315def0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -147,6 +147,11 @@ public class JournalEntity implements Writable { boolean isRead = false; LOG.debug("get opcode: {}", opCode); switch (opCode) { + case OperationType.OP_LOCAL_EOF: { + data = null; + isRead = true; + break; + } case OperationType.OP_SAVE_NEXTID: { data = new Text(); ((Text) data).readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java index e656955dba..e54237a7bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java @@ -25,6 +25,7 @@ import org.apache.doris.persist.EditLogFileOutputStream; import org.apache.doris.persist.EditLogOutputStream; import org.apache.doris.persist.Storage; +import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -54,10 +55,18 @@ public class LocalJournal implements Journal { this.journalId.set(getCurrentJournalId(storage.getEditsFileSequenceNumbers())); long id = journalId.get(); - if (id == storage.getEditsSeq()) { + if (storage.getEditsSeq() == 0) { + // there is no edits file, create first one + Preconditions.checkState(id == 1, id); + currentEditFile = new File(imageDir, "edits.1"); + currentEditFile.createNewFile(); + outputStream = new EditLogFileOutputStream(currentEditFile); + } else if (id == storage.getEditsSeq()) { + // there exist edits files, point to the latest one and set position to the end of file. this.currentEditFile = storage.getEditsFile(id); this.outputStream = new EditLogFileOutputStream(currentEditFile); } else { + // create next edits file currentEditFile = new File(imageDir, "edits." + (id + 1)); currentEditFile.createNewFile(); outputStream = new EditLogFileOutputStream(currentEditFile); @@ -78,6 +87,7 @@ public class LocalJournal implements Journal { return; } if (outputStream != null) { + outputStream.flush(); outputStream.close(); } currentEditFile = new File(imageDir, "edits." + journalId.get()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java index ffef5bb74e..8031a2c991 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournalCursor.java @@ -17,33 +17,11 @@ package org.apache.doris.journal.local; -import org.apache.doris.catalog.Database; -import org.apache.doris.common.io.Text; -import org.apache.doris.ha.MasterInfo; import org.apache.doris.journal.JournalCursor; import org.apache.doris.journal.JournalEntity; -import org.apache.doris.journal.bdbje.Timestamp; -import org.apache.doris.load.DeleteInfo; -import org.apache.doris.load.LoadErrorHub; -import org.apache.doris.load.LoadJob; -import org.apache.doris.persist.BatchDropInfo; -import org.apache.doris.persist.BatchModifyPartitionsInfo; -import org.apache.doris.persist.ConsistencyCheckInfo; -import org.apache.doris.persist.CreateTableInfo; -import org.apache.doris.persist.DatabaseInfo; -import org.apache.doris.persist.DropInfo; -import org.apache.doris.persist.DropPartitionInfo; import org.apache.doris.persist.EditLogFileInputStream; -import org.apache.doris.persist.ModifyPartitionInfo; import org.apache.doris.persist.OperationType; -import org.apache.doris.persist.PartitionPersistInfo; -import org.apache.doris.persist.RecoverInfo; -import org.apache.doris.persist.RefreshExternalTableInfo; -import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.persist.Storage; -import org.apache.doris.persist.TableInfo; -import org.apache.doris.system.Backend; -import org.apache.doris.system.Frontend; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -55,7 +33,10 @@ import java.io.File; import java.io.IOException; import java.util.List; -@Deprecated +/** + * For unit test only. + * Use local file save edit logs. + */ public final class LocalJournalCursor implements JournalCursor { private static final Logger LOG = LogManager.getLogger(LocalJournalCursor.class); private String imageDir; @@ -121,35 +102,29 @@ public final class LocalJournalCursor implements JournalCursor { new EditLogFileInputStream(new File(imageDir, "edits." + fileName)))); while (scannedKey < fromKey) { - short opCode = currentStream.readShort(); - if (opCode == OperationType.OP_INVALID) { - System.out.println("Can not find the key:" + fromKey); - throw new IOException(); - } - getJournalEntity(currentStream, opCode); + getJournalEntity(currentStream); scannedKey++; } } - @Override - public JournalEntity next() { + public JournalEntity next2() { if (currentKey > toKey) { return null; } JournalEntity ret = null; try { - short opCode = OperationType.OP_INVALID; + short opCode = OperationType.OP_LOCAL_EOF; while (true) { try { opCode = currentStream.readShort(); - if (opCode == OperationType.OP_INVALID) { + if (opCode == OperationType.OP_LOCAL_EOF) { if (nextFilePositionIndex < editFileSequenceNumbers.size()) { currentStream.close(); currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream( - new File(imageDir, "edits." + editFileSequenceNumbers - .get(nextFilePositionIndex))))); + new File(imageDir, + "edits." + editFileSequenceNumbers.get(nextFilePositionIndex))))); nextFilePositionIndex++; continue; } else { @@ -159,9 +134,8 @@ public final class LocalJournalCursor implements JournalCursor { } catch (EOFException e) { if (nextFilePositionIndex < editFileSequenceNumbers.size()) { currentStream.close(); - currentStream = new DataInputStream( - new BufferedInputStream(new EditLogFileInputStream(new File( - imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex))))); + currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream( + new File(imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex))))); nextFilePositionIndex++; continue; } else { @@ -171,7 +145,7 @@ public final class LocalJournalCursor implements JournalCursor { break; } - ret = getJournalEntity(currentStream, opCode); + ret = getJournalEntity(currentStream); currentKey++; return ret; } catch (IOException e) { @@ -186,191 +160,64 @@ public final class LocalJournalCursor implements JournalCursor { return ret; } - @Deprecated - private JournalEntity getJournalEntity(DataInputStream in, short opCode) throws IOException { - JournalEntity ret = new JournalEntity(); - ret.setOpCode(opCode); - switch (opCode) { - case OperationType.OP_SAVE_NEXTID: { - Text text = new Text(); - text.readFields(in); - ret.setData(text); - break; - } - case OperationType.OP_SAVE_TRANSACTION_ID: { - Text text = new Text(); - text.readFields(in); - ret.setData(text); - break; - } - case OperationType.OP_CREATE_DB: { - Database db = new Database(); - db.readFields(in); - ret.setData(db); - break; - } - case OperationType.OP_DROP_DB: { - Text text = new Text(); - text.readFields(in); - ret.setData(text); - break; - } - case OperationType.OP_ALTER_DB: - case OperationType.OP_RENAME_DB: { - DatabaseInfo info = new DatabaseInfo(); - info.readFields(in); - ret.setData(info); - break; - } - case OperationType.OP_CREATE_TABLE: { - CreateTableInfo info = new CreateTableInfo(); - info.readFields(in); - ret.setData(info); - break; - } - case OperationType.OP_DROP_TABLE: { - DropInfo info = new DropInfo(); - info.readFields(in); - ret.setData(info); - break; - } - case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: { - RefreshExternalTableInfo info = RefreshExternalTableInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_ADD_PARTITION: { - PartitionPersistInfo info = new PartitionPersistInfo(); - info.readFields(in); - ret.setData(info); - break; - } - case OperationType.OP_DROP_PARTITION: { - DropPartitionInfo info = DropPartitionInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_MODIFY_PARTITION: { - ModifyPartitionInfo info = ModifyPartitionInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_BATCH_MODIFY_PARTITION: { - BatchModifyPartitionsInfo info = BatchModifyPartitionsInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_ERASE_DB: - case OperationType.OP_ERASE_TABLE: - case OperationType.OP_ERASE_PARTITION: { - Text text = new Text(); - text.readFields(in); - ret.setData(text); - break; - } - case OperationType.OP_RECOVER_DB: - case OperationType.OP_RECOVER_TABLE: - case OperationType.OP_RECOVER_PARTITION: { - RecoverInfo recoverInfo = new RecoverInfo(); - recoverInfo.readFields(in); - ret.setData(recoverInfo); - break; - } - case OperationType.OP_CLEAR_ROLLUP_INFO: { - ReplicaPersistInfo info = ReplicaPersistInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_DROP_ROLLUP: { - DropInfo info = DropInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_BATCH_DROP_ROLLUP: { - BatchDropInfo batchDropInfo = BatchDropInfo.read(in); - ret.setData(batchDropInfo); - break; - } - case OperationType.OP_RENAME_TABLE: - case OperationType.OP_RENAME_ROLLUP: - case OperationType.OP_RENAME_PARTITION: { - TableInfo info = TableInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_FINISH_CONSISTENCY_CHECK: { - ConsistencyCheckInfo info = ConsistencyCheckInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_LOAD_START: - case OperationType.OP_LOAD_ETL: - case OperationType.OP_LOAD_LOADING: - case OperationType.OP_LOAD_QUORUM: - case OperationType.OP_LOAD_DONE: - case OperationType.OP_LOAD_CANCEL: { - LoadJob job = new LoadJob(); - job.readFields(in); - ret.setData(job); - break; - } - case OperationType.OP_FINISH_DELETE: { - DeleteInfo info = DeleteInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_ADD_REPLICA: - case OperationType.OP_DELETE_REPLICA: { - ReplicaPersistInfo info = ReplicaPersistInfo.read(in); - ret.setData(info); - break; - } - case OperationType.OP_ADD_BACKEND: - case OperationType.OP_DROP_BACKEND: - case OperationType.OP_BACKEND_STATE_CHANGE: { - Backend be = Backend.read(in); - ret.setData(be); - break; - } - case OperationType.OP_ADD_FRONTEND: - case OperationType.OP_ADD_FIRST_FRONTEND: - case OperationType.OP_REMOVE_FRONTEND: { - Frontend fe = Frontend.read(in); - ret.setData(fe); - break; - } - case OperationType.OP_SET_LOAD_ERROR_HUB: { - LoadErrorHub.Param param = new LoadErrorHub.Param(); - param.readFields(in); - ret.setData(param); - break; - } - case OperationType.OP_MASTER_INFO_CHANGE: { - MasterInfo info = new MasterInfo(); - info.readFields(in); - ret.setData(info); - break; - } - case OperationType.OP_TIMESTAMP: { - Timestamp stamp = new Timestamp(); - stamp.readFields(in); - ret.setData(stamp); - break; - } - case OperationType.OP_META_VERSION: { - Text text = new Text(); - text.readFields(in); - ret.setData(text); + @Override + public JournalEntity next() { + if (currentKey > toKey) { + return null; + } + + JournalEntity ret = null; + try { + while (true) { + try { + ret = getJournalEntity(currentStream); + if (ret.getOpCode() == OperationType.OP_LOCAL_EOF) { + if (nextFilePositionIndex < editFileSequenceNumbers.size()) { + currentStream.close(); + currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream( + new File(imageDir, + "edits." + editFileSequenceNumbers.get(nextFilePositionIndex))))); + nextFilePositionIndex++; + continue; + } else { + return null; + } + } + } catch (EOFException e) { + if (nextFilePositionIndex < editFileSequenceNumbers.size()) { + currentStream.close(); + currentStream = new DataInputStream(new BufferedInputStream(new EditLogFileInputStream( + new File(imageDir, "edits." + editFileSequenceNumbers.get(nextFilePositionIndex))))); + nextFilePositionIndex++; + continue; + } else { + return null; + } + } break; } - default: { - throw new IOException("Never seen opcode " + opCode); + currentKey++; + return ret; + } catch (IOException e) { + LOG.error("something wrong. {}", e); + try { + currentStream.close(); + } catch (IOException e1) { + LOG.error(e1); } + LOG.error(e); } return ret; } + @Deprecated + private JournalEntity getJournalEntity(DataInputStream in) throws IOException { + JournalEntity ret = new JournalEntity(); + ret.readFields(in); + return ret; + } + @Override public void close() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java b/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java index 7e138af036..25a865065c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/SystemMetrics.java @@ -105,7 +105,7 @@ public class SystemMetrics { tcpOutSegs = Long.valueOf(parts[headerMap.get("OutSegs")]); } catch (Exception e) { - LOG.warn("failed to get /proc/net/snmp", e); + LOG.warn("failed to get /proc/net/snmp: ", e.getMessage()); } } @@ -141,7 +141,7 @@ public class SystemMetrics { buffers = memInfoMap.getOrDefault("Buffers", -1L); cached = memInfoMap.getOrDefault("Cached", -1L); } catch (Exception e) { - LOG.warn("failed to get /proc/meminfo", e); + LOG.warn("failed to get /proc/meminfo: ", e.getMessage()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java index df7f1d856c..9c8bd622da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java @@ -76,7 +76,7 @@ public class MysqlChannel { // avoid calling getHostName() which may trigger a name service reverse lookup remoteHostPortString = address.getHostString() + ":" + address.getPort(); remoteIp = address.getAddress().getHostAddress(); - } else { + } else if (channel.getRemoteAddress() != null) { // Reach here, what's it? remoteHostPortString = channel.getRemoteAddress().toString(); remoteIp = channel.getRemoteAddress().toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 594d8c634e..02a8059a6e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -48,6 +48,7 @@ import org.apache.doris.journal.JournalCursor; import org.apache.doris.journal.JournalEntity; import org.apache.doris.journal.bdbje.BDBJEJournal; import org.apache.doris.journal.bdbje.Timestamp; +import org.apache.doris.journal.local.LocalJournal; import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.DeleteInfo; import org.apache.doris.load.ExportJob; @@ -95,7 +96,14 @@ public class EditLog { private Journal journal; public EditLog(String nodeName) { - journal = new BDBJEJournal(nodeName); + String journalType = Config.edit_log_type; + if (journalType.equalsIgnoreCase("bdb")) { + journal = new BDBJEJournal(nodeName); + } else if (journalType.equalsIgnoreCase("local")) { + journal = new LocalJournal(Catalog.getCurrentCatalog().getImageDir()); + } else { + throw new IllegalArgumentException("Unknown edit log type: " + journalType); + } } public long getMaxJournalId() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java index 802eb2c648..3d8b360704 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java @@ -61,7 +61,7 @@ public class EditLogFileOutputStream extends EditLogOutputStream { } public void write(short op, Writable writable) throws IOException { - write(op); + bufCurrent.writeShort(op); writable.write(bufCurrent); } @@ -99,7 +99,7 @@ public class EditLogFileOutputStream extends EditLogOutputStream { */ public void setReadyToFlush() throws IOException { assert bufReady.size() == 0 : "previous data is not flushed yet"; - write(OperationType.OP_INVALID); // insert end-of-file marker + write(OperationType.OP_LOCAL_EOF); // insert end-of-file marker DataOutputBuffer tmp = bufReady; bufReady = bufCurrent; bufCurrent = tmp; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 14871828de..168d88ec96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -21,7 +21,8 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; public class OperationType { - public static final short OP_INVALID = -1; + // OP_LOCAL_EOF is only for local edit log, to indicate the end of a edit log run. + public static final short OP_LOCAL_EOF = -1; public static final short OP_SAVE_NEXTID = 0; public static final short OP_CREATE_DB = 1; public static final short OP_DROP_DB = 2; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java index 1dd498beb2..64e34d17dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Frontend.java @@ -18,6 +18,7 @@ package org.apache.doris.system; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.ha.BDBHA; @@ -106,7 +107,7 @@ public class Frontend implements Writable { public boolean handleHbResponse(FrontendHbResponse hbResponse, boolean isReplay) { boolean isChanged = false; if (hbResponse.getStatus() == HbStatus.OK) { - if (!isAlive && !isReplay) { + if (!isAlive && !isReplay && Config.edit_log_type.equalsIgnoreCase("bdb")) { BDBHA bdbha = (BDBHA) Catalog.getCurrentCatalog().getHaProtocol(); bdbha.removeUnReadyElectableNode(nodeName, Catalog.getCurrentCatalog().getFollowerCount()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java index 37346b2b15..207d926474 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedFrontend.java @@ -18,6 +18,7 @@ package org.apache.doris.utframe; import org.apache.doris.PaloFe; +import org.apache.doris.PaloFe.StartupOptions; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.util.PrintableMap; @@ -57,7 +58,7 @@ import java.util.Map; * There will be 3 directories under running dir/: * running dir/conf/ * running dir/log/ - * running dir/palo-meta/ + * running dir/doris-meta/ * * All these 3 directories will be cleared first. * @@ -66,7 +67,7 @@ public class MockedFrontend { public static final String FE_PROCESS = "fe"; // the running dir of this mocked frontend. - // log/ palo-meta/ and conf/ dirs will be created under this dir. + // log/ doris-meta/ and conf/ dirs will be created under this dir. private String runningDir; // the min set of fe.conf. private static final Map<String, String> MIN_FE_CONF; @@ -100,7 +101,7 @@ public class MockedFrontend { // init the fe process. This must be called before starting the frontend process. // 1. check if all necessary environment variables are set. - // 2. clear and create 3 dirs: runningDir/log/, runningDir/palo-meta/, runningDir/conf/ + // 2. clear and create 3 dirs: runningDir/log/, runningDir/doris-meta/, runningDir/conf/ // 3. init fe.conf // The content of "fe.conf" is a merge set of input `feConf` and MIN_FE_CONF public void init(String runningDir, Map<String, String> feConf) throws EnvVarNotSetException, IOException { @@ -121,7 +122,7 @@ public class MockedFrontend { // clear and create log dir createAndClearDir(runningDir + "/log/"); // clear and create meta dir - createAndClearDir(runningDir + "/palo-meta/"); + createAndClearDir(runningDir + "/doris-meta/"); // clear and create conf dir createAndClearDir(runningDir + "/conf/"); // init fe.conf @@ -134,7 +135,7 @@ public class MockedFrontend { finalFeConf = Maps.newHashMap(MIN_FE_CONF); // these 2 configs depends on running dir, so set them here. finalFeConf.put("LOG_DIR", this.runningDir + "/log"); - finalFeConf.put("meta_dir", this.runningDir + "/palo-meta"); + finalFeConf.put("meta_dir", this.runningDir + "/doris-meta"); finalFeConf.put("sys_log_dir", this.runningDir + "/log"); finalFeConf.put("audit_log_dir", this.runningDir + "/log"); finalFeConf.put("tmp_dir", this.runningDir + "/temp_dir"); @@ -183,7 +184,12 @@ public class MockedFrontend { @Override public void run() { - PaloFe.start(frontend.getRunningDir(), frontend.getRunningDir(), args); + StartupOptions options = new StartupOptions(); + // For FE unit tests, we don't need these 2 servers. + // And it also cost time to start up. + options.enableHttpServer = false; + options.enableQeService = false; + PaloFe.start(frontend.getRunningDir(), frontend.getRunningDir(), args, options); } } @@ -195,8 +201,7 @@ public class MockedFrontend { Thread feThread = new Thread(new FERunnable(this, args), FE_PROCESS); feThread.start(); // wait the catalog to be ready until timeout (30 seconds) - waitForCatalogReady(120 * 1000); - System.out.println("Fe process is started"); + waitForCatalogReady(30 * 1000); } private void waitForCatalogReady(long timeoutMs) throws FeStartException { @@ -204,11 +209,11 @@ public class MockedFrontend { while (!Catalog.getCurrentCatalog().isReady() && left > 0) { System.out.println("catalog is not ready"); try { - Thread.sleep(5000); + Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } - left -= 5000; + left -= 100; } if (left <= 0 && !Catalog.getCurrentCatalog().isReady()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 87498ed605..1abad1631f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -58,6 +58,7 @@ import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException; import org.apache.doris.utframe.MockedFrontend.FeStartException; import org.apache.doris.utframe.MockedFrontend.NotInitException; +import com.clearspring.analytics.util.Lists; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -204,6 +205,7 @@ public abstract class TestWithFeService { } Config.plugin_dir = dorisHome + "/plugins"; Config.custom_config_dir = dorisHome + "/conf"; + Config.edit_log_type = "local"; File file = new File(Config.custom_config_dir); if (!file.exists()) { file.mkdir(); @@ -237,10 +239,29 @@ public abstract class TestWithFeService { throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException, InterruptedException { int feRpcPort = startFEServer(runningDir); + List<Backend> bes = Lists.newArrayList(); + System.out.println("start create backend"); for (int i = 0; i < backendNum; i++) { - createBackend("127.0.0.1", feRpcPort); - // sleep to wait first heartbeat - Thread.sleep(6000); + bes.add(createBackend("127.0.0.1", feRpcPort)); + } + System.out.println("after create backend"); + checkBEHeartbeat(bes); + // Thread.sleep(2000); + System.out.println("after create backend2"); + } + + private void checkBEHeartbeat(List<Backend> bes) throws InterruptedException { + int maxTry = 10; + boolean allAlive = false; + while (maxTry-- > 0 && !allAlive) { + Thread.sleep(1000); + boolean hasDead = false; + for (Backend be : bes) { + if (!be.isAlive()) { + hasDead = true; + } + } + allAlive = !hasDead; } } @@ -253,31 +274,30 @@ public abstract class TestWithFeService { // to make cluster running well. FeConstants.runningUnitTest = true; int feRpcPort = startFEServer(runningDir); + List<Backend> bes = Lists.newArrayList(); for (int i = 0; i < backendNum; i++) { String host = "127.0.0." + (i + 1); - createBackend(host, feRpcPort); + bes.add(createBackend(host, feRpcPort)); } - // sleep to wait first heartbeat - Thread.sleep(6000); + checkBEHeartbeat(bes); } - protected void createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException { + protected Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException { int beHeartbeatPort = findValidPort(); int beThriftPort = findValidPort(); int beBrpcPort = findValidPort(); int beHttpPort = findValidPort(); // start be - MockedBackend backend = - MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort, - new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), - new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); + MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, + beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), + new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); // add be - Backend be = - new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), backend.getHeartbeatPort()); + Backend be = new Backend(Catalog.getCurrentCatalog().getNextId(), backend.getHost(), + backend.getHeartbeatPort()); DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId()); diskInfo1.setTotalCapacityB(1000000); diskInfo1.setAvailableCapacityB(500000); @@ -285,12 +305,13 @@ public abstract class TestWithFeService { Map<String, DiskInfo> disks = Maps.newHashMap(); disks.put(diskInfo1.getRootPath(), diskInfo1); be.setDisks(ImmutableMap.copyOf(disks)); - be.setAlive(true); + be.setAlive(false); be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER); be.setBePort(beThriftPort); be.setHttpPort(beHttpPort); be.setBrpcPort(beBrpcPort); Catalog.getCurrentSystemInfo().addBackend(be); + return be; } protected void cleanDorisFeDir(String baseDir) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index 4e0a49016f..41f7a9196b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -49,6 +49,7 @@ import org.apache.doris.utframe.MockedFrontend.NotInitException; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; @@ -155,6 +156,7 @@ public class UtFrameUtils { } Config.plugin_dir = dorisHome + "/plugins"; Config.custom_config_dir = dorisHome + "/conf"; + Config.edit_log_type = "local"; File file = new File(Config.custom_config_dir); if (!file.exists()) { file.mkdir(); @@ -187,18 +189,36 @@ public class UtFrameUtils { public static void createDorisCluster(String runningDir, int backendNum) throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException, InterruptedException { int feRpcPort = startFEServer(runningDir); + List<Backend> bes = Lists.newArrayList(); for (int i = 0; i < backendNum; i++) { - createBackend("127.0.0.1", feRpcPort); - // sleep to wait first heartbeat - Thread.sleep(6000); + bes.add(createBackend("127.0.0.1", feRpcPort)); + } + System.out.println("after create backend"); + checkBEHeartbeat(bes); + // Thread.sleep(2000); + System.out.println("after create backend2"); + } + + private static void checkBEHeartbeat(List<Backend> bes) throws InterruptedException { + int maxTry = 10; + boolean allAlive = false; + while (maxTry-- > 0 && !allAlive) { + Thread.sleep(1000); + boolean hasDead = false; + for (Backend be : bes) { + if (!be.isAlive()) { + hasDead = true; + } + } + allAlive = !hasDead; } } // Create multi backends with different host for unit test. // the host of BE will be "127.0.0.1", "127.0.0.2" public static void createDorisClusterWithMultiTag(String runningDir, int backendNum) - throws EnvVarNotSetException, IOException, FeStartException, - NotInitException, DdlException, InterruptedException { + throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException, + InterruptedException { // set runningUnitTest to true, so that for ut, // the agent task will be sent to "127.0.0.1" to make cluster running well. FeConstants.runningUnitTest = true; @@ -211,16 +231,15 @@ public class UtFrameUtils { Thread.sleep(6000); } - public static void createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException { + public static Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException { int beHeartbeatPort = findValidPort(); int beThriftPort = findValidPort(); int beBrpcPort = findValidPort(); int beHttpPort = findValidPort(); // start be - MockedBackend backend = MockedBackendFactory.createBackend(beHost, - beHeartbeatPort, beThriftPort, beBrpcPort, beHttpPort, - new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), + MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort, + beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort), new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort)); backend.start(); @@ -241,6 +260,7 @@ public class UtFrameUtils { be.setHttpPort(beHttpPort); be.setBrpcPort(beBrpcPort); Catalog.getCurrentSystemInfo().addBackend(be); + return be; } public static void cleanDorisFeDir(String baseDir) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org