This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f9a80ecdab5 [improvement](sync version) fe sync version with be (#25236)
f9a80ecdab5 is described below

commit f9a80ecdab530abdc86f381b97751ab654152dff
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Mon Oct 16 20:34:25 2023 +0800

    [improvement](sync version) fe sync version with be (#25236)
---
 .../java/org/apache/doris/catalog/Replica.java     |  36 ++++-
 .../apache/doris/catalog/TabletInvertedIndex.java  |  22 ++-
 .../org/apache/doris/clone/TabletSchedCtx.java     |   7 +
 .../java/org/apache/doris/master/MasterImpl.java   |   9 +-
 .../org/apache/doris/master/ReportHandler.java     |  42 +++--
 .../org/apache/doris/clone/RepairVersionTest.java  | 176 +++++++++++++++++++++
 .../apache/doris/utframe/TestWithFeService.java    |   2 +-
 7 files changed, 274 insertions(+), 20 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index e55eab89392..631f2ebaf3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.gson.annotations.SerializedName;
@@ -114,6 +115,14 @@ public class Replica implements Writable {
     private TUniqueId cooldownMetaId;
     private long cooldownTerm = -1;
 
+    // A replica version should increase monotonically,
+    // but a backend may be missing some versions due to disk failure or bugs.
+    // FE should find these and mark the replica as missing versions.
+    // If the backend's reported version < FE version, record the backend's reported version as `regressiveVersion`,
+    // and if this lasts longer than 5 min, FE should mark this replica as missing versions.
+    private long regressiveVersion = -1;
+    private long regressiveVersionTimestamp = 0;
+
     /*
      * This can happen when this replica is created by a balance clone task, 
and
      * when task finished, the version of this replica is behind the 
partition's visible version.
@@ -435,9 +444,9 @@ public class Replica implements Writable {
 
         if (lastFailedVersion != this.lastFailedVersion) {
             // Case 2:
-            if (lastFailedVersion > this.lastFailedVersion) {
+            if (lastFailedVersion > this.lastFailedVersion || 
lastFailedVersion < 0) {
                 this.lastFailedVersion = lastFailedVersion;
-                this.lastFailedTimestamp = System.currentTimeMillis();
+                this.lastFailedTimestamp = lastFailedVersion > 0 ? 
System.currentTimeMillis() : -1L;
             }
 
             this.lastSuccessVersion = this.version;
@@ -506,10 +515,6 @@ public class Replica implements Writable {
         return true;
     }
 
-    public void setLastFailedVersion(long lastFailedVersion) {
-        this.lastFailedVersion = lastFailedVersion;
-    }
-
     public void setState(ReplicaState replicaState) {
         this.state = replicaState;
     }
@@ -534,6 +539,25 @@ public class Replica implements Writable {
         this.versionCount = versionCount;
     }
 
+    /**
+     * Tracks whether a reported version is lower than this replica's {@code version} and tells
+     * the caller once that regression has persisted long enough to act on.
+     *
+     * <p>This is not a pure check: it updates {@code regressiveVersion} and
+     * {@code regressiveVersionTimestamp} as a side effect.
+     *
+     * @param newVersion the version to compare against this replica's {@code version}
+     * @return {@code false} when {@code newVersion >= version}; {@code true} when the
+     *         "Replica.regressive_version_immediately" debug point is enabled, or when the same
+     *         lower version has been recorded for at least 5 minutes
+     */
+    public boolean checkVersionRegressive(long newVersion) {
+        if (newVersion >= version) {
+            // Not behind: forget any regression recorded earlier.
+            regressiveVersion = -1;
+            regressiveVersionTimestamp = -1;
+            return false;
+        }
+
+        // Debug hook: report the regression at once, leaving the tracking fields untouched.
+        if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) 
{
+            return true;
+        }
+
+        // A different lower version than the one recorded restarts the timer.
+        if (newVersion != regressiveVersion) {
+            regressiveVersion = newVersion;
+            regressiveVersionTimestamp = System.currentTimeMillis();
+        }
+
+        // True only once 5 minutes (in milliseconds) have elapsed since the recorded timestamp.
+        return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 
60 * 1000L;
+    }
+
     @Override
     public String toString() {
         StringBuilder strBuffer = new StringBuilder("[replicaId=");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 2b601f9f030..a2d5983aac4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -390,10 +390,22 @@ public class TabletInvertedIndex {
         if (backendTabletInfo.getVersion() > versionInFe) {
             // backend replica's version is larger or newer than replica in 
FE, sync it.
             return true;
-        } else if (versionInFe == backendTabletInfo.getVersion() && 
replicaInFe.isBad()) {
+        } else if (versionInFe == backendTabletInfo.getVersion()) {
             // backend replica's version is equal to replica in FE, but 
replica in FE is bad,
             // while backend replica is good, sync it
-            return true;
+            if (replicaInFe.isBad()) {
+                return true;
+            }
+
+            // FE's replica last failed version > partition's committed version.
+            // This can occur when BE reports a missing version: FE will set last failed version = visible version + 1,
+            // so the last failed version may be greater than the partition's committed version.
+            //
+            // But the partition is not available here, so we just check lastFailedVersion == version + 1.
+            // In ReportHandler.sync, we will check again whether last failed version > partition's committed version.
+            if (replicaInFe.getLastFailedVersion() == versionInFe + 1) {
+                return true;
+            }
         }
 
         return false;
@@ -501,6 +513,12 @@ public class TabletInvertedIndex {
             // so we only return true if version_miss is true.
             return true;
         }
+
+        // backend versions regressive due to bugs
+        if 
(replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) {
+            return true;
+        }
+
         return false;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index c9671cd2db7..c57bdc7762a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -1074,6 +1074,13 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
 
             replica.updateVersionInfo(reportedTablet.getVersion(), 
reportedTablet.getDataSize(),
                     reportedTablet.getDataSize(), 
reportedTablet.getRowCount());
+            if (replica.getLastFailedVersion() > 
partition.getCommittedVersion()
+                    && reportedTablet.getVersion() >= 
partition.getCommittedVersion()
+                    //&& !(reportedTablet.isSetVersionMiss() && 
reportedTablet.isVersionMiss()
+                    && !(reportedTablet.isSetUsed() && 
!reportedTablet.isUsed())) {
+                LOG.info("change replica {} of tablet {} 's last failed 
version to -1", replica, tabletId);
+                replica.updateLastFailedVersion(-1L);
+            }
             if (reportedTablet.isSetPathHash()) {
                 replica.setPathHash(reportedTablet.getPathHash());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 2833eff5f3d..64b771663b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -192,7 +192,7 @@ public class MasterImpl {
                     finishRecoverTablet(task);
                     break;
                 case ALTER:
-                    finishAlterTask(task);
+                    finishAlterTask(task, request);
                     break;
                 case ALTER_INVERTED_INDEX:
                     finishAlterInvertedIndexTask(task, request);
@@ -575,7 +575,7 @@ public class MasterImpl {
         return reportHandler.handleReport(request);
     }
 
-    private void finishAlterTask(AgentTask task) {
+    private void finishAlterTask(AgentTask task, TFinishTaskRequest request) {
         AlterReplicaTask alterTask = (AlterReplicaTask) task;
         try {
             if (alterTask.getJobType() == JobType.ROLLUP) {
@@ -584,6 +584,11 @@ public class MasterImpl {
                 
Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask);
             }
             alterTask.setFinished(true);
+            if (request.isSetReportVersion()) {
+                long reportVersion = request.getReportVersion();
+                Env.getCurrentSystemInfo().updateBackendReportVersion(
+                        task.getBackendId(), reportVersion, task.getDbId(), 
task.getTableId());
+            }
         } catch (MetaNotFoundException e) {
             LOG.warn("failed to handle finish alter task: {}, {}", 
task.getSignature(), e.getMessage());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 4461ba19473..5b781d7c5f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -403,7 +403,8 @@ public class ReportHandler extends Daemon {
         }
     }
 
-    private static void tabletReport(long backendId, Map<Long, TTablet> 
backendTablets, long backendReportVersion) {
+    // public for fe ut
+    public static void tabletReport(long backendId, Map<Long, TTablet> 
backendTablets, long backendReportVersion) {
         long start = System.currentTimeMillis();
         LOG.info("backend[{}] reports {} tablet(s). report version: {}",
                 backendId, backendTablets.size(), backendReportVersion);
@@ -607,6 +608,11 @@ public class ReportHandler extends Daemon {
                 if (olapTable == null || !olapTable.writeLockIfExist()) {
                     continue;
                 }
+
+                if (backendReportVersion < 
Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) {
+                    break;
+                }
+
                 try {
                     long partitionId = tabletMeta.getPartitionId();
                     Partition partition = olapTable.getPartition(partitionId);
@@ -660,14 +666,25 @@ public class ReportHandler extends Daemon {
                             continue;
                         }
 
-                        if (metaVersion < backendVersion
-                                || (metaVersion == backendVersion && 
replica.isBad())) {
-
-                            if (backendReportVersion < 
Env.getCurrentSystemInfo()
-                                    .getBackendReportVersion(backendId)) {
-                                continue;
+                        boolean needSync = false;
+                        if (metaVersion < backendVersion) {
+                            needSync = true;
+                        } else if (metaVersion == backendVersion) {
+                            if (replica.isBad()) {
+                                needSync = true;
                             }
+                            if (replica.getVersion() >= 
partition.getCommittedVersion()
+                                    && replica.getLastFailedVersion() > 
partition.getCommittedVersion()) {
+                                LOG.info("sync replica {} of tablet {} in 
backend {} in db {}. replica last failed"
+                                        + " version change to -1 because last 
failed version > replica's committed"
+                                        + " version {}",
+                                        replica, tabletId, backendId, dbId, 
partition.getCommittedVersion());
+                                replica.updateLastFailedVersion(-1L);
+                                needSync = true;
+                            }
+                        }
 
+                        if (needSync) {
                             // happens when
                             // 1. PUSH finished in BE but failed or not yet 
report to FE
                             // 2. repair for VERSION_INCOMPLETE finished in 
BE, but failed or not yet report to FE
@@ -1048,18 +1065,25 @@ public class ReportHandler extends Daemon {
                                 break;
                             }
 
-                            if (tTabletInfo.isSetVersionMiss() && 
tTabletInfo.isVersionMiss()) {
+                            if ((tTabletInfo.isSetVersionMiss() && 
tTabletInfo.isVersionMiss())
+                                    || 
replica.checkVersionRegressive(tTabletInfo.getVersion())) {
                                 // If the original last failed version is larger than 0, do not change it.
                                 // Otherwise, we set last failed version to replica's version + 1,
                                 // because last failed version should always be larger than replica's version.
                                 long newLastFailedVersion = 
replica.getLastFailedVersion();
                                 if (newLastFailedVersion < 0) {
                                     newLastFailedVersion = 
replica.getVersion() + 1;
+                                    
replica.updateLastFailedVersion(newLastFailedVersion);
+                                    LOG.warn("set missing version for replica 
{} of tablet {} on backend {}, "
+                                            + "version in fe {}, version in be 
{}, be missing {}",
+                                            replica.getId(), tabletId, 
backendId, replica.getVersion(),
+                                            tTabletInfo.getVersion(), 
tTabletInfo.isVersionMiss());
                                 }
-                                
replica.updateLastFailedVersion(newLastFailedVersion);
                                 
backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
                                 break;
                             }
+
+                            break;
                         }
                     }
                 } finally {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
new file mode 100644
index 00000000000..7539548583c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
+import org.apache.doris.master.ReportHandler;
+import org.apache.doris.thrift.TTablet;
+import org.apache.doris.thrift.TTabletInfo;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+/**
+ * FE unit tests for clearing a replica's last failed version and for detecting a backend that
+ * reports a version lower than the one recorded in FE.
+ *
+ * <p>Each test builds its own table through {@code prepareTableForTest} and then either
+ * re-enables the tablet scheduler for a short while or calls
+ * {@code ReportHandler.tabletReport} directly with a hand-built report.
+ */
+public class RepairVersionTest extends TestWithFeService {
+    // Simple holder for the catalog objects that prepareTableForTest hands back to a test.
+    // NOTE(review): declared as a non-static inner class although it uses no outer state.
+    private class TableInfo {
+        Partition partition;
+        Tablet tablet;
+        Replica replica;
+    }
+
+    // Sets the static Config switches these tests depend on.
+    // NOTE(review): nothing in this class restores the values afterwards - confirm that is fine
+    // for other tests sharing the same JVM.
+    @Override
+    protected void beforeCreatingConnectContext() throws Exception {
+        // Presumably required for the debug point used in testVersionRegressive - TODO confirm.
+        Config.enable_debug_points = true;
+        Config.disable_balance = true;
+        // Starts disabled; testRepairLastFailedVersionByClone switches it on temporarily.
+        Config.disable_tablet_scheduler = true;
+        Config.allow_replica_on_same_host = true;
+        Config.tablet_checker_interval_ms = 100;
+        Config.tablet_schedule_interval_ms = 100;
+    }
+
+    // Creates the "test" database that prepareTableForTest places its tables in.
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+    }
+
+    // Two backends, matching the replication_num of 2 used by prepareTableForTest.
+    @Override
+    protected int backendNum() {
+        return 2;
+    }
+
+    /**
+     * Sets the replica's last failed version to {@code version + 1}, clears
+     * {@code Config.disable_tablet_scheduler} for about one second, and expects the last failed
+     * version to be back at -1 with the replica version equal to the partition's visible version.
+     */
+    @Test
+    public void testRepairLastFailedVersionByClone() throws Exception {
+        TableInfo info = 
prepareTableForTest("tbl_repair_last_fail_version_by_clone");
+        Partition partition = info.partition;
+        Replica replica = info.replica;
+
+        replica.updateLastFailedVersion(replica.getVersion() + 1);
+        Assertions.assertEquals(partition.getCommittedVersion() + 1, 
replica.getLastFailedVersion());
+
+        // NOTE(review): fixed one-second wait; may be timing-sensitive on slow machines.
+        Config.disable_tablet_scheduler = false;
+        Thread.sleep(1000);
+        Config.disable_tablet_scheduler = true;
+
+        Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+    }
+
+    /**
+     * Sets the replica's last failed version to {@code version + 1}, then feeds
+     * {@code ReportHandler.tabletReport} a report carrying the replica's current version and
+     * expects the last failed version to be reset to -1.
+     */
+    @Test
+    public void testRepairLastFailedVersionByReport() throws Exception {
+        TableInfo info = 
prepareTableForTest("tbl_repair_last_fail_version_by_report");
+        Partition partition = info.partition;
+        Tablet tablet = info.tablet;
+        Replica replica = info.replica;
+
+        replica.updateLastFailedVersion(replica.getVersion() + 1);
+        Assertions.assertEquals(partition.getCommittedVersion() + 1, 
replica.getLastFailedVersion());
+
+        // Build a report entry that mirrors the replica as FE currently records it.
+        TTabletInfo tTabletInfo = new TTabletInfo();
+        tTabletInfo.setTabletId(tablet.getId());
+        tTabletInfo.setSchemaHash(replica.getSchemaHash());
+        tTabletInfo.setVersion(replica.getVersion());
+        tTabletInfo.setPathHash(replica.getPathHash());
+        tTabletInfo.setPartitionId(partition.getId());
+        tTabletInfo.setReplicaId(replica.getId());
+
+        TTablet tTablet = new TTablet();
+        tTablet.addToTabletInfos(tTabletInfo);
+        Map<Long, TTablet> tablets = Maps.newHashMap();
+        tablets.put(tablet.getId(), tTablet);
+        Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
+
+        // 100L is passed as the backend report version.
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+
+        Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+    }
+
+    /**
+     * Reports version 1 for a replica whose FE version is greater than 1. The first report must
+     * leave the last failed version at -1; after the
+     * "Replica.regressive_version_immediately" debug point is added, a second report must set
+     * the last failed version to {@code version + 1} while the replica version stays unchanged.
+     */
+    @Test
+    public void testVersionRegressive() throws Exception {
+        TableInfo info = prepareTableForTest("tbl_version_regressive");
+        Partition partition = info.partition;
+        Tablet tablet = info.tablet;
+        Replica replica = info.replica;
+
+        Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+        Assertions.assertTrue(replica.getVersion() > 1L);
+
+        TTabletInfo tTabletInfo = new TTabletInfo();
+        tTabletInfo.setTabletId(tablet.getId());
+        tTabletInfo.setSchemaHash(replica.getSchemaHash());
+        tTabletInfo.setVersion(1L); // be report version = 1 which less than 
fe version
+        tTabletInfo.setPathHash(replica.getPathHash());
+        tTabletInfo.setPartitionId(partition.getId());
+        tTabletInfo.setReplicaId(replica.getId());
+
+        TTablet tTablet = new TTablet();
+        tTablet.addToTabletInfos(tTabletInfo);
+        Map<Long, TTablet> tablets = Maps.newHashMap();
+        tablets.put(tablet.getId(), tTablet);
+
+        // First report: the lower version has only just been seen, so nothing is marked yet.
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+
+        // NOTE(review): the debug point is never removed in this method - confirm it cannot
+        // leak into other tests.
+        DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", 
new DebugPoint());
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+        Assertions.assertEquals(replica.getVersion() + 1, 
replica.getLastFailedVersion());
+
+        Assertions.assertEquals(partition.getVisibleVersion(), 
replica.getVersion());
+    }
+
+    /**
+     * Creates a single-bucket table with {@code replication_num = 2} in database "test", sets
+     * the partition's visible version to 2 (next version 3), and updates every replica of the
+     * first tablet to version 2.
+     *
+     * @param tableName name of the table to create
+     * @return the first partition, its first tablet, and that tablet's first replica
+     */
+    private TableInfo prepareTableForTest(String tableName) throws Exception {
+        createTable("CREATE TABLE test." + tableName + " (k INT) DISTRIBUTED 
BY HASH(k) "
+                + " BUCKETS 1 PROPERTIES ( \"replication_num\" = \"2\" )");
+
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException(tableName);
+        Assertions.assertNotNull(tbl);
+        Partition partition = tbl.getPartitions().iterator().next();
+        Tablet tablet = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+                .getTablets().iterator().next();
+
+        long visibleVersion = 2L;
+        partition.updateVisibleVersion(visibleVersion);
+        partition.setNextVersion(visibleVersion + 1);
+        tablet.getReplicas().forEach(replica -> 
replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L));
+
+        // Sanity-check the starting state every test relies on.
+        Replica replica = tablet.getReplicas().iterator().next();
+        Assertions.assertEquals(visibleVersion, replica.getVersion());
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+
+        TableInfo info = new TableInfo();
+        info.partition = partition;
+        info.tablet = tablet;
+        info.replica = replica;
+
+        return info;
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 8861112624f..3aa3a464c73 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -392,7 +392,7 @@ public abstract class TestWithFeService {
             InterruptedException {
         int feRpcPort = startFEServer(runningDir);
         List<Backend> bes = Lists.newArrayList();
-        System.out.println("start create backend");
+        System.out.println("start create backend, backend num " + backendNum);
         for (int i = 0; i < backendNum; i++) {
             bes.add(createBackend("127.0.0.1", feRpcPort));
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to