This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f9a80ecdab5 [improvement](sync version) fe sync version with be (#25236) f9a80ecdab5 is described below commit f9a80ecdab530abdc86f381b97751ab654152dff Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Mon Oct 16 20:34:25 2023 +0800 [improvement](sync version) fe sync version with be (#25236) --- .../java/org/apache/doris/catalog/Replica.java | 36 ++++- .../apache/doris/catalog/TabletInvertedIndex.java | 22 ++- .../org/apache/doris/clone/TabletSchedCtx.java | 7 + .../java/org/apache/doris/master/MasterImpl.java | 9 +- .../org/apache/doris/master/ReportHandler.java | 42 +++-- .../org/apache/doris/clone/RepairVersionTest.java | 176 +++++++++++++++++++++ .../apache/doris/utframe/TestWithFeService.java | 2 +- 7 files changed, 274 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index e55eab89392..631f2ebaf3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -20,6 +20,7 @@ package org.apache.doris.catalog; import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.thrift.TUniqueId; import com.google.gson.annotations.SerializedName; @@ -114,6 +115,14 @@ public class Replica implements Writable { private TUniqueId cooldownMetaId; private long cooldownTerm = -1; + // A replica version should increase monotonically, + // but backend may missing some versions due to disk failure or bugs. + // FE should found these and mark the replica as missing versions. + // If backend's report version < fe version, record the backend's report version as `regressiveVersion`, + // and if time exceed 5min, fe should mark this replica as missing versions. + private long regressiveVersion = -1; + private long regressiveVersionTimestamp = 0; + /* * This can happen when this replica is created by a balance clone task, and * when task finished, the version of this replica is behind the partition's visible version. @@ -435,9 +444,9 @@ public class Replica implements Writable { if (lastFailedVersion != this.lastFailedVersion) { // Case 2: - if (lastFailedVersion > this.lastFailedVersion) { + if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) { this.lastFailedVersion = lastFailedVersion; - this.lastFailedTimestamp = System.currentTimeMillis(); + this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L; } this.lastSuccessVersion = this.version; @@ -506,10 +515,6 @@ public class Replica implements Writable { return true; } - public void setLastFailedVersion(long lastFailedVersion) { - this.lastFailedVersion = lastFailedVersion; - } - public void setState(ReplicaState replicaState) { this.state = replicaState; } @@ -534,6 +539,25 @@ public class Replica implements Writable { this.versionCount = versionCount; } + public boolean checkVersionRegressive(long newVersion) { + if (newVersion >= version) { + regressiveVersion = -1; + regressiveVersionTimestamp = -1; + return false; + } + + if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) { + return true; + } + + if (newVersion != regressiveVersion) { + regressiveVersion = newVersion; + regressiveVersionTimestamp = System.currentTimeMillis(); + } + + return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L; + } + @Override public String toString() { StringBuilder strBuffer = new StringBuilder("[replicaId="); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 2b601f9f030..a2d5983aac4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -390,10 +390,22 @@ public class TabletInvertedIndex { if (backendTabletInfo.getVersion() > versionInFe) { // backend replica's version is larger or newer than replica in FE, sync it. return true; - } else if (versionInFe == backendTabletInfo.getVersion() && replicaInFe.isBad()) { + } else if (versionInFe == backendTabletInfo.getVersion()) { // backend replica's version is equal to replica in FE, but replica in FE is bad, // while backend replica is good, sync it - return true; + if (replicaInFe.isBad()) { + return true; + } + + // FE' s replica last failed version > partition's committed version + // this can be occur when be report miss version, fe will set last failed version = visible version + 1 + // then last failed version may greater than partition's committed version + // + // But here cannot got variable partition, we just check lastFailedVersion = version + 1, + // In ReportHandler.sync, we will check if last failed version > partition's committed version again. + if (replicaInFe.getLastFailedVersion() == versionInFe + 1) { + return true; + } } return false; @@ -501,6 +513,12 @@ public class TabletInvertedIndex { // so we only return true if version_miss is true. return true; } + + // backend versions regressive due to bugs + if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) { + return true; + } + return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index c9671cd2db7..c57bdc7762a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -1074,6 +1074,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(), reportedTablet.getDataSize(), reportedTablet.getRowCount()); + if (replica.getLastFailedVersion() > partition.getCommittedVersion() + && reportedTablet.getVersion() >= partition.getCommittedVersion() + //&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss() + && !(reportedTablet.isSetUsed() && !reportedTablet.isUsed())) { + LOG.info("change replica {} of tablet {} 's last failed version to -1", replica, tabletId); + replica.updateLastFailedVersion(-1L); + } if (reportedTablet.isSetPathHash()) { replica.setPathHash(reportedTablet.getPathHash()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 2833eff5f3d..64b771663b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -192,7 +192,7 @@ public class MasterImpl { finishRecoverTablet(task); break; case ALTER: - finishAlterTask(task); + finishAlterTask(task, request); break; case ALTER_INVERTED_INDEX: finishAlterInvertedIndexTask(task, request); @@ -575,7 +575,7 @@ public class MasterImpl { return reportHandler.handleReport(request); } - private void finishAlterTask(AgentTask task) { + private void finishAlterTask(AgentTask task, TFinishTaskRequest request) { AlterReplicaTask alterTask = (AlterReplicaTask) task; try { if (alterTask.getJobType() == JobType.ROLLUP) { @@ -584,6 +584,11 @@ public class MasterImpl { Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask); } alterTask.setFinished(true); + if (request.isSetReportVersion()) { + long reportVersion = request.getReportVersion(); + Env.getCurrentSystemInfo().updateBackendReportVersion( + task.getBackendId(), reportVersion, task.getDbId(), task.getTableId()); + } } catch (MetaNotFoundException e) { LOG.warn("failed to handle finish alter task: {}, {}", task.getSignature(), e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 4461ba19473..5b781d7c5f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -403,7 +403,8 @@ public class ReportHandler extends Daemon { } } - private static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) { + // public for fe ut + public static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); @@ -607,6 +608,11 @@ public class ReportHandler extends Daemon { if (olapTable == null || !olapTable.writeLockIfExist()) { continue; } + + if (backendReportVersion < Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) { + break; + } + try { long partitionId = tabletMeta.getPartitionId(); Partition partition = olapTable.getPartition(partitionId); @@ -660,14 +666,25 @@ public class ReportHandler extends Daemon { continue; } - if (metaVersion < backendVersion - || (metaVersion == backendVersion && replica.isBad())) { - - if (backendReportVersion < Env.getCurrentSystemInfo() - .getBackendReportVersion(backendId)) { - continue; + boolean needSync = false; + if (metaVersion < backendVersion) { + needSync = true; + } else if (metaVersion == backendVersion) { + if (replica.isBad()) { + needSync = true; } + if (replica.getVersion() >= partition.getCommittedVersion() + && replica.getLastFailedVersion() > partition.getCommittedVersion()) { + LOG.info("sync replica {} of tablet {} in backend {} in db {}. replica last failed" + + " version change to -1 because last failed version > replica's committed" + + " version {}", + replica, tabletId, backendId, dbId, partition.getCommittedVersion()); + replica.updateLastFailedVersion(-1L); + needSync = true; + } + } + if (needSync) { // happens when // 1. PUSH finished in BE but failed or not yet report to FE // 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE @@ -1048,18 +1065,25 @@ public class ReportHandler extends Daemon { break; } - if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) { + if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) + || replica.checkVersionRegressive(tTabletInfo.getVersion())) { // If the origin last failed version is larger than 0, not change it. // Otherwise, we set last failed version to replica'version + 1. // Because last failed version should always larger than replica's version. long newLastFailedVersion = replica.getLastFailedVersion(); if (newLastFailedVersion < 0) { newLastFailedVersion = replica.getVersion() + 1; + replica.updateLastFailedVersion(newLastFailedVersion); + LOG.warn("set missing version for replica {} of tablet {} on backend {}, " + + "version in fe {}, version in be {}, be missing {}", + replica.getId(), tabletId, backendId, replica.getVersion(), + tTabletInfo.getVersion(), tTabletInfo.isVersionMiss()); } - replica.updateLastFailedVersion(newLastFailedVersion); backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion); break; } + + break; } } } finally { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java new file mode 100644 index 00000000000..7539548583c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; +import org.apache.doris.master.ReportHandler; +import org.apache.doris.thrift.TTablet; +import org.apache.doris.thrift.TTabletInfo; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +public class RepairVersionTest extends TestWithFeService { + private class TableInfo { + Partition partition; + Tablet tablet; + Replica replica; + } + + @Override + protected void beforeCreatingConnectContext() throws Exception { + Config.enable_debug_points = true; + Config.disable_balance = true; + Config.disable_tablet_scheduler = true; + Config.allow_replica_on_same_host = true; + Config.tablet_checker_interval_ms = 100; + Config.tablet_schedule_interval_ms = 100; + } + + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + } + + @Override + protected int backendNum() { + return 2; + } + + @Test + public void testRepairLastFailedVersionByClone() throws Exception { + TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_clone"); + Partition partition = info.partition; + Replica replica = info.replica; + + replica.updateLastFailedVersion(replica.getVersion() + 1); + Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion()); + + Config.disable_tablet_scheduler = false; + Thread.sleep(1000); + Config.disable_tablet_scheduler = true; + + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + } + + @Test + public void testRepairLastFailedVersionByReport() throws Exception { + TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_report"); + Partition partition = info.partition; + Tablet tablet = info.tablet; + Replica replica = info.replica; + + replica.updateLastFailedVersion(replica.getVersion() + 1); + Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion()); + + TTabletInfo tTabletInfo = new TTabletInfo(); + tTabletInfo.setTabletId(tablet.getId()); + tTabletInfo.setSchemaHash(replica.getSchemaHash()); + tTabletInfo.setVersion(replica.getVersion()); + tTabletInfo.setPathHash(replica.getPathHash()); + tTabletInfo.setPartitionId(partition.getId()); + tTabletInfo.setReplicaId(replica.getId()); + + TTablet tTablet = new TTablet(); + tTablet.addToTabletInfos(tTabletInfo); + Map<Long, TTablet> tablets = Maps.newHashMap(); + tablets.put(tablet.getId(), tTablet); + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + + ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + } + + @Test + public void testVersionRegressive() throws Exception { + TableInfo info = prepareTableForTest("tbl_version_regressive"); + Partition partition = info.partition; + Tablet tablet = info.tablet; + Replica replica = info.replica; + + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + Assertions.assertTrue(replica.getVersion() > 1L); + + TTabletInfo tTabletInfo = new TTabletInfo(); + tTabletInfo.setTabletId(tablet.getId()); + tTabletInfo.setSchemaHash(replica.getSchemaHash()); + tTabletInfo.setVersion(1L); // be report version = 1 which less than fe version + tTabletInfo.setPathHash(replica.getPathHash()); + tTabletInfo.setPartitionId(partition.getId()); + tTabletInfo.setReplicaId(replica.getId()); + + TTablet tTablet = new TTablet(); + tTablet.addToTabletInfos(tTabletInfo); + Map<Long, TTablet> tablets = Maps.newHashMap(); + tablets.put(tablet.getId(), tTablet); + + ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + + DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); + ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); + + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + } + + private TableInfo prepareTableForTest(String tableName) throws Exception { + createTable("CREATE TABLE test." + tableName + " (k INT) DISTRIBUTED BY HASH(k) " + + " BUCKETS 1 PROPERTIES ( \"replication_num\" = \"2\" )"); + + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test"); + OlapTable tbl = (OlapTable) db.getTableOrMetaException(tableName); + Assertions.assertNotNull(tbl); + Partition partition = tbl.getPartitions().iterator().next(); + Tablet tablet = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() + .getTablets().iterator().next(); + + long visibleVersion = 2L; + partition.updateVisibleVersion(visibleVersion); + partition.setNextVersion(visibleVersion + 1); + tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L)); + + Replica replica = tablet.getReplicas().iterator().next(); + Assertions.assertEquals(visibleVersion, replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + + TableInfo info = new TableInfo(); + info.partition = partition; + info.tablet = tablet; + info.replica = replica; + + return info; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 8861112624f..3aa3a464c73 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -392,7 +392,7 @@ public abstract class TestWithFeService { InterruptedException { int feRpcPort = startFEServer(runningDir); List<Backend> bes = Lists.newArrayList(); - System.out.println("start create backend"); + System.out.println("start create backend, backend num " + backendNum); for (int i = 0; i < backendNum; i++) { bes.add(createBackend("127.0.0.1", feRpcPort)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org