This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 0408501504 KYLIN-5308 add transaction timeout when epoch renew
0408501504 is described below
commit 0408501504b954f731017a48b2ef76f41e9b9f8e
Author: Jiale He <[email protected]>
AuthorDate: Thu Oct 13 09:53:40 2022 +0800
KYLIN-5308 add transaction timeout when epoch renew
Co-authored-by: Jiale He <[email protected]>
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../common/persistence/metadata/EpochStore.java | 5 +-
.../persistence/metadata/FileEpochStore.java | 5 +
.../persistence/metadata/JdbcAuditLogStore.java | 2 +-
.../persistence/metadata/JdbcEpochStore.java | 6 +
.../common/persistence/metadata/jdbc/JdbcUtil.java | 12 +-
.../apache/kylin/common/KylinConfigBaseTest.java | 12 ++
.../epochstore/AbstractEpochStoreTest.java | 132 +++++++--------------
.../metadata/epochstore/JdbcEpochStoreTest.java | 65 +++++++---
.../apache/kylin/metadata/epoch/EpochManager.java | 17 ++-
.../kylin/metadata/epoch/EpochManagerTest.java | 57 ++++++++-
11 files changed, 197 insertions(+), 120 deletions(-)
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3b94b3b024..8498eba0e6 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2738,6 +2738,10 @@ public abstract class KylinConfigBase implements Serializable {
return
Long.parseLong(getOptional("kylin.server.leader-race.heart-beat-interval",
"30"));
}
+ public double getEpochRenewTimeoutRate() {
+ return
Double.parseDouble(getOptional("kylin.server.leader-race.heart-beat-timeout-rate",
"0.8"));
+ }
+
public long getDiscoveryClientTimeoutThreshold() {
return
TimeUtil.timeStringAs(getOptional("kylin.server.discovery-client-timeout-threshold",
"3s"),
TimeUnit.SECONDS);
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java
index dde480d2bc..388485b789 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/EpochStore.java
@@ -20,11 +20,12 @@ package org.apache.kylin.common.persistence.metadata;
import java.util.List;
import java.util.Objects;
-import lombok.extern.slf4j.Slf4j;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.Singletons;
import org.apache.kylin.common.util.AddressUtil;
+import lombok.extern.slf4j.Slf4j;
+
@Slf4j
public abstract class EpochStore {
public static final String EPOCH_SUFFIX = "_epoch";
@@ -54,6 +55,8 @@ public abstract class EpochStore {
public abstract <T> T executeWithTransaction(Callback<T> callback);
+ public abstract <T> T executeWithTransaction(Callback<T> callback, int
timeout);
+
public Epoch getGlobalEpoch() {
return getEpoch("_global");
}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java
index b18f4c5be4..b90a53b35e 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/FileEpochStore.java
@@ -157,4 +157,9 @@ public class FileEpochStore extends EpochStore {
return null;
}
+
+ @Override
+ public <T> T executeWithTransaction(Callback<T> callback, int timeout) {
+ return executeWithTransaction(callback);
+ }
}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
index ec2081bd14..80af263876 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcAuditLogStore.java
@@ -200,7 +200,7 @@ public class JdbcAuditLogStore implements AuditLogStore {
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList())),
- TransactionDefinition.ISOLATION_REPEATABLE_READ, beforeCommit);
+ TransactionDefinition.ISOLATION_REPEATABLE_READ, beforeCommit,
TransactionDefinition.TIMEOUT_DEFAULT);
}
public void batchInsert(List<AuditLog> auditLogs) {
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java
index be32feb19e..bb444822ac 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/JdbcEpochStore.java
@@ -21,6 +21,7 @@ import static org.apache.kylin.common.exception.CommonErrorCode.FAILED_UPDATE_ME
import static
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasourceParameters;
import static
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.isTableExists;
import static
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.withTransaction;
+import static
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.withTransactionTimeout;
import java.io.InputStream;
import java.sql.PreparedStatement;
@@ -266,4 +267,9 @@ public class JdbcEpochStore extends EpochStore {
public <T> T executeWithTransaction(Callback<T> callback) {
return withTransaction(transactionManager, callback::handle);
}
+
+ @Override
+ public <T> T executeWithTransaction(Callback<T> callback, int timeout) {
+ return withTransactionTimeout(transactionManager, callback::handle,
timeout);
+ }
}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java
index 7d74e8851d..94734b415e 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/jdbc/JdbcUtil.java
@@ -53,19 +53,27 @@ public class JdbcUtil {
private static final Logger logger =
LoggerFactory.getLogger(JdbcUtil.class);
+ public static <T> T withTransactionTimeout(DataSourceTransactionManager
transactionManager, Callback<T> consumer,
+ int timeout) {
+ return withTransaction(transactionManager, consumer,
TransactionDefinition.ISOLATION_REPEATABLE_READ, null,
+ timeout);
+ }
+
public static <T> T withTransaction(DataSourceTransactionManager
transactionManager, Callback<T> consumer) {
return withTransaction(transactionManager, consumer,
TransactionDefinition.ISOLATION_REPEATABLE_READ);
}
public static <T> T withTransaction(DataSourceTransactionManager
transactionManager, Callback<T> consumer,
int isolationLevel) {
- return withTransaction(transactionManager, consumer, isolationLevel,
null);
+ return withTransaction(transactionManager, consumer, isolationLevel,
null,
+ TransactionDefinition.TIMEOUT_DEFAULT);
}
public static <T> T withTransaction(DataSourceTransactionManager
transactionManager, Callback<T> consumer,
- int isolationLevel, Callback<T> beforeCommit) {
+ int isolationLevel, Callback<T> beforeCommit, int timeout) {
val definition = new DefaultTransactionDefinition();
definition.setIsolationLevel(isolationLevel);
+ definition.setTimeout(timeout);
val status = transactionManager.getTransaction(definition);
try {
T result = consumer.handle();
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index f927bd4e80..903fb8f6a0 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1359,6 +1359,18 @@ class KylinConfigBaseTest {
// Reset to prevent impacting other tests
config.setProperty(WRITING_CLUSTER_WORKING_DIR, "");
}
+
+ @Test
+ void testGetEpochRenewTimeoutRate() {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ Assertions.assertEquals(0.8, config.getEpochRenewTimeoutRate());
+ config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate",
"0.0");
+ Assertions.assertEquals(0.0, config.getEpochRenewTimeoutRate());
+ config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate",
"0");
+ Assertions.assertEquals(0.0, config.getEpochRenewTimeoutRate());
+ config.setProperty("kylin.server.leader-race.heart-beat-timeout-rate",
"1");
+ Assertions.assertEquals(1.0, config.getEpochRenewTimeoutRate());
+ }
}
class EnvironmentUpdateUtils {
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java
index 9cc3f9e84e..02f3b05363 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/AbstractEpochStoreTest.java
@@ -27,10 +27,11 @@ import org.apache.kylin.common.persistence.metadata.EpochStore;
import org.apache.kylin.common.persistence.metadata.JdbcEpochStore;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.junit.annotation.MetadataInfo;
-import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import com.google.common.collect.Lists;
+
import lombok.val;
@MetadataInfo(onlyProps = true)
@@ -56,152 +57,107 @@ public abstract class AbstractEpochStoreTest {
@Test
public void testInsertAndUpdate() {
- Epoch newSaveEpoch = new Epoch();
- newSaveEpoch.setEpochTarget("test1");
- newSaveEpoch.setCurrentEpochOwner("owner1");
- newSaveEpoch.setEpochId(1);
- newSaveEpoch.setLastEpochRenewTime(System.currentTimeMillis());
-
+ Epoch mockEpoch = getMockEpoch("test1", "owner1");
//insert one
- epochStore.insert(newSaveEpoch);
- val epochs = epochStore.list();
- Assert.assertEquals(epochs.size(), 1);
+ epochStore.insert(mockEpoch);
- Assert.assertTrue(compareEpoch(newSaveEpoch, epochs.get(0)));
+ val epochs = epochStore.list();
+ Assertions.assertEquals(1, epochs.size());
+ Assertions.assertTrue(compareEpoch(mockEpoch, epochs.get(0)));
//update owner
- newSaveEpoch.setCurrentEpochOwner("o2");
- epochStore.update(newSaveEpoch);
-
- Assert.assertEquals(newSaveEpoch.getCurrentEpochOwner(),
epochStore.list().get(0).getCurrentEpochOwner());
+ mockEpoch.setCurrentEpochOwner("o2");
+ epochStore.update(mockEpoch);
+ Assertions.assertEquals(mockEpoch.getCurrentEpochOwner(),
epochStore.list().get(0).getCurrentEpochOwner());
}
@Test
public void testExecuteWithTransaction_Success() {
- Epoch e1 = new Epoch();
- e1.setEpochTarget("test1");
- e1.setCurrentEpochOwner("owner1");
- e1.setEpochId(1);
- e1.setLastEpochRenewTime(System.currentTimeMillis());
-
+ Epoch mockEpoch = getMockEpoch("test1", "owner1");
epochStore.executeWithTransaction(() -> {
- epochStore.insert(e1);
-
+ epochStore.insert(mockEpoch);
//insert success
- Assert.assertEquals(epochStore.list().size(), 1);
- Assert.assertTrue(compareEpoch(e1, epochStore.list().get(0)));
-
+ Assertions.assertEquals(1, epochStore.list().size());
+ Assertions.assertTrue(compareEpoch(mockEpoch,
epochStore.list().get(0)));
return null;
});
-
}
@Test
public void testBatchUpdate() {
- Epoch e1 = new Epoch();
- e1.setEpochTarget("test1");
- e1.setCurrentEpochOwner("owner1");
- e1.setEpochId(1);
- e1.setLastEpochRenewTime(System.currentTimeMillis());
- epochStore.insert(e1);
+ Epoch e1 = getMockEpoch("test1", "owner1");
+ Epoch e2 = getMockEpoch("test2", "owner2");
- Epoch e2 = new Epoch();
- e2.setEpochTarget("test2");
- e2.setCurrentEpochOwner("owner2");
- e2.setEpochId(1);
- e2.setLastEpochRenewTime(System.currentTimeMillis());
+ epochStore.insert(e1);
epochStore.insert(e2);
- val batchEpochs = Arrays.asList(e1, e2);
-
+ val batchEpochs = Lists.newArrayList(e1, e2);
epochStore.updateBatch(batchEpochs);
-
- batchEpochs.forEach(epoch -> {
- Assert.assertTrue(compareEpoch(epoch,
epochStore.getEpoch(epoch.getEpochTarget())));
- });
-
+ batchEpochs.forEach(
+ epoch -> Assertions.assertTrue(compareEpoch(epoch,
epochStore.getEpoch(epoch.getEpochTarget()))));
}
@Test
public void testBatchUpdateWithError() {
- Epoch e1 = new Epoch();
- e1.setEpochTarget("test1");
- e1.setCurrentEpochOwner("owner1");
- e1.setEpochId(1);
- e1.setLastEpochRenewTime(System.currentTimeMillis());
- epochStore.insert(e1);
+ Epoch e1 = getMockEpoch("test1", "owner1");
+ Epoch e2 = getMockEpoch("test2", "owner2");
- Epoch e2 = new Epoch();
- e2.setEpochTarget("test2");
- e2.setCurrentEpochOwner("owner2");
- e2.setEpochId(1);
- e2.setLastEpochRenewTime(System.currentTimeMillis());
+ epochStore.insert(e1);
- val batchEpochs = Arrays.asList(e1, e2);
boolean isError = false;
try {
- epochStore.updateBatch(batchEpochs);
+ epochStore.updateBatch(Lists.newArrayList(e1, e2));
} catch (Exception e) {
isError = true;
}
if (epochStore instanceof JdbcEpochStore) {
- Assert.assertTrue(isError);
+ Assertions.assertTrue(isError);
}
}
@Test
public void testBatchInsert() {
- Epoch e1 = new Epoch();
- e1.setEpochTarget("test1");
- e1.setCurrentEpochOwner("owner1");
- e1.setEpochId(1);
- e1.setLastEpochRenewTime(System.currentTimeMillis());
-
- Epoch e2 = new Epoch();
- e2.setEpochTarget("test2");
- e2.setCurrentEpochOwner("owner2");
- e2.setEpochId(1);
- e2.setLastEpochRenewTime(System.currentTimeMillis());
+ Epoch e1 = getMockEpoch("test1", "owner1");
+ Epoch e2 = getMockEpoch("test2", "owner2");
val batchEpochs = Arrays.asList(e1, e2);
-
epochStore.insertBatch(batchEpochs);
-
- batchEpochs.forEach(epoch -> {
- Assert.assertTrue(compareEpoch(epoch,
epochStore.getEpoch(epoch.getEpochTarget())));
- });
-
+ batchEpochs.forEach(
+ epoch -> Assertions.assertTrue(compareEpoch(epoch,
epochStore.getEpoch(epoch.getEpochTarget()))));
}
@Test
public void testIsLeaderNodeWithCurrentEpochOwnerNull() {
- Epoch epoch = new Epoch();
- epoch.setEpochTarget("_global");
- epoch.setCurrentEpochOwner(null);
- epochStore.insert(epoch);
+ Epoch mockEpoch = getMockEpoch("_global", null);
+ epochStore.insert(mockEpoch);
Assertions.assertFalse(EpochStore.isLeaderNode());
}
@Test
public void testIsLeaderNodeWithServiceInfoNotEqual() {
- Epoch epoch = new Epoch();
- epoch.setEpochTarget("_global");
- epoch.setCurrentEpochOwner("owner1");
- epochStore.insert(epoch);
+ Epoch mockEpoch = getMockEpoch("_global", "owner1");
+ epochStore.insert(mockEpoch);
Assertions.assertFalse(EpochStore.isLeaderNode());
}
@Test
public void testIsLeaderNodeWithServiceInfoEqual() {
Assertions.assertFalse(EpochStore.isLeaderNode());
- Epoch epoch = new Epoch();
- epoch.setEpochTarget("_global");
- epoch.setCurrentEpochOwner(AddressUtil.getLocalInstance() + "|" +
Long.MAX_VALUE);
- epochStore.insert(epoch);
+ Epoch mockEpoch = getMockEpoch("_global",
AddressUtil.getLocalInstance() + "|" + Long.MAX_VALUE);
+ epochStore.insert(mockEpoch);
Assertions.assertTrue(EpochStore.isLeaderNode());
}
+
+ protected Epoch getMockEpoch(String epochTarget, String epochOwner) {
+ Epoch epoch = new Epoch();
+ epoch.setEpochTarget(epochTarget);
+ epoch.setCurrentEpochOwner(epochOwner);
+ epoch.setEpochId(1);
+ epoch.setLastEpochRenewTime(System.currentTimeMillis());
+ return epoch;
+ }
}
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java
index 06e1074264..d51c15ec66 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/persistence/metadata/epochstore/JdbcEpochStoreTest.java
@@ -19,17 +19,22 @@ package org.apache.kylin.common.persistence.metadata.epochstore;
import static
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasourceParameters;
import static org.apache.kylin.common.util.TestUtils.getTestConfig;
+import static org.awaitility.Awaitility.await;
+
+import java.util.concurrent.TimeUnit;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.apache.kylin.common.persistence.metadata.Epoch;
import org.apache.kylin.junit.annotation.OverwriteProp;
-import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.TransactionException;
import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
import lombok.val;
@@ -55,34 +60,62 @@ public final class JdbcEpochStoreTest extends AbstractEpochStoreTest {
}
@Test
- public void testExecuteWithTransaction_RollBack() {
-
- Epoch e1 = new Epoch();
- e1.setEpochTarget("test1");
- e1.setCurrentEpochOwner("owner1");
- e1.setEpochId(1);
- e1.setLastEpochRenewTime(System.currentTimeMillis());
+ void testExecuteWithTransaction_RollBack() {
+ Epoch mockEpoch = getMockEpoch("test1", "owner1");
try {
epochStore.executeWithTransaction(() -> {
- epochStore.insert(e1);
-
//insert success
- Assert.assertEquals(epochStore.list().size(), 1);
- Assert.assertTrue(compareEpoch(e1, epochStore.list().get(0)));
+ epochStore.insert(mockEpoch);
+ Assertions.assertEquals(1, epochStore.list().size());
+ Assertions.assertTrue(compareEpoch(mockEpoch,
epochStore.list().get(0)));
if (epochStore.list().size() == 1) {
throw new RuntimeException("mock transaction error");
}
-
return null;
});
+ Assertions.fail();
+ } catch (RuntimeException e) {
+ Assertions.assertEquals("mock transaction error",
Throwables.getRootCause(e).getMessage());
+ Assertions.assertEquals(0, epochStore.list().size());
+ }
+ }
- Assert.fail();
+ @Test
+ void testExecuteWithTransactionTimeout_RollBack() {
+
+ Epoch mockEpoch = getMockEpoch("test1", "owner1");
+ // before transaction
+ Assertions.assertEquals(0, epochStore.list().size());
+ try {
+ epochStore.executeWithTransaction(() -> {
+ // mock transaction timeout
+ await().pollDelay(1100, TimeUnit.MILLISECONDS).until(() ->
true);
+ epochStore.insertBatch(Lists.newArrayList(mockEpoch));
+ return null;
+ }, 1);
+ Assertions.fail();
} catch (RuntimeException e) {
- Assert.assertEquals(Throwables.getRootCause(e).getMessage(), "mock
transaction error");
- Assert.assertEquals(epochStore.list().size(), 0);
+ Throwable rootCause = Throwables.getRootCause(e);
+ Assertions.assertTrue(rootCause instanceof TransactionException);
+ Assertions.assertTrue(rootCause.getMessage().contains("Transaction
timed out"));
}
+ // rollback result
+ Assertions.assertEquals(0, epochStore.list().size());
+ }
+ @Test
+ void testExecuteWithTransactionTimeoutSuccess() {
+
+ Epoch mockEpoch = getMockEpoch("test1", "owner1");
+ Assertions.assertEquals(0, epochStore.list().size());
+ epochStore.executeWithTransaction(() -> {
+ await().pollDelay(1100, TimeUnit.MILLISECONDS).until(() -> true);
+ epochStore.insertBatch(Lists.newArrayList(mockEpoch));
+ Assertions.assertEquals(1, epochStore.list().size());
+ return null;
+ });
+ Assertions.assertEquals(1, epochStore.list().size());
}
}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
index 1dff56cbcf..bb7f2c3d2f 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochManager.java
@@ -19,7 +19,6 @@
package org.apache.kylin.metadata.epoch;
import static org.apache.kylin.common.util.AddressUtil.MAINTAIN_MODE_MOCK_PORT;
-import static
org.apache.kylin.metadata.epoch.EpochUpdateLockManager.executeEpochWithLock;
import java.util.ArrayList;
import java.util.Collection;
@@ -46,9 +45,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.Singletons;
-import org.apache.kylin.common.util.NamedThreadFactory;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.common.persistence.metadata.Epoch;
import org.apache.kylin.common.persistence.metadata.EpochStore;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
@@ -58,7 +54,10 @@ import org.apache.kylin.common.scheduler.ProjectControlledNotifier;
import org.apache.kylin.common.scheduler.ProjectEscapedNotifier;
import org.apache.kylin.common.scheduler.SourceUsageVerifyNotifier;
import org.apache.kylin.common.util.AddressUtil;
+import org.apache.kylin.common.util.NamedThreadFactory;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.resourcegroup.ResourceGroupManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +96,7 @@ public class EpochManager {
private final String serverMode;
private final boolean epochCheckEnabled;
private final long epochExpiredTime;
+ private final int epochRenewTimeout;
@Getter
private final EpochUpdateManager epochUpdateManager;
@@ -113,8 +113,13 @@ public class EpochManager {
this.serverMode = config.getServerMode();
this.epochCheckEnabled = config.getEpochCheckerEnabled();
this.epochExpiredTime = config.getEpochExpireTimeSecond();
+ this.epochRenewTimeout = getEpochRenewTimeout();
this.epochUpdateManager = new EpochUpdateManager();
+ }
+ private int getEpochRenewTimeout() {
+ double timeoutRate = config.getEpochRenewTimeoutRate() <= 0 ? 1 :
config.getEpochRenewTimeoutRate();
+ return (int) (epochExpiredTime * timeoutRate);
}
public class EpochUpdateManager {
@@ -260,7 +265,7 @@ public class EpochManager {
});
try {
- if (!countDownLatch.await(epochExpiredTime, TimeUnit.SECONDS))
{
+ if (!countDownLatch.await(epochRenewTimeout,
TimeUnit.SECONDS)) {
logger.error("renew not finished,{}/{}...",
newRenewEpochSets.size(), oriEpochs.size());
}
} catch (InterruptedException e) {
@@ -449,7 +454,7 @@ public class EpochManager {
epochStore.updateBatch(needUpdateEpochList);
}
return null;
- });
+ }, epochRenewTimeout);
}
/**
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
index 8e8d0ac2f3..164d952020 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochManagerTest.java
@@ -227,9 +227,7 @@ class EpochManagerTest {
void testUpdateProjectEpochWithResourceGroupEnabled() {
val manager = ResourceGroupManager.getInstance(getTestConfig());
manager.getResourceGroup();
- manager.updateResourceGroup(copyForWrite -> {
- copyForWrite.setResourceGroupEnabled(true);
- });
+ manager.updateResourceGroup(copyForWrite ->
copyForWrite.setResourceGroupEnabled(true));
EpochManager epochManager = EpochManager.getInstance();
val prjMgr = NProjectManager.getInstance(getTestConfig());
for (ProjectInstance prj : prjMgr.listAllProjects()) {
@@ -244,9 +242,7 @@ class EpochManagerTest {
@Test
void testGetEpochOwnerWithException() {
EpochManager epochManager = EpochManager.getInstance();
- Assertions.assertThrows(IllegalStateException.class, () -> {
- epochManager.getEpochOwner(null);
- });
+ Assertions.assertThrows(IllegalStateException.class, () ->
epochManager.getEpochOwner(null));
}
@Test
@@ -392,6 +388,55 @@ class EpochManagerTest {
}
+ @Test
+ void testEpochRenewTimeoutDefault() {
+ KylinConfig config = getTestConfig();
+ double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate();
+ Assertions.assertEquals(0.8, epochRenewTimeoutRate);
+ EpochManager manager = EpochManager.getInstance();
+ Object epochExpiredTime = ReflectionTestUtils.getField(manager,
"epochExpiredTime");
+ Assertions.assertNotNull(epochExpiredTime);
+ Assertions.assertEquals(60, (long) epochExpiredTime);
+
+ Object epochRenewTimeout = ReflectionTestUtils.getField(manager,
"epochRenewTimeout");
+ Assertions.assertNotNull(epochRenewTimeout);
+ Assertions.assertEquals(60 * epochRenewTimeoutRate, (int)
epochRenewTimeout);
+ }
+
+ @Test
+ @OverwriteProp(key = "kylin.server.leader-race.heart-beat-timeout-rate",
value = "0.0")
+ void testEpochRenewTimeoutOverride1() {
+ KylinConfig config = getTestConfig();
+ double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate();
+ Assertions.assertEquals(0.0, epochRenewTimeoutRate);
+
+ EpochManager manager = EpochManager.getInstance();
+ Object epochExpiredTime = ReflectionTestUtils.getField(manager,
"epochExpiredTime");
+ Assertions.assertNotNull(epochExpiredTime);
+ Assertions.assertEquals(60, (long) epochExpiredTime);
+
+ Object epochRenewTimeout = ReflectionTestUtils.getField(manager,
"epochRenewTimeout");
+ Assertions.assertNotNull(epochRenewTimeout);
+ Assertions.assertEquals(60, (int) epochRenewTimeout);
+ }
+
+ @Test
+ @OverwriteProp(key = "kylin.server.leader-race.heart-beat-timeout-rate",
value = "1.5")
+ void testEpochRenewTimeoutOverride2() {
+ KylinConfig config = getTestConfig();
+ double epochRenewTimeoutRate = config.getEpochRenewTimeoutRate();
+ Assertions.assertEquals(1.5, epochRenewTimeoutRate);
+
+ EpochManager manager = EpochManager.getInstance();
+ Object epochExpiredTime = ReflectionTestUtils.getField(manager,
"epochExpiredTime");
+ Assertions.assertNotNull(epochExpiredTime);
+ Assertions.assertEquals(60, (long) epochExpiredTime);
+
+ Object epochRenewTimeout = ReflectionTestUtils.getField(manager,
"epochRenewTimeout");
+ Assertions.assertNotNull(epochRenewTimeout);
+ Assertions.assertEquals(60 * epochRenewTimeoutRate, (int)
epochRenewTimeout);
+ }
+
EpochStore getEpochStore() {
try {
return EpochStore.getEpochStore(getTestConfig());