This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aa4ac2d  [Bug] Serialize storage format in rollup job (#3686)
aa4ac2d is described below

commit aa4ac2d078168aaadd74701626f95adfe856a299
Author: EmmyMiao87 <[email protected]>
AuthorDate: Tue May 26 15:35:12 2020 +0800

    [Bug] Serialize storage format in rollup job (#3686)
    
    A rollup job on a segment v2 table should set its storage format to V2 and
    serialize it. If the storage format is not serialized, the segment v2
    rollup may use the wrong format, 'segment v1'.
---
 .../java/org/apache/doris/alter/RollupJobV2.java   | 10 +++-
 .../org/apache/doris/common/FeMetaVersion.java     |  4 +-
 .../org/apache/doris/alter/RollupJobV2Test.java    | 62 ++++++++++++++++++++--
 3 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 4127bd2..71d2367 100644
--- a/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
@@ -88,14 +89,15 @@ public class RollupJobV2 extends AlterJobV2 {
     private KeysType rollupKeysType;
     private short rollupShortKeyColumnCount;
 
+    // optional
+    private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
+
     // The rollup job will wait all transactions before this txn id finished, 
then send the rollup tasks.
     protected long watershedTxnId = -1;
 
     // save all create rollup tasks
     private AgentBatchTask rollupBatchTask = new AgentBatchTask();
 
-    private TStorageFormat storageFormat = null;
-
     public RollupJobV2(long jobId, long dbId, long tableId, String tableName, 
long timeoutMs,
             long baseIndexId, long rollupIndexId, String baseIndexName, String 
rollupIndexName,
             List<Column> rollupSchema, int baseSchemaHash, int 
rollupSchemaHash,
@@ -542,6 +544,7 @@ public class RollupJobV2 extends AlterJobV2 {
         out.writeShort(rollupShortKeyColumnCount);
 
         out.writeLong(watershedTxnId);
+        Text.writeString(out, storageFormat.name());
     }
 
     @Override
@@ -579,6 +582,9 @@ public class RollupJobV2 extends AlterJobV2 {
         rollupShortKeyColumnCount = in.readShort();
 
         watershedTxnId = in.readLong();
+        if (Catalog.getCurrentCatalogJournalVersion() >= 
FeMetaVersion.VERSION_85) {
+            storageFormat = TStorageFormat.valueOf(Text.readString(in));
+        }
     }
 
     /**
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java 
b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index bebaccf..f4d1f8c 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -179,6 +179,8 @@ public final class FeMetaVersion {
     public static final int VERSION_83 = 83;
     // add storage format in schema change job
     public static final int VERSION_84 = 84;
+    // add storage format in rollup job
+    public static final int VERSION_85 = 85;
     // note: when increment meta version, should assign the latest version to 
VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_84;
+    public static final int VERSION_CURRENT = VERSION_85;
 }
diff --git a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java 
b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
index 2be848d..35429d0 100644
--- a/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/RollupJobV2Test.java
@@ -17,51 +17,71 @@
 
 package org.apache.doris.alter;
 
-import mockit.Mock;
-import mockit.MockUp;
+import static org.junit.Assert.assertEquals;
+
 import org.apache.doris.alter.AlterJobV2.JobState;
 import org.apache.doris.analysis.AccessTestUtil;
 import org.apache.doris.analysis.AddRollupClause;
 import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.CatalogTestUtil;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.FakeCatalog;
 import org.apache.doris.catalog.FakeEditLog;
+import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.transaction.FakeTransactionIDGenerator;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
 import com.google.common.collect.Lists;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
 
 public class RollupJobV2Test {
+    private static String fileName = "./RollupJobV2Test";
+
     private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
     private static GlobalTransactionMgr masterTransMgr;
     private static GlobalTransactionMgr slaveTransMgr;
@@ -118,6 +138,12 @@ public class RollupJobV2Test {
         };
     }
 
+    @After
+    public void tearDown() {
+        File file = new File(fileName);
+        file.delete();
+    }
+
     @Test
     public void testRunRollupJobConcurrentLimit() throws UserException {
         fakeCatalog = new FakeCatalog();
@@ -338,4 +364,34 @@ public class RollupJobV2Test {
         materializedViewHandler.runAfterCatalogReady();
         Assert.assertEquals(JobState.FINISHED, rollupJob.getJobState());
     }
+
+
+    @Test
+    public void testSerializeOfRollupJob() throws IOException {
+        // prepare file
+        File file = new File(fileName);
+        file.createNewFile();
+        DataOutputStream out = new DataOutputStream(new 
FileOutputStream(file));
+        
+        short keysCount = 1;
+        RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, 
"test", "rollup",Lists.newArrayList(), 1, 1,
+                KeysType.AGG_KEYS, keysCount);
+        rollupJobV2.setStorageFormat(TStorageFormat.V2);
+
+        // write rollup job
+        rollupJobV2.write(out);
+        out.flush();
+        out.close();
+
+        // read objects from file
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(FeMetaVersion.VERSION_85);
+        metaContext.setThreadLocalInfo();
+        DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+        RollupJobV2 result = (RollupJobV2) AlterJobV2.read(in);
+        Catalog.getCurrentCatalogJournalVersion();
+        Assert.assertEquals(TStorageFormat.V2, 
Deencapsulation.getField(result, "storageFormat"));
+
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to