This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b15894  [Bug] Fix SchemaChangeJobV2's meta persist bug (#3804)
5b15894 is described below

commit 5b1589498a1c7722739eccaf2763c01fac6705eb
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Jun 9 21:55:46 2020 +0800

    [Bug] Fix SchemaChangeJobV2's meta persist bug (#3804)
    
    1. The field `partitionIndexMap` in SchemaChangeJobV2 was missing its `@SerializedName` annotation
    2. The `Pair` values in the field `indexSchemaVersionAndHashMap` cannot be persisted by GSON; use the new `SchemaVersionAndHash` class instead
    3. Exit the FE process when replaying an edit log entry fails.
    
    Fix: #3802
---
 .../org/apache/doris/alter/SchemaChangeJobV2.java  | 21 ++++----
 .../apache/doris/common/SchemaVersionAndHash.java  | 61 ++++++++++++++++++++++
 .../java/org/apache/doris/persist/EditLog.java     |  3 +-
 .../apache/doris/alter/SchemaChangeJobV2Test.java  | 14 ++++-
 4 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 08838eb..bdede94 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -35,7 +35,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MarkedCountDownLatch;
-import org.apache.doris.common.Pair;
+import org.apache.doris.common.SchemaVersionAndHash;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -85,6 +85,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     @SerializedName(value = "partitionIndexTabletMap")
     private Table<Long, Long, Map<Long, Long>> partitionIndexTabletMap = 
HashBasedTable.create();
     // partition id -> (shadow index id -> shadow index))
+    @SerializedName(value = "partitionIndexMap")
     private Table<Long, Long, MaterializedIndex> partitionIndexMap = 
HashBasedTable.create();
     // shadow index id -> origin index id
     @SerializedName(value = "indexIdMap")
@@ -97,7 +98,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     private Map<Long, List<Column>> indexSchemaMap = Maps.newHashMap();
     // shadow index id -> (shadow index schema version : schema hash)
     @SerializedName(value = "indexSchemaVersionAndHashMap")
-    private Map<Long, Pair<Integer, Integer>> indexSchemaVersionAndHashMap = 
Maps.newHashMap();
+    private Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap = 
Maps.newHashMap();
     // shadow index id -> shadow index short key count
     @SerializedName(value = "indexShortKeyMap")
     private Map<Long, Short> indexShortKeyMap = Maps.newHashMap();
@@ -151,7 +152,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             short shadowIdxShortKeyCount, List<Column> shadowIdxSchema) {
         indexIdMap.put(shadowIdxId, originIdxId);
         indexIdToName.put(shadowIdxId, shadowIndexName);
-        indexSchemaVersionAndHashMap.put(shadowIdxId, 
Pair.create(shadowSchemaVersion, shadowSchemaHash));
+        indexSchemaVersionAndHashMap.put(shadowIdxId, new 
SchemaVersionAndHash(shadowSchemaVersion, shadowSchemaHash));
         indexShortKeyMap.put(shadowIdxId, shadowIdxShortKeyCount);
         indexSchemaMap.put(shadowIdxId, shadowIdxSchema);
     }
@@ -234,7 +235,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                     
                     short shadowShortKeyColumnCount = 
indexShortKeyMap.get(shadowIdxId);
                     List<Column> shadowSchema = 
indexSchemaMap.get(shadowIdxId);
-                    int shadowSchemaHash = 
indexSchemaVersionAndHashMap.get(shadowIdxId).second;
+                    int shadowSchemaHash = 
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
                     int originSchemaHash = 
tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
                     
                     for (Tablet shadowTablet : shadowIdx.getTablets()) {
@@ -334,8 +335,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
         for (long shadowIdxId : indexIdMap.keySet()) {
             tbl.setIndexMeta(shadowIdxId, indexIdToName.get(shadowIdxId), 
indexSchemaMap.get(shadowIdxId),
-                    indexSchemaVersionAndHashMap.get(shadowIdxId).first,
-                    indexSchemaVersionAndHashMap.get(shadowIdxId).second,
+                    
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaVersion,
+                    indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash,
                     indexShortKeyMap.get(shadowIdxId), TStorageType.COLUMN, 
null);
         }
 
@@ -385,7 +386,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                     MaterializedIndex shadowIdx = entry.getValue();
 
                     long originIdxId = indexIdMap.get(shadowIdxId);
-                    int shadowSchemaHash = 
indexSchemaVersionAndHashMap.get(shadowIdxId).second;
+                    int shadowSchemaHash = 
indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
                     int originSchemaHash = 
tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
 
                     for (Tablet shadowTablet : shadowIdx.getTablets()) {
@@ -679,7 +680,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
                 TStorageMedium medium = 
tbl.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
                 TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, shadowIndexId,
-                        
indexSchemaVersionAndHashMap.get(shadowIndexId).second, medium);
+                        
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium);
 
                 for (Tablet shadownTablet : shadowIndex.getTablets()) {
                     invertedIndex.addTablet(shadownTablet.getId(), 
shadowTabletMeta);
@@ -867,7 +868,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             }
             int schemaVersion = in.readInt();
             int schemaVersionHash = in.readInt();
-            Pair<Integer, Integer> schemaVersionAndHash = 
Pair.create(schemaVersion, schemaVersionHash);
+            SchemaVersionAndHash schemaVersionAndHash = new 
SchemaVersionAndHash(schemaVersion, schemaVersionHash);
             short shortKeyCount = in.readShort();
 
             indexIdMap.put(shadowIndexId, originIndexId);
@@ -923,7 +924,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             String indexName = Text.readString(in);
             int schemaVersion = in.readInt();
             int schemaVersionHash = in.readInt();
-            Pair<Integer, Integer> schemaVersionAndHash = 
Pair.create(schemaVersion, schemaVersionHash);
+            SchemaVersionAndHash schemaVersionAndHash = new 
SchemaVersionAndHash(schemaVersion, schemaVersionHash);
 
             indexIdMap.put(shadowIndexId, originIndexId);
             indexIdToName.put(shadowIndexId, indexName);
diff --git a/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java 
b/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java
new file mode 100644
index 0000000..7932d52
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/common/SchemaVersionAndHash.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/*
+ * Holds a schema version and schema hash pair. Currently used only so that
+ * the pair can be persisted using GSON.
+ */
+public class SchemaVersionAndHash implements Writable {
+
+    @SerializedName(value = "version")
+    public int schemaVersion;
+    @SerializedName(value = "hash")
+    public int schemaHash;
+
+    public SchemaVersionAndHash(int schemaVersion, int schemaHash) {
+        this.schemaVersion = schemaVersion;
+        this.schemaHash = schemaHash;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    @Override
+    public String toString() {
+        return schemaVersion + ":" + schemaHash;
+    }
+
+    public static SchemaVersionAndHash read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, SchemaVersionAndHash.class);
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index 3762095..fa3ef9e 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -29,9 +29,9 @@ import org.apache.doris.backup.RestoreJob;
 import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.FunctionSearchDesc;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.cluster.BaseParam;
 import org.apache.doris.cluster.Cluster;
 import org.apache.doris.common.Config;
@@ -779,6 +779,7 @@ public class EditLog {
             }
         } catch (Exception e) {
             LOG.error("Operation Type {}", opCode, e);
+            System.exit(-1);
         }
     }
 
diff --git a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java 
b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
index edcf8ee..1eea8bb 100644
--- a/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java
@@ -52,6 +52,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.SchemaVersionAndHash;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.meta.MetaContext;
@@ -62,6 +63,8 @@ import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.FakeTransactionIDGenerator;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
+import com.google.common.collect.Maps;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -393,7 +396,9 @@ public class SchemaChangeJobV2Test {
         SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1,1, 
"test",600000);
         schemaChangeJobV2.setStorageFormat(TStorageFormat.V2);
         Deencapsulation.setField(schemaChangeJobV2, "jobState", 
AlterJobV2.JobState.FINISHED);
-
+        Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap = 
Maps.newHashMap();
+        indexSchemaVersionAndHashMap.put(Long.valueOf(1000), new 
SchemaVersionAndHash(10, 20));
+        Deencapsulation.setField(schemaChangeJobV2, 
"indexSchemaVersionAndHashMap", indexSchemaVersionAndHashMap);
 
         // write schema change job
         schemaChangeJobV2.write(out);
@@ -410,5 +415,12 @@ public class SchemaChangeJobV2Test {
         Assert.assertEquals(1, result.getJobId());
         Assert.assertEquals(AlterJobV2.JobState.FINISHED, 
result.getJobState());
         Assert.assertEquals(TStorageFormat.V2, 
Deencapsulation.getField(result, "storageFormat"));
+
+        Assert.assertNotNull(Deencapsulation.getField(result, 
"partitionIndexMap"));
+        Assert.assertNotNull(Deencapsulation.getField(result, 
"partitionIndexTabletMap"));
+
+        Map<Long, SchemaVersionAndHash> map = Deencapsulation.getField(result, 
"indexSchemaVersionAndHashMap");
+        Assert.assertEquals(10, map.get(1000L).schemaVersion);
+        Assert.assertEquals(20, map.get(1000L).schemaHash);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to