morningman closed pull request #436: Redesign the access to meta version
URL: https://github.com/apache/incubator-doris/pull/436
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/help/Contents/Data Definition/ddl_stmt.md 
b/docs/help/Contents/Data Definition/ddl_stmt.md
index 12a1968d..72603dae 100644
--- a/docs/help/Contents/Data Definition/ddl_stmt.md    
+++ b/docs/help/Contents/Data Definition/ddl_stmt.md    
@@ -860,6 +860,7 @@
                 "backup_timestamp" = 
"2018-05-04-16-45-08":指定了恢复对应备份的哪个时间版本,必填。该信息可以通过 `SHOW SNAPSHOT ON repo;` 语句获得。
                 "replication_num" = 
"3":指定恢复的表或分区的副本数。默认为3。若恢复已存在的表或分区,则副本数必须和已存在表或分区的副本数相同。同时,必须有足够的 host 容纳多个副本。
                 "timeout" = "3600":任务超时时间,默认为一天。单位秒。
+                "meta_version" = 40:使用指定的 meta_version 
来读取之前备份的元数据。注意,该参数作为临时方案,仅用于恢复老版本 Doris 备份的数据。最新版本的备份数据中已经包含 meta version,无需再指定。
 
 ## example
     1. 从 example_repo 中恢复备份 snapshot_1 中的表 backup_tbl 到数据库 example_db1,时间版本为 
"2018-05-04-16-45-08"。恢复为 1 个副本:
diff --git a/fe/src/main/java/org/apache/doris/analysis/RestoreStmt.java 
b/fe/src/main/java/org/apache/doris/analysis/RestoreStmt.java
index b282839d..f77562e8 100644
--- a/fe/src/main/java/org/apache/doris/analysis/RestoreStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/RestoreStmt.java
@@ -36,10 +36,12 @@
     private final static String PROP_ALLOW_LOAD = "allow_load";
     private final static String PROP_REPLICATION_NUM = "replication_num";
     private final static String PROP_BACKUP_TIMESTAMP = "backup_timestamp";
+    private final static String PROP_META_VERSION = "meta_version";
 
     private boolean allowLoad = false;
     private int replicationNum = FeConstants.default_replication_num;
     private String backupTimestamp = null;
+    private int metaVersion = -1;
 
     public RestoreStmt(LabelName labelName, String repoName, List<TableRef> 
tblRefs, Map<String, String> properties) {
         super(labelName, repoName, tblRefs, properties);
@@ -57,6 +59,10 @@ public String getBackupTimestamp() {
         return backupTimestamp;
     }
 
+    public int getMetaVersion() {
+        return metaVersion;
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
@@ -114,6 +120,18 @@ public void analyzeProperties() throws AnalysisException {
                                                 "Missing " + 
PROP_BACKUP_TIMESTAMP + " property");
         }
 
+        // meta version
+        if (copiedProperties.containsKey(PROP_META_VERSION)) {
+            try {
+                metaVersion = 
Integer.valueOf(copiedProperties.get(PROP_META_VERSION));
+            } catch (NumberFormatException e) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
+                        "Invalid meta version format: "
+                                + copiedProperties.get(PROP_META_VERSION));
+            }
+            copiedProperties.remove(PROP_META_VERSION);
+        }
+
         if (!copiedProperties.isEmpty()) {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_COMMON_ERROR,
                                                 "Unknown restore job 
properties: " + copiedProperties.keySet());
diff --git a/fe/src/main/java/org/apache/doris/backup/BackupHandler.java 
b/fe/src/main/java/org/apache/doris/backup/BackupHandler.java
index 7ad43119..9ea24c6f 100644
--- a/fe/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -326,7 +326,7 @@ private void backup(Repository repository, Database db, 
BackupStmt stmt) throws
                 // as base snapshot.
                 // But first we need to check if the existing snapshot has 
same meta.
                 List<BackupMeta> backupMetas = Lists.newArrayList();
-                st = repository.getSnapshotMetaFile(stmt.getLabel(), 
backupMetas);
+                st = repository.getSnapshotMetaFile(stmt.getLabel(), 
backupMetas, -1);
                 if (!st.ok()) {
                     ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
                                                    "Failed to get existing 
meta info for repository: "
@@ -373,7 +373,7 @@ private void restore(Repository repository, Database db, 
RestoreStmt stmt) throw
         // Create a restore job
         RestoreJob restoreJob = new RestoreJob(stmt.getLabel(), 
stmt.getBackupTimestamp(),
                 db.getId(), db.getFullName(), jobInfo, stmt.allowLoad(), 
stmt.getReplicationNum(),
-                stmt.getTimeoutMs(), catalog, repository.getId());
+                stmt.getTimeoutMs(), stmt.getMetaVersion(), catalog, 
repository.getId());
         dbIdToBackupOrRestoreJob.put(db.getId(), restoreJob);
 
         catalog.getEditLog().logRestoreJob(restoreJob);
diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJob.java 
b/fe/src/main/java/org/apache/doris/backup/BackupJob.java
index 0d3707b8..49a94944 100644
--- a/fe/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -470,7 +470,7 @@ private void uploadSnapshot() {
             int batchNum = Math.min(totalNum, 3);
             // each task contains several upload sub tasks
             int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
-            LOG.debug("backend {} has {} batch, total {} tasks, {}", beId, 
batchNum, totalNum, this);
+            LOG.info("backend {} has {} batch, total {} tasks, {}", beId, 
batchNum, totalNum, this);
 
             List<FsBroker> brokers = Lists.newArrayList();
             Status st = repo.getBrokerAddress(beId, catalog, brokers);
@@ -628,7 +628,7 @@ private boolean validateLocalFile(String filePath) {
      * Choose a replica order by replica id.
      * This is to expect to choose the same replica at each backup job.
      */
-    private Replica chooseReplica(Tablet tablet, long committedVersion, long 
committedVersionHash) {
+    private Replica chooseReplica(Tablet tablet, long visibleVersion, long 
visibleVersionHash) {
         List<Long> replicaIds = Lists.newArrayList();
         for (Replica replica : tablet.getReplicas()) {
             replicaIds.add(replica.getId());
@@ -637,8 +637,8 @@ private Replica chooseReplica(Tablet tablet, long 
committedVersion, long committ
         Collections.sort(replicaIds);
         for (Long replicaId : replicaIds) {
             Replica replica = tablet.getReplicaById(replicaId);
-            if (replica.getVersion() > committedVersion 
-                    || (replica.getVersion() == committedVersion && 
replica.getVersionHash()==committedVersionHash)) {
+            if (replica.getLastFailedVersion() <= 0 && (replica.getVersion() > 
visibleVersion
+                    || (replica.getVersion() == visibleVersion && 
replica.getVersionHash() == visibleVersionHash))) {
                 return replica;
             }
         }
diff --git a/fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java 
b/fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index c0038f0d..16e76a15 100644
--- a/fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -23,6 +23,7 @@
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 
@@ -34,6 +35,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.json.JSONArray;
+import org.json.JSONException;
 import org.json.JSONObject;
 
 import java.io.DataInput;
@@ -67,6 +69,8 @@
     public Map<String, BackupTableInfo> tables = Maps.newHashMap();
     public boolean success;
 
+    public int metaVersion;
+
     // This map is used to save the table alias mapping info when processing a 
restore job.
     // origin -> alias
     public Map<String, String> tblAlias = Maps.newHashMap();
@@ -229,6 +233,7 @@ public static BackupJobInfo fromCatalog(long backupTime, 
String label, String db
         jobInfo.dbName = dbName;
         jobInfo.dbId = dbId;
         jobInfo.success = true;
+        jobInfo.metaVersion = FeConstants.meta_version;
 
         // tbls
         for (Table tbl : tbls) {
@@ -282,6 +287,7 @@ private static void genFromJson(String json, BackupJobInfo 
jobInfo) {
          *   "database": "db1"
          *   "id": 10000
          *   "backup_result": "succeed",
+         *   "meta_version" : 40 // this is optional
          *   "backup_objects": {
          *       "table1": {
          *           "partitions": {
@@ -321,6 +327,14 @@ private static void genFromJson(String json, BackupJobInfo 
jobInfo) {
         jobInfo.dbName = (String) root.get("database");
         jobInfo.dbId = root.getLong("id");
         jobInfo.backupTime = root.getLong("backup_time");
+        
+        try {
+            jobInfo.metaVersion = root.getInt("meta_version");
+        } catch (JSONException e) {
+            // meta_version does not exist
+            jobInfo.metaVersion = FeConstants.meta_version;
+        }
+        
         JSONObject backupObjs = root.getJSONObject("backup_objects");
         String[] tblNames = JSONObject.getNames(backupObjs);
         for (String tblName : tblNames) {
@@ -409,6 +423,7 @@ public JSONObject toJson() {
         root.put("backup_time", backupTime);
         JSONObject backupObj = new JSONObject();
         root.put("backup_objects", backupObj);
+        root.put("meta_version", FeConstants.meta_version);
         
         for (BackupTableInfo tblInfo : tables.values()) {
             JSONObject tbl = new JSONObject();
diff --git a/fe/src/main/java/org/apache/doris/backup/BackupMeta.java 
b/fe/src/main/java/org/apache/doris/backup/BackupMeta.java
index 25c643b1..55110369 100644
--- a/fe/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.meta.MetaContext;
 
 import com.google.common.collect.Maps;
 
@@ -67,11 +68,16 @@ public Table getTable(Long tblId) {
         return tblIdMap.get(tblId);
     }
 
-    public static BackupMeta fromFile(String filePath) throws IOException {
+    public static BackupMeta fromFile(String filePath, int metaVersion) throws 
IOException {
         File file = new File(filePath);
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(metaVersion);
+        metaContext.setThreadLocalInfo();
         try (DataInputStream dis = new DataInputStream(new 
FileInputStream(file))) {
             BackupMeta backupMeta = BackupMeta.read(dis);
             return backupMeta;
+        } finally {
+            MetaContext.remove();
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/backup/Repository.java 
b/fe/src/main/java/org/apache/doris/backup/Repository.java
index 0b46470b..e0d9199c 100644
--- a/fe/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/src/main/java/org/apache/doris/backup/Repository.java
@@ -331,7 +331,7 @@ public Status getSnapshotInfoFile(String label, String 
backupTimestamp, List<Bac
         return Status.OK;
     }
 
-    public Status getSnapshotMetaFile(String label, List<BackupMeta> 
backupMetas) {
+    public Status getSnapshotMetaFile(String label, List<BackupMeta> 
backupMetas, int metaVersion) {
         String remoteMetaFilePath = assembleMetaInfoFilePath(label);
         File localMetaFile = new File(BackupHandler.BACKUP_ROOT_DIR + 
PATH_DELIMITER
                 + "meta_" + System.currentTimeMillis());
@@ -343,7 +343,7 @@ public Status getSnapshotMetaFile(String label, 
List<BackupMeta> backupMetas) {
             }
 
             // read file to backupMeta
-            BackupMeta backupMeta = 
BackupMeta.fromFile(localMetaFile.getAbsolutePath());
+            BackupMeta backupMeta = 
BackupMeta.fromFile(localMetaFile.getAbsolutePath(), metaVersion);
             backupMetas.add(backupMeta);
         } catch (IOException e) {
             return new Status(ErrCode.COMMON_ERROR, "Failed create backup meta 
from file: "
diff --git a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
index 64227675..5c2bf84b 100644
--- a/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -131,12 +131,21 @@
 
     private Map<Long, Long> unfinishedSignatureToId = Maps.newConcurrentMap();
 
+    // the meta version is used when reading backup meta from file.
+    // we do not persist this field, because this is just a temporary solution.
+    // the true meta version should be get from backup job info, which is 
saved when doing backup job.
+    // But the earlier version of Doris do not save the meta version in backup 
job info, so we allow user to
+    // set this 'metaVersion' in restore stmt.
+    // NOTICE: because we do not persist it, this info may be lost if Frontend 
restart,
+    // and if you don't want to losing it, backup your data again by using 
latest Doris version.
+    private int metaVersion = -1;
+
     public RestoreJob() {
         super(JobType.RESTORE);
     }
 
     public RestoreJob(String label, String backupTs, long dbId, String dbName, 
BackupJobInfo jobInfo,
-            boolean allowLoad, int restoreReplicationNum, long timeoutMs,
+            boolean allowLoad, int restoreReplicationNum, long timeoutMs, int 
metaVersion,
             Catalog catalog, long repoId) {
         super(JobType.RESTORE, label, dbId, dbName, timeoutMs, catalog, 
repoId);
         this.backupTimestamp = backupTs;
@@ -144,6 +153,7 @@ public RestoreJob(String label, String backupTs, long dbId, 
String dbName, Backu
         this.allowLoad = allowLoad;
         this.restoreReplicationNum = restoreReplicationNum;
         this.state = RestoreJobState.PENDING;
+        this.metaVersion = metaVersion;
     }
 
     public RestoreJobState getState() {
@@ -153,6 +163,10 @@ public RestoreJobState getState() {
     public RestoreFileMapping getFileMapping() {
         return fileMapping;
     }
+    
+    public int getMetaVersion() {
+        return metaVersion;
+    }
 
     public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, 
TFinishTaskRequest request) {
         Preconditions.checkState(task.getJobId() == jobId);
@@ -822,7 +836,8 @@ private void genFileMapping(OlapTable localTbl, Partition 
localPartition, Long r
 
     private boolean downloadAndDeserializeMetaInfo() {
         List<BackupMeta> backupMetas = Lists.newArrayList();
-        Status st = repo.getSnapshotMetaFile(jobInfo.name, backupMetas);
+        Status st = repo.getSnapshotMetaFile(jobInfo.name, backupMetas,
+                this.metaVersion == -1 ? jobInfo.metaVersion : 
this.metaVersion);
         if (!st.ok()) {
             status = st;
             return false;
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index bc1975f6..1c7efab6 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -127,6 +127,7 @@
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.master.Checkpoint;
 import org.apache.doris.master.MetaHelper;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PaloAuth;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -228,10 +229,9 @@
     public static final String BDB_DIR = Config.meta_dir + "/bdb";
     public static final String IMAGE_DIR = Config.meta_dir + "/image";
 
-    // Image file meta data version. Use this version to load image file
-    private int imageVersion = 0;
     // Current journal meta data version. Use this version to load journals
-    private int journalVersion = 0;
+    // private int journalVersion = 0;
+    private MetaContext metaContext;
     private long epoch = 0;
 
     // Lock to perform atomic modification on map like 'idToDb' and 
'fullNameToDb'.
@@ -444,6 +444,9 @@ private Catalog() {
         this.domainResolver = new DomainResolver(auth);
 
         this.esStateStore = new EsStateStore();
+
+        this.metaContext = new MetaContext();
+        this.metaContext.setThreadLocalInfo();
     }
 
     public static void destroyCheckpoint() {
@@ -518,7 +521,7 @@ public static CatalogRecycleBin getCurrentRecycleBin() {
 
     // use this to get correct Catalog's journal version
     public static int getCurrentCatalogJournalVersion() {
-        return getCurrentCatalog().getJournalVersion();
+        return MetaContext.get().getMetaVersion();
     }
 
     public static final boolean isCheckpointThread() {
@@ -608,6 +611,7 @@ public void initialize(String[] args) throws Exception {
 
         // 6. start state listener thread
         createStateListener();
+        listener.setMetaContext(metaContext);
         listener.setName("stateListener");
         listener.setInterval(STATE_CHANGE_CHECK_INTERVAL_MS);
         listener.start();
@@ -978,9 +982,10 @@ private void transferToMaster() throws IOException {
         editLog.rollEditLog();
 
         // Log meta_version
+        long journalVersion = MetaContext.get().getMetaVersion();
         if (journalVersion < FeConstants.meta_version) {
             editLog.logMetaVersion(FeConstants.meta_version);
-            this.setJournalVersion(FeConstants.meta_version);
+            MetaContext.get().setMetaVersion(FeConstants.meta_version);
         }
 
         // Log the first frontend
@@ -1010,6 +1015,7 @@ private void transferToMaster() throws IOException {
 
         // start checkpoint thread
         checkpointer = new Checkpoint(editLog);
+        checkpointer.setMetaContext(metaContext);
         checkpointer.setName("leaderCheckpointer");
         checkpointer.setInterval(FeConstants.checkpoint_interval_second * 
1000L);
 
@@ -1107,6 +1113,7 @@ private void transferToNonMaster() {
 
         if (replayer == null) {
             createReplayer();
+            replayer.setMetaContext(metaContext);
             replayer.setName("replayer");
             replayer.setInterval(REPLAY_INTERVAL_MS);
             replayer.start();
@@ -1311,16 +1318,18 @@ private void recreateTabletInvertIndex() {
     }
 
     public long loadHeader(DataInputStream dis, long checksum) throws 
IOException {
-        imageVersion = dis.readInt();
-        long newChecksum = checksum ^ imageVersion;
-        journalVersion = imageVersion;
+        int journalVersion = dis.readInt();
+        long newChecksum = checksum ^ journalVersion;
+        MetaContext.get().setMetaVersion(journalVersion);
+
         long replayedJournalId = dis.readLong();
         newChecksum ^= replayedJournalId;
-        long id = dis.readLong();
-        newChecksum ^= id;
-        idGenerator.setId(id);
 
-        if (Catalog.getCurrentCatalogJournalVersion() >= 
FeMetaVersion.VERSION_32) {
+        long catalogId = dis.readLong();
+        newChecksum ^= catalogId;
+        idGenerator.setId(catalogId);
+
+        if (journalVersion >= FeMetaVersion.VERSION_32) {
             isDefaultClusterCreated = dis.readBoolean();
         }
 
@@ -4564,20 +4573,6 @@ public void setHaProtocol(HAProtocol protocol) {
         this.haProtocol = protocol;
     }
 
-    public void setJournalVersion(int version) {
-        this.journalVersion = version;
-    }
-
-    // Get current journal meta data version
-    public int getJournalVersion() {
-        return this.journalVersion;
-    }
-
-    // Get current journal meta data version
-    public int getImageVersion() {
-        return this.imageVersion;
-    }
-
     public static short calcShortKeyColumnCount(List<Column> columns, 
Map<String, String> properties)
             throws DdlException {
         List<Column> indexColumns = new ArrayList<Column>();
@@ -5642,7 +5637,7 @@ public long saveBrokers(DataOutputStream dos, long 
checksum) throws IOException
     }
 
     public long loadBrokers(DataInputStream dis, long checksum) throws 
IOException, DdlException {
-        if (getJournalVersion() >= FeMetaVersion.VERSION_31) {
+        if (MetaContext.get().getMetaVersion() >= FeMetaVersion.VERSION_31) {
             int count = dis.readInt();
             checksum ^= count;
             for (long i = 0; i < count; ++i) {
diff --git a/fe/src/main/java/org/apache/doris/common/util/Daemon.java 
b/fe/src/main/java/org/apache/doris/common/util/Daemon.java
index c6b6b9f8..d1419650 100644
--- a/fe/src/main/java/org/apache/doris/common/util/Daemon.java
+++ b/fe/src/main/java/org/apache/doris/common/util/Daemon.java
@@ -17,8 +17,10 @@
 
 package org.apache.doris.common.util;
 
-import org.apache.logging.log4j.Logger;
+import org.apache.doris.meta.MetaContext;
+
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -30,6 +32,8 @@
     private AtomicBoolean isStop;
     private Runnable runnable;
     
+    private MetaContext metaContext = null;
+
     {
         setDaemon(true);
     }
@@ -70,6 +74,10 @@ public Runnable getRunnable() {
         return runnable;
     }
     
+    public void setMetaContext(MetaContext metaContext) {
+        this.metaContext = metaContext;
+    }
+
     public void exit() {
         isStop.set(true);
     }
@@ -91,6 +99,10 @@ protected void runOneCycle() {
 
     @Override
     public void run() {
+        if (metaContext != null) {
+            metaContext.setThreadLocalInfo();
+        }
+
         while (!isStop.get()) {
             try {
                 runOneCycle();
@@ -104,6 +116,10 @@ public void run() {
                 LOG.error("InterruptedException: ", e);
             }
         }
+
+        if (metaContext != null) {
+            MetaContext.remove();
+        }
         LOG.error("daemon thread exits. name=" + this.getName());
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBTool.java 
b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBTool.java
index 22360dc7..23e33ff0 100644
--- a/fe/src/main/java/org/apache/doris/journal/bdbje/BDBTool.java
+++ b/fe/src/main/java/org/apache/doris/journal/bdbje/BDBTool.java
@@ -19,6 +19,7 @@
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.journal.JournalEntity;
+import org.apache.doris.meta.MetaContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -121,7 +122,9 @@ public boolean run() {
                     }
                     
                     // meta version
-                    
Catalog.getInstance().setJournalVersion(options.getMetaVersion());
+                    MetaContext metaContext = new MetaContext();
+                    metaContext.setMetaVersion(options.getMetaVersion());
+                    metaContext.setThreadLocalInfo();
 
                     for (Long key = fromKey; key <= endKey; key++) {
                         getValueByKey(db, key);
diff --git a/fe/src/main/java/org/apache/doris/master/Checkpoint.java 
b/fe/src/main/java/org/apache/doris/master/Checkpoint.java
index 13019510..c787aa2a 100644
--- a/fe/src/main/java/org/apache/doris/master/Checkpoint.java
+++ b/fe/src/main/java/org/apache/doris/master/Checkpoint.java
@@ -67,10 +67,6 @@ public Checkpoint(EditLog editLog) throws IOException {
         this.imageDir = Catalog.IMAGE_DIR;
         this.editLog = editLog;
     }
-    
-    public Checkpoint() {
-        
-    }
 
     public static class NullOutputStream extends OutputStream {
         public void write(byte[] b, int off, int len) throws IOException {
diff --git a/fe/src/main/java/org/apache/doris/meta/MetaContext.java 
b/fe/src/main/java/org/apache/doris/meta/MetaContext.java
new file mode 100644
index 00000000..9f96bd7f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/meta/MetaContext.java
@@ -0,0 +1,53 @@
+package org.apache.doris.meta;
+
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+
+/*
+ * MetaContext saved the current meta version.
+ * And we need to create a thread local meta context for all threads which are 
about to reading meta.
+ */
+public class MetaContext {
+
+    private int metaVersion;
+
+    private static ThreadLocal<MetaContext> threadLocalInfo = new 
ThreadLocal<MetaContext>();
+
+    public MetaContext() {
+
+    }
+
+    public void setMetaVersion(int metaVersion) {
+        this.metaVersion = metaVersion;
+    }
+
+    public int getMetaVersion() {
+        return metaVersion;
+    }
+
+    public void setThreadLocalInfo() {
+        threadLocalInfo.set(this);
+    }
+    
+    public static MetaContext get() {
+        return threadLocalInfo.get();
+    }
+
+    public static void remove() {
+        threadLocalInfo.remove();
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index d9aab0c2..1b6c14d8 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -50,6 +50,7 @@
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.mysql.privilege.UserPropertyInfo;
@@ -504,13 +505,13 @@ public static void loadJournal(Catalog catalog, 
JournalEntity journal) {
                 case OperationType.OP_META_VERSION: {
                     String versionString = ((Text) 
journal.getData()).toString();
                     int version = Integer.parseInt(versionString);
-                    if (catalog.getJournalVersion() > 
FeConstants.meta_version) {
+                    if (MetaContext.get().getMetaVersion() > 
FeConstants.meta_version) {
                         LOG.error("meta data version is out of date, image: 
{}. meta: {}."
                                         + "please update 
FeConstants.meta_version and restart.",
-                                catalog.getJournalVersion(), 
FeConstants.meta_version);
+                                MetaContext.get().getMetaVersion(), 
FeConstants.meta_version);
                         System.exit(-1);
                     }
-                    catalog.setJournalVersion(version);
+                    MetaContext.get().setMetaVersion(version);
                     break;
                 }
                 case OperationType.OP_GLOBAL_VARIABLE: {
diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobTest.java 
b/fe/src/test/java/org/apache/doris/alter/RollupJobTest.java
index cb92f4b2..d03a63d1 100644
--- a/fe/src/test/java/org/apache/doris/alter/RollupJobTest.java
+++ b/fe/src/test/java/org/apache/doris/alter/RollupJobTest.java
@@ -39,6 +39,7 @@
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.thrift.TTabletInfo;
@@ -90,8 +91,11 @@ public void setUp() throws InstantiationException, 
IllegalAccessException, Illeg
         fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
         masterCatalog = CatalogTestUtil.createTestCatalog();
         slaveCatalog = CatalogTestUtil.createTestCatalog();
-        masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
-        slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_40);
+        metaContext.setThreadLocalInfo();
+        // masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
+        // slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
         masterTransMgr = masterCatalog.getGlobalTransactionMgr();
         masterTransMgr.setEditLog(masterCatalog.getEditLog());
 
diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobTest.java 
b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobTest.java
index 55770e6b..f39caaf5 100644
--- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobTest.java
+++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobTest.java
@@ -20,43 +20,34 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.doris.analysis.ColumnDef;
-import org.apache.doris.analysis.TypeDef;
-import org.apache.doris.catalog.ScalarType;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.doris.alter.AlterJob.JobState;
 import org.apache.doris.analysis.AccessTestUtil;
 import org.apache.doris.analysis.AddColumnClause;
 import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.ColumnDef;
 import org.apache.doris.analysis.ColumnPosition;
+import org.apache.doris.analysis.TypeDef;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.FakeCatalog;
 import org.apache.doris.catalog.FakeEditLog;
 import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexState;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.OlapTable.OlapTableState;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Partition.PartitionState;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.catalog.MaterializedIndex.IndexState;
-import org.apache.doris.catalog.OlapTable.OlapTableState;
-import org.apache.doris.catalog.Partition.PartitionState;
 import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.thrift.TTabletInfo;
@@ -65,11 +56,21 @@
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionStatus;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
+import org.apache.doris.transaction.TransactionStatus;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
 public class SchemaChangeJobTest {
 
     private static FakeEditLog fakeEditLog;
@@ -94,8 +95,12 @@ public void setUp() throws InstantiationException, 
IllegalAccessException, Illeg
         fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
         masterCatalog = CatalogTestUtil.createTestCatalog();
         slaveCatalog = CatalogTestUtil.createTestCatalog();
-        masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
-        slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_40);
+        metaContext.setThreadLocalInfo();
+
+        // masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
+        // slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
         masterTransMgr = masterCatalog.getGlobalTransactionMgr();
         masterTransMgr.setEditLog(masterCatalog.getEditLog());
         slaveTransMgr = slaveCatalog.getGlobalTransactionMgr();
diff --git a/fe/src/test/java/org/apache/doris/backup/BackupJobTest.java 
b/fe/src/test/java/org/apache/doris/backup/BackupJobTest.java
index 9bb7dbc7..42e835e4 100644
--- a/fe/src/test/java/org/apache/doris/backup/BackupJobTest.java
+++ b/fe/src/test/java/org/apache/doris/backup/BackupJobTest.java
@@ -282,7 +282,7 @@ public void testRunNormal() {
         BackupMeta restoreMetaInfo = null;
         BackupJobInfo restoreJobInfo = null;
         try {
-            restoreMetaInfo = 
BackupMeta.fromFile(job.getLocalMetaInfoFilePath());
+            restoreMetaInfo = 
BackupMeta.fromFile(job.getLocalMetaInfoFilePath(), -1);
             Assert.assertEquals(1, restoreMetaInfo.getTables().size());
             OlapTable olapTable = (OlapTable) restoreMetaInfo.getTable(tblId);
             Assert.assertNotNull(olapTable);
diff --git a/fe/src/test/java/org/apache/doris/backup/RestoreJobTest.java 
b/fe/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 075a2bc0..304cf58e 100644
--- a/fe/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -177,7 +177,7 @@ public void logBackupJob(BackupJob job) {
                 minTimes = 0;
 
                 List<BackupMeta> backupMetas = Lists.newArrayList();
-                repo.getSnapshotMetaFile(label, backupMetas);
+                repo.getSnapshotMetaFile(label, backupMetas, -1);
                 result = new Delegate() {
                     public Status getSnapshotMetaFile(String label, 
List<BackupMeta> backupMetas) {
                         backupMetas.add(backupMeta);
@@ -236,7 +236,7 @@ boolean await(long timeout, TimeUnit unit) {
         db.dropTable(expectedRestoreTbl.getName());
         
         job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), 
db.getFullName(),
-                jobInfo, false, 3, 100000, catalog, repo.getId());
+                jobInfo, false, 3, 100000, -1, catalog, repo.getId());
         
         List<Table> tbls = Lists.newArrayList();
         tbls.add(expectedRestoreTbl);
diff --git a/fe/src/test/java/org/apache/doris/catalog/CatalogTest.java 
b/fe/src/test/java/org/apache/doris/catalog/CatalogTest.java
index 6ef7cd4a..1f38b1f9 100644
--- a/fe/src/test/java/org/apache/doris/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/CatalogTest.java
@@ -25,6 +25,7 @@
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.LoadJob;
+import org.apache.doris.meta.MetaContext;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -133,7 +134,7 @@ public void testSaveLoadHeader() throws Exception {
         file.createNewFile();
         DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(file));
         Catalog catalog = Catalog.getInstance();
-        catalog.setJournalVersion(FeConstants.meta_version);
+        MetaContext.get().setMetaVersion(FeConstants.meta_version);
         Field field = catalog.getClass().getDeclaredField("load");
         field.setAccessible(true);
         field.set(catalog, new Load());
@@ -161,7 +162,7 @@ public void testSaveLoadJob() throws Exception {
         DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(file));
 
         Catalog catalog = Catalog.getInstance();
-        catalog.setJournalVersion(FeConstants.meta_version);
+        MetaContext.get().setMetaVersion(FeConstants.meta_version);
         Field field = catalog.getClass().getDeclaredField("load");
         field.setAccessible(true);
         field.set(catalog, new Load());
@@ -196,7 +197,7 @@ public void testSaveLoadSchemaChangeJob() throws Exception {
         file.createNewFile();
         DataOutputStream dos = new DataOutputStream(new 
FileOutputStream(file));
         Catalog catalog = Catalog.getInstance();
-        catalog.setJournalVersion(FeConstants.meta_version);
+        MetaContext.get().setMetaVersion(FeConstants.meta_version);
         Field field = catalog.getClass().getDeclaredField("load");
         field.setAccessible(true);
         field.set(catalog, new Load());
diff --git a/fe/src/test/java/org/apache/doris/catalog/FakeCatalog.java 
b/fe/src/test/java/org/apache/doris/catalog/FakeCatalog.java
index 3b42841b..37c0ed62 100644
--- a/fe/src/test/java/org/apache/doris/catalog/FakeCatalog.java
+++ b/fe/src/test/java/org/apache/doris/catalog/FakeCatalog.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.catalog;
 
-import org.apache.doris.common.FeMetaVersion;
-
 import mockit.Mock;
 import mockit.MockUp;
 
@@ -30,10 +28,10 @@ public static void setCatalog(Catalog catalog) {
         FakeCatalog.catalog = catalog;
     }
     
-    @Mock
-    public int getJournalVersion() {
-        return FeMetaVersion.VERSION_45;
-    }
+    // @Mock
+    // public int getJournalVersion() {
+    // return FeMetaVersion.VERSION_45;
+    // }
     
     @Mock
     private static Catalog getCurrentCatalog() {
diff --git a/fe/src/test/java/org/apache/doris/catalog/TabletTest.java 
b/fe/src/test/java/org/apache/doris/catalog/TabletTest.java
index 6482e229..c70ada65 100644
--- a/fe/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -67,8 +67,6 @@ public void makeTablet() {
         tablet.addReplica(replica1);
         tablet.addReplica(replica2);
         tablet.addReplica(replica3);
-
-
     }
 
     @Test
diff --git a/fe/src/test/java/org/apache/doris/es/EsStateStoreTest.java 
b/fe/src/test/java/org/apache/doris/es/EsStateStoreTest.java
index eb533882..d704b330 100644
--- a/fe/src/test/java/org/apache/doris/es/EsStateStoreTest.java
+++ b/fe/src/test/java/org/apache/doris/es/EsStateStoreTest.java
@@ -22,20 +22,6 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.lang.reflect.InvocationTargetException;
-import java.net.URISyntaxException;
-import java.util.Map;
-
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
 import org.apache.doris.catalog.EsTable;
@@ -48,9 +34,25 @@
 import org.apache.doris.external.EsIndexState;
 import org.apache.doris.external.EsStateStore;
 import org.apache.doris.external.EsTableState;
+import org.apache.doris.meta.MetaContext;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URISyntaxException;
+import java.util.Map;
+
 public class EsStateStoreTest {
 
     private static FakeEditLog fakeEditLog;
@@ -68,7 +70,10 @@ public static void init() throws IOException, 
InstantiationException, IllegalAcc
         fakeEditLog = new FakeEditLog();
         fakeCatalog = new FakeCatalog();
         masterCatalog = CatalogTestUtil.createTestCatalog();
-        masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_40);
+        metaContext.setThreadLocalInfo();
+        // masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
         FakeCatalog.setCatalog(masterCatalog);
         clusterStateStr1 = loadJsonFromFile("data/es/clusterstate1.json");
         clusterStateStr2 = loadJsonFromFile("data/es/clusterstate2.json");
diff --git 
a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java 
b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 481cdc50..0153f353 100644
--- 
a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -32,6 +32,7 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.meta.MetaContext;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 
 import com.google.common.collect.Lists;
@@ -65,8 +66,12 @@ public void setUp() throws InstantiationException, 
IllegalAccessException, Illeg
         fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
         masterCatalog = CatalogTestUtil.createTestCatalog();
         slaveCatalog = CatalogTestUtil.createTestCatalog();
-        masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
-        slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_40);
+        metaContext.setThreadLocalInfo();
+
+        // masterCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
+        // slaveCatalog.setJournalVersion(FeMetaVersion.VERSION_40);
         masterTransMgr = masterCatalog.getGlobalTransactionMgr();
         masterTransMgr.setEditLog(masterCatalog.getEditLog());
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to