This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new cd339658bd KYLIN-5381 Avoid timeout when cleaning query history by
limiting the number of records deleted each time (#29355)
cd339658bd is described below
commit cd339658bd37822b720ced6526c3356d3938b068
Author: Wang Hui <[email protected]>
AuthorDate: Mon Oct 31 12:03:03 2022 +0800
KYLIN-5381 Avoid timeout when cleaning query history by limiting the number
of records deleted each time (#29355)
Co-authored-by: hui.wang <[email protected]>
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../metadata/query/JdbcQueryHistoryStore.java | 101 +++++++++++++++------
.../kylin/metadata/query/QueryHistoryDAO.java | 8 +-
.../kylin/metadata/query/QueryHistoryMapper.java | 6 ++
.../metadata/query/QueryHistoryProjectInfo.java | 20 ++++
.../kylin/metadata/query/RDBMSQueryHistoryDAO.java | 78 +++++++++++-----
.../metadata/query/util/QueryHisStoreUtil.java | 39 +++++---
.../metadata/query/RDBMSQueryHistoryDaoTest.java | 72 +++++++++++++--
8 files changed, 253 insertions(+), 75 deletions(-)
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9c4601705f..c917c96e7c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2801,6 +2801,10 @@ public abstract class KylinConfigBase implements
Serializable {
return
Integer.parseInt(getOptional("kylin.query.queryhistory.project-max-size",
"1000000"));
}
+ public int getQueryHistorySingleDeletionSize() {
+ return
Integer.parseInt(getOptional("kylin.query.queryhistory.single-deletion-size",
"2000"));
+ }
+
public long getQueryHistorySurvivalThreshold() {
return
TimeUtil.timeStringAs(getOptional("kylin.query.queryhistory.survival-time-threshold",
"30d"),
TimeUnit.MILLISECONDS);
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
index a4b3194f34..250c704991 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/JdbcQueryHistoryStore.java
@@ -25,6 +25,7 @@ import static
org.mybatis.dynamic.sql.SqlBuilder.isGreaterThan;
import static org.mybatis.dynamic.sql.SqlBuilder.isGreaterThanOrEqualTo;
import static org.mybatis.dynamic.sql.SqlBuilder.isIn;
import static org.mybatis.dynamic.sql.SqlBuilder.isLessThan;
+import static org.mybatis.dynamic.sql.SqlBuilder.isLessThanOrEqualTo;
import static org.mybatis.dynamic.sql.SqlBuilder.isLike;
import static org.mybatis.dynamic.sql.SqlBuilder.isLikeCaseInsensitive;
import static org.mybatis.dynamic.sql.SqlBuilder.isNotEqualTo;
@@ -40,8 +41,10 @@ import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -67,6 +70,7 @@ import org.mybatis.dynamic.sql.render.RenderingStrategies;
import org.mybatis.dynamic.sql.select.QueryExpressionDSL;
import org.mybatis.dynamic.sql.select.SelectModel;
import org.mybatis.dynamic.sql.select.join.EqualTo;
+import org.mybatis.dynamic.sql.select.aggregate.Count;
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider;
@@ -198,24 +202,28 @@ public class JdbcQueryHistoryStore {
try (SqlSession session = sqlSessionFactory.openSession()) {
QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
SelectStatementProvider statementProvider =
selectDistinct(queryHistoryRealizationTable.queryId)
-
.from(queryHistoryRealizationTable).where(queryHistoryRealizationTable.model,
isIn(modelIds))
+ .from(queryHistoryRealizationTable) //
+ .where(queryHistoryRealizationTable.model, isIn(modelIds))
//
.build().render(RenderingStrategies.MYBATIS3);
- return
mapper.selectMany(statementProvider).stream().map(QueryHistory::getQueryId).collect(Collectors.toList());
+ return
mapper.selectMany(statementProvider).stream().map(QueryHistory::getQueryId)
+ .collect(Collectors.toList());
}
}
public List<QueryStatistics>
queryQueryHistoriesModelIds(QueryHistoryRequest request, int size) {
try (SqlSession session = sqlSessionFactory.openSession()) {
QueryStatisticsMapper mapper =
session.getMapper(QueryStatisticsMapper.class);
- SelectStatementProvider statementProvider1 =
selectDistinct(queryHistoryTable.engineType).from(queryHistoryTable)
- .where(queryHistoryTable.engineType,
isNotEqualTo("NATIVE"))
- .and(queryHistoryTable.projectName,
isEqualTo(request.getProject()))
+ SelectStatementProvider statementProvider1 =
selectDistinct(queryHistoryTable.engineType)
+ .from(queryHistoryTable) //
+ .where(queryHistoryTable.engineType,
isNotEqualTo("NATIVE")) //
+ .and(queryHistoryTable.projectName,
isEqualTo(request.getProject())) //
.build().render(RenderingStrategies.MYBATIS3);
List<QueryStatistics> engineTypes =
mapper.selectMany(statementProvider1);
- SelectStatementProvider statementProvider2 =
selectDistinct(queryHistoryRealizationTable.model).from(queryHistoryRealizationTable)
- .where(queryHistoryRealizationTable.projectName,
isEqualTo(request.getProject()))
- .limit(size)
+ SelectStatementProvider statementProvider2 =
selectDistinct(queryHistoryRealizationTable.model)
+ .from(queryHistoryRealizationTable) //
+ .where(queryHistoryRealizationTable.projectName,
isEqualTo(request.getProject())) //
+ .limit(size) //
.build().render(RenderingStrategies.MYBATIS3);
List<QueryStatistics> modelIds =
mapper.selectMany(statementProvider2);
engineTypes.addAll(modelIds);
@@ -223,33 +231,70 @@ public class JdbcQueryHistoryStore {
}
}
- public QueryHistory queryOldestQueryHistory(long maxSize) {
+ public QueryHistory getOldestQueryHistory(long index) {
try (SqlSession session = sqlSessionFactory.openSession()) {
QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
SelectStatementProvider statementProvider =
select(getSelectFields(queryHistoryTable))
.from(queryHistoryTable) //
- .orderBy(queryHistoryTable.id.descending()) //
+ .orderBy(queryHistoryTable.id) //
.limit(1) //
- .offset(maxSize - 1) //
+ .offset(index - 1L) //
.build().render(RenderingStrategies.MYBATIS3);
return mapper.selectOne(statementProvider);
}
}
- public QueryHistory queryOldestQueryHistory(long maxSize, String project) {
+ public QueryHistory getOldestQueryHistory(String project, long index) {
try (SqlSession session = sqlSessionFactory.openSession()) {
QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
- SelectStatementProvider statementProvider =
select(getSelectFields(queryHistoryTable)) //
+ SelectStatementProvider statementProvider =
select(getSelectFields(queryHistoryTable))
.from(queryHistoryTable) //
.where(queryHistoryTable.projectName, isEqualTo(project))
//
- .orderBy(queryHistoryTable.id.descending()) //
+ .orderBy(queryHistoryTable.id) //
.limit(1) //
- .offset(maxSize - 1) //
+ .offset(index - 1L) //
.build().render(RenderingStrategies.MYBATIS3);
return mapper.selectOne(statementProvider);
}
}
+ public Long getCountOnQueryHistory() {
+ try (SqlSession session = sqlSessionFactory.openSession()) {
+ QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
+ SelectStatementProvider statementProvider =
select(Count.of(queryHistoryTable.id)) //
+ .from(queryHistoryTable) //
+ .build().render(RenderingStrategies.MYBATIS3);
+ return mapper.selectAsLong(statementProvider);
+ }
+ }
+
+ public Long getCountOnQueryHistory(long retainTime) {
+ try (SqlSession session = sqlSessionFactory.openSession()) {
+ QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
+ SelectStatementProvider statementProvider =
select(Count.of(queryHistoryTable.id).as(COUNT)) //
+ .from(queryHistoryTable) //
+ .where(queryHistoryTable.queryTime,
isLessThan(retainTime)) //
+ .build().render(RenderingStrategies.MYBATIS3);
+ return mapper.selectAsLong(statementProvider);
+ }
+ }
+
+ public Map<String, Long> getCountGroupByProject() {
+ Map<String, Long> projectCounts = new HashMap<>();
+ List<QueryHistoryProjectInfo> projectInfos;
+ try (SqlSession session = sqlSessionFactory.openSession()) {
+ QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
+ SelectStatementProvider statementProvider =
select(queryHistoryTable.projectName,
+ count(queryHistoryTable.id).as(COUNT)) //
+ .from(queryHistoryTable) //
+ .groupBy(queryHistoryTable.projectName) //
+ .build().render(RenderingStrategies.MYBATIS3);
+ projectInfos = mapper.selectByProject(statementProvider);
+ }
+ projectInfos.forEach(projectInfo ->
projectCounts.put(projectInfo.getProjectName(), projectInfo.getCount()));
+ return projectCounts;
+ }
+
public QueryHistory queryByQueryId(String queryId) {
try (SqlSession session = sqlSessionFactory.openSession()) {
QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
@@ -398,27 +443,28 @@ public class JdbcQueryHistoryStore {
}
}
- public void deleteQueryHistory(long queryTime) {
+ public int deleteQueryHistory(long id) {
long startTime = System.currentTimeMillis();
try (SqlSession session = sqlSessionFactory.openSession()) {
QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
DeleteStatementProvider deleteStatement =
SqlBuilder.deleteFrom(queryHistoryTable) //
- .where(queryHistoryTable.queryTime, isLessThan(queryTime))
//
+ .where(queryHistoryTable.id, isLessThanOrEqualTo(id)) //
.build().render(RenderingStrategies.MYBATIS3);
int deleteRows = mapper.delete(deleteStatement);
session.commit();
if (deleteRows > 0) {
log.info("Delete {} row query history takes {} ms",
deleteRows, System.currentTimeMillis() - startTime);
}
+ return deleteRows;
}
}
- public void deleteQueryHistory(long queryTime, String project) {
+ public int deleteQueryHistory(String project, long id) {
long startTime = System.currentTimeMillis();
try (SqlSession session = sqlSessionFactory.openSession()) {
QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
DeleteStatementProvider deleteStatement =
SqlBuilder.deleteFrom(queryHistoryTable) //
- .where(queryHistoryTable.queryTime, isLessThan(queryTime))
//
+ .where(queryHistoryTable.id, isLessThanOrEqualTo(id)) //
.and(queryHistoryTable.projectName, isEqualTo(project)) //
.build().render(RenderingStrategies.MYBATIS3);
int deleteRows = mapper.delete(deleteStatement);
@@ -427,6 +473,7 @@ public class JdbcQueryHistoryStore {
log.info("Delete {} row query history for project [{}] takes
{} ms", deleteRows, project,
System.currentTimeMillis() - startTime);
}
+ return deleteRows;
}
}
@@ -463,7 +510,7 @@ public class JdbcQueryHistoryStore {
}
}
- public void deleteQueryHistoryRealization(long queryTime, String project) {
+ public void deleteQueryHistoryRealization(String project, long queryTime) {
long startTime = System.currentTimeMillis();
try (SqlSession session = sqlSessionFactory.openSession()) {
QueryHistoryMapper mapper =
session.getMapper(QueryHistoryMapper.class);
@@ -631,7 +678,8 @@ public class JdbcQueryHistoryStore {
if (request.isSubmitterExactlyMatch()) {
filterSql = filterSql.and(queryHistoryTable.querySubmitter,
isIn(request.getFilterSubmitter()));
} else if (request.getFilterSubmitter().size() == 1) {
- filterSql = filterSql.and(queryHistoryTable.querySubmitter,
isLikeCaseInsensitive("%" + request.getFilterSubmitter().get(0) + "%"));
+ filterSql = filterSql.and(queryHistoryTable.querySubmitter,
+ isLikeCaseInsensitive("%" +
request.getFilterSubmitter().get(0) + "%"));
}
}
@@ -655,12 +703,14 @@ public class JdbcQueryHistoryStore {
}
} else if (selectAllModels) {
// Process CONSTANTS, HIVE, RDBMS and all model
- filterSql = filterSql.and(queryHistoryTable.engineType,
isIn(realizations), or(queryHistoryTable.indexHit, isEqualTo(true)));
+ filterSql = filterSql.and(queryHistoryTable.engineType,
isIn(realizations),
+ or(queryHistoryTable.indexHit, isEqualTo(true)));
} else if (request.getFilterModelIds() != null &&
!request.getFilterModelIds().isEmpty()) {
// Process CONSTANTS, HIVE, RDBMS and model1, model2, model3...
- filterSql = filterSql.and(queryHistoryTable.engineType,
isIn(realizations), or(queryHistoryTable.queryId,
-
isIn(selectDistinct(queryHistoryRealizationTable.queryId).from(queryHistoryRealizationTable)
- .where(queryHistoryRealizationTable.model,
isIn(request.getFilterModelIds())))));
+ filterSql = filterSql.and(queryHistoryTable.engineType,
isIn(realizations),
+ or(queryHistoryTable.queryId,
+
isIn(selectDistinct(queryHistoryRealizationTable.queryId).from(queryHistoryRealizationTable)
+ .where(queryHistoryRealizationTable.model,
isIn(request.getFilterModelIds())))));
} else {
// Process CONSTANTS, HIVE, RDBMS
filterSql = filterSql.and(queryHistoryTable.engineType,
isIn(realizations));
@@ -762,5 +812,4 @@ public class JdbcQueryHistoryStore {
queryHistoryTable.queryTime, queryHistoryTable.resultRowCount,
queryHistoryTable.sql,
queryHistoryTable.sqlPattern,
queryHistoryTable.totalScanBytes, queryHistoryTable.totalScanCount);
}
-
}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
index cd91f42287..d954368f35 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryDAO.java
@@ -19,6 +19,7 @@
package org.apache.kylin.metadata.query;
import java.util.List;
+import java.util.Map;
public interface QueryHistoryDAO {
@@ -44,10 +45,10 @@ public interface QueryHistoryDAO {
void deleteQueryHistoriesIfMaxSizeReached();
- void deleteQueryHistoriesIfProjectMaxSizeReached(String project);
-
void deleteQueryHistoriesIfRetainTimeReached();
+ void deleteOldestQueryHistoriesByProject(String project, int deleteCount);
+
long getQueryHistoriesSize(QueryHistoryRequest request, String project);
QueryHistory getByQueryId(String queryId);
@@ -59,4 +60,7 @@ public interface QueryHistoryDAO {
String getRealizationMetricMeasurement();
List<QueryDailyStatistic> getQueryDailyStatistic(long startTime, long
endTime);
+
+ Map<String, Long> getQueryCountByProject();
+
}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java
index ed39f0a9c6..e5a24f4678 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryMapper.java
@@ -69,6 +69,12 @@ public interface QueryHistoryMapper {
@Result(column = "reserved_field_3", property =
"queryHistoryInfo", jdbcType = JdbcType.BLOB, typeHandler =
QueryHistoryTable.QueryHistoryInfoHandler.class) })
List<QueryHistory> selectMany(SelectStatementProvider selectStatement);
+ @SelectProvider(type = SqlProviderAdapter.class, method = "select")
+ @Results(id = "QueryHistoryProjectInfoResult", value = {
+ @Result(column = "project_name", property = "projectName",
jdbcType = JdbcType.VARCHAR),
+ @Result(column = "count", property = "count", jdbcType =
JdbcType.BIGINT) })
+ List<QueryHistoryProjectInfo> selectByProject(SelectStatementProvider
selectStatement);
+
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
@ResultMap("QueryHistoryResult")
QueryHistory selectOne(SelectStatementProvider selectStatement);
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java
new file mode 100644
index 0000000000..2324979f71
--- /dev/null
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistoryProjectInfo.java
@@ -0,0 +1,20 @@
+package org.apache.kylin.metadata.query;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Getter
+@Setter
+@Slf4j
+public class QueryHistoryProjectInfo {
+
+ public static final String PROJECT_NAME = "project_name";
+
+ @JsonProperty(PROJECT_NAME)
+ private String projectName;
+
+ private long count;
+
+}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
index 73124496de..14fdf0c4a1 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDAO.java
@@ -23,18 +23,19 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
+import java.util.function.IntFunction;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.Singletons;
import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,8 +46,8 @@ public class RDBMSQueryHistoryDAO implements QueryHistoryDAO {
private static final Logger logger =
LoggerFactory.getLogger(RDBMSQueryHistoryDAO.class);
@Setter
private String queryMetricMeasurement;
- private String realizationMetricMeasurement;
- private JdbcQueryHistoryStore jdbcQueryHisStore;
+ private final String realizationMetricMeasurement;
+ private final JdbcQueryHistoryStore jdbcQueryHisStore;
public static final String WEEK = "week";
public static final String DAY = "day";
@@ -104,34 +105,43 @@ public class RDBMSQueryHistoryDAO implements
QueryHistoryDAO {
jdbcQueryHisStore.deleteQueryHistoryRealization(project);
}
- public void deleteQueryHistoriesIfMaxSizeReached() {
- QueryHistory queryHistory = jdbcQueryHisStore
-
.queryOldestQueryHistory(KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize());
- if (Objects.nonNull(queryHistory)) {
- long time = queryHistory.getQueryTime();
- jdbcQueryHisStore.deleteQueryHistory(time);
- jdbcQueryHisStore.deleteQueryHistoryRealization(time);
- }
- }
-
public QueryHistory getByQueryId(String queryId) {
return jdbcQueryHisStore.queryByQueryId(queryId);
}
- public void deleteQueryHistoriesIfProjectMaxSizeReached(String project) {
- QueryHistory queryHistory = jdbcQueryHisStore
-
.queryOldestQueryHistory(KylinConfig.getInstanceFromEnv().getQueryHistoryProjectMaxSize(),
project);
- if (Objects.nonNull(queryHistory)) {
- long time = queryHistory.getQueryTime();
- jdbcQueryHisStore.deleteQueryHistory(time, project);
- jdbcQueryHisStore.deleteQueryHistoryRealization(time, project);
+ public void deleteQueryHistoriesIfMaxSizeReached() {
+ long maxSize =
KylinConfig.getInstanceFromEnv().getQueryHistoryMaxSize();
+ long totalCount = jdbcQueryHisStore.getCountOnQueryHistory();
+ if (totalCount > maxSize) {
+ deleteQueryHistoryAndRealization((int) (totalCount - maxSize));
}
}
public void deleteQueryHistoriesIfRetainTimeReached() {
- long retainTime = getRetainTime();
- jdbcQueryHisStore.deleteQueryHistory(retainTime);
- jdbcQueryHisStore.deleteQueryHistoryRealization(retainTime);
+ long rangeOutCount =
jdbcQueryHisStore.getCountOnQueryHistory(getRetainTime());
+ if (rangeOutCount > 0) {
+ deleteQueryHistoryAndRealization((int) rangeOutCount);
+ }
+ }
+
+ public void deleteQueryHistoryAndRealization(int deleteCount) {
+ int singleLimit =
KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize();
+ largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> {
+ QueryHistory queryHistory =
jdbcQueryHisStore.getOldestQueryHistory(currentCount);
+ int deletedRows =
jdbcQueryHisStore.deleteQueryHistory(queryHistory.getId());
+
jdbcQueryHisStore.deleteQueryHistoryRealization(queryHistory.getQueryTime());
+ return deletedRows;
+ }, "Cleanup all query history");
+ }
+
+ public void deleteOldestQueryHistoriesByProject(String project, int
deleteCount) {
+ int singleLimit =
KylinConfig.getInstanceFromEnv().getQueryHistorySingleDeletionSize();
+ largeSplitToSmallTask(deleteCount, singleLimit, currentCount -> {
+ QueryHistory queryHistory =
jdbcQueryHisStore.getOldestQueryHistory(project, currentCount);
+ int deletedRows = jdbcQueryHisStore.deleteQueryHistory(project,
queryHistory.getId());
+ jdbcQueryHisStore.deleteQueryHistoryRealization(project,
queryHistory.getQueryTime());
+ return deletedRows;
+ }, "Cleanup project<" + project + "> query history");
}
public void batchUpdateQueryHistoriesInfo(List<Pair<Long,
QueryHistoryInfo>> idToQHInfoList) {
@@ -215,6 +225,11 @@ public class RDBMSQueryHistoryDAO implements
QueryHistoryDAO {
return jdbcQueryHisStore.queryAvgDurationByTime(startTime, endTime,
timeDimension, project);
}
+ @Override
+ public Map<String, Long> getQueryCountByProject() {
+ return jdbcQueryHisStore.getCountGroupByProject();
+ }
+
public static void fillZeroForQueryStatistics(List<QueryStatistics>
queryStatistics, long startTime, long endTime,
String dimension) {
if (!dimension.equalsIgnoreCase(DAY) &&
!dimension.equalsIgnoreCase(WEEK)) {
@@ -245,4 +260,19 @@ public class RDBMSQueryHistoryDAO implements
QueryHistoryDAO {
}
}
}
+
+ public static void largeSplitToSmallTask(int totalCount, int singleSize,
IntFunction<Integer> function,
+ String description) {
+ int retainCount = totalCount;
+ while (retainCount > 0) {
+ int currentCount = Math.min(retainCount, singleSize);
+ int actualCount = function.apply(currentCount);
+ if (currentCount != actualCount && logger.isWarnEnabled()) {
+ logger.warn("The task {} was not performed as expected,
expect:{}, actual:{}", description,
+ currentCount, actualCount);
+ }
+ retainCount -= currentCount;
+ }
+ }
+
}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
index 1c0b8c779e..f6589ed5d0 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/util/QueryHisStoreUtil.java
@@ -27,11 +27,13 @@ import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Locale;
+import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.ibatis.jdbc.ScriptRunner;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.Configuration;
@@ -45,7 +47,6 @@ import org.apache.kylin.common.Singletons;
import org.apache.kylin.common.logging.LogOutputStream;
import org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil;
import org.apache.kylin.common.util.SetThreadName;
-import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.query.QueryHistoryDAO;
@@ -165,28 +166,40 @@ public class QueryHisStoreUtil {
try (SetThreadName ignored = new
SetThreadName("QueryHistoryCleanWorker")) {
val config = KylinConfig.getInstanceFromEnv();
val projectManager = NProjectManager.getInstance(config);
+
getQueryHistoryDao().deleteQueryHistoriesIfMaxSizeReached();
getQueryHistoryDao().deleteQueryHistoriesIfRetainTimeReached();
+
+ Map<String, Long> projectCounts =
getQueryHistoryDao().getQueryCountByProject();
for (ProjectInstance project : projectManager.listAllProjects()) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Thread is interrupted: " +
Thread.currentThread().getName());
}
- if
(!EpochManager.getInstance().checkEpochOwner(project.getName()))
- continue;
- try {
- long startTime = System.currentTimeMillis();
- log.info("Start to delete query histories that are beyond
max size for project<{}>",
- project.getName());
-
getQueryHistoryDao().deleteQueryHistoriesIfProjectMaxSizeReached(project.getName());
- log.info("Query histories cleanup for project<{}>
finished, it took {}ms", project.getName(),
- System.currentTimeMillis() - startTime);
- } catch (Exception e) {
- log.error("clean query histories<" + project.getName() +
"> failed", e);
- }
+ long projectCount =
projectCounts.getOrDefault(project.getName(), 0L);
+ cleanQueryHistory(project.getName(), projectCount);
}
}
}
+ public static void cleanQueryHistory(String projectName, long
historyCount) {
+ long projectMaxSize =
KylinConfig.getInstanceFromEnv().getQueryHistoryProjectMaxSize();
+ if (historyCount <= projectMaxSize) {
+ log.info("Query histories of project<{}> is less than the maximum
limit, so skip it.", projectName);
+ return;
+ }
+ try {
+ StopWatch watch = StopWatch.createStarted();
+ log.info("Start to delete query histories that are beyond max size
for project<{}>, records:{}",
+ projectName, historyCount);
+
getQueryHistoryDao().deleteOldestQueryHistoriesByProject(projectName,
+ (int) (historyCount - projectMaxSize));
+ watch.stop();
+ log.info("Query histories cleanup for project<{}> finished, it
took {}ms", projectName, watch.getTime());
+ } catch (Exception e) {
+ log.error("Clean query histories for project<{}> failed",
projectName, e);
+ }
+ }
+
private static QueryHistoryDAO getQueryHistoryDao() {
return RDBMSQueryHistoryDAO.getInstance();
}
diff --git
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
index 18443017dc..e6ea6c176c 100644
---
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
+++
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/query/RDBMSQueryHistoryDaoTest.java
@@ -19,13 +19,16 @@
package org.apache.kylin.metadata.query;
import static
org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO.fillZeroForQueryStatistics;
+import static
org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO.largeSplitToSmallTask;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.TimeUtil;
import org.apache.kylin.junit.TimeZoneTestRunner;
+import org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -71,7 +74,7 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
}
@Test
- public void testGetQueryHistoriesfilterByIsIndexHit() throws Exception {
+ public void testGetQueryHistoriesFilterByIsIndexHit() throws Exception {
queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true,
PROJECT, true));
queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, false,
PROJECT, true));
queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, false,
PROJECT, true));
@@ -102,7 +105,7 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
}
@Test
- public void testGetQueryHistoriesfilterByQueryTime() throws Exception {
+ public void testGetQueryHistoriesFilterByQueryTime() throws Exception {
// 2020-01-29 23:25:12
queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true,
PROJECT, true));
// 2020-01-30 23:25:12
@@ -123,7 +126,7 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
}
@Test
- public void testGetQueryHistoriesfilterByDuration() throws Exception {
+ public void testGetQueryHistoriesFilterByDuration() throws Exception {
queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1000L, true,
PROJECT, true));
queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 2000L,
false, PROJECT, true));
queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 3000L,
false, PROJECT, true));
@@ -145,7 +148,7 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
}
@Test
- public void testGetQueryHistoriesfilterBySql() throws Exception {
+ public void testGetQueryHistoriesFilterBySql() throws Exception {
QueryMetrics queryMetrics1 = createQueryMetrics(1580311512000L, 1L,
true, PROJECT, true);
queryMetrics1.setSql("select 2 LIMIT 500\n");
queryHistoryDAO.insert(queryMetrics1);
@@ -333,6 +336,38 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
Assert.assertEquals(2, monthQueryStatistics.get(0).getMeanDuration(),
0.1);
}
+ @Test
+ public void testDeleteQueryHistories() throws Exception {
+ overwriteSystemProp("kylin.query.queryhistory.max-size", "2");
+ overwriteSystemProp("kylin.query.queryhistory.project-max-size", "5");
+
+ String PROJECT_V1 = PROJECT + "_v1";
+
+ // 2020-01-29 23:25:12
+ queryHistoryDAO.insert(createQueryMetrics(1580311512000L, 1L, true,
PROJECT, true));
+ // 2020-01-30 23:25:12
+ queryHistoryDAO.insert(createQueryMetrics(1580397912000L, 2L, false,
PROJECT, true));
+ // 2030-01-28 23:25:12
+ queryHistoryDAO.insert(createQueryMetrics(1895844312000L, 3L, false,
PROJECT_V1, true));
+ // 2030-01-29 23:25:12
+ queryHistoryDAO.insert(createQueryMetrics(1895930712000L, 1L, false,
PROJECT, true));
+
+ // before delete
+ List<QueryHistory> queryHistoryList =
queryHistoryDAO.queryQueryHistoriesByIdOffset(0, 100, PROJECT);
+ Assert.assertEquals(3, queryHistoryList.size());
+
+ // after delete
+ QueryHisStoreUtil.cleanQueryHistory();
+
+ queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(0,
100, PROJECT_V1);
+ Assert.assertEquals(1, queryHistoryList.size());
+ Assert.assertEquals(1895844312000L,
queryHistoryList.get(0).getQueryTime());
+
+ queryHistoryList = queryHistoryDAO.queryQueryHistoriesByIdOffset(0,
100, PROJECT);
+ Assert.assertEquals(1, queryHistoryList.size());
+ Assert.assertEquals(1895930712000L,
queryHistoryList.get(0).getQueryTime());
+ }
+
@Test
public void testDeleteQueryHistoriesIfRetainTimeReached() throws Exception
{
// 2020-01-29 23:25:12
@@ -401,12 +436,12 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
Assert.assertEquals(4, queryHistoryList.size());
// after delete
- queryHistoryDAO.deleteQueryHistoriesIfProjectMaxSizeReached(PROJECT);
+ QueryHisStoreUtil.cleanQueryHistory(PROJECT, 4);
queryHistoryList = queryHistoryDAO.getAllQueryHistories();
Assert.assertEquals(2, queryHistoryList.size());
// test delete empty
- queryHistoryDAO.deleteQueryHistoriesIfProjectMaxSizeReached(PROJECT);
+ QueryHisStoreUtil.cleanQueryHistory(PROJECT, 2);
queryHistoryList = queryHistoryDAO.getAllQueryHistories();
Assert.assertEquals(2, queryHistoryList.size());
}
@@ -670,12 +705,12 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
Assert.assertEquals(2, queryHistoryList.size());
- Assert.assertEquals(false,
queryHistoryList.get(0).getQueryHistoryInfo().isExactlyMatch());
+
Assert.assertFalse(queryHistoryList.get(0).getQueryHistoryInfo().isExactlyMatch());
Assert.assertEquals(5,
queryHistoryList.get(0).getQueryHistoryInfo().getScanSegmentNum());
Assert.assertEquals("PENDING",
queryHistoryList.get(0).getQueryHistoryInfo().getState().toString());
- Assert.assertEquals(false,
queryHistoryList.get(0).getQueryHistoryInfo().isExecutionError());
+
Assert.assertFalse(queryHistoryList.get(0).getQueryHistoryInfo().isExecutionError());
- Assert.assertEquals(true,
queryHistoryList.get(1).getQueryHistoryInfo().isExactlyMatch());
+
Assert.assertTrue(queryHistoryList.get(1).getQueryHistoryInfo().isExactlyMatch());
Assert.assertEquals(3,
queryHistoryList.get(1).getQueryHistoryInfo().getScanSegmentNum());
Assert.assertEquals("PENDING",
queryHistoryList.get(1).getQueryHistoryInfo().getState().toString());
Assert.assertTrue(queryHistoryList.get(1).getQueryHistoryInfo().isExecutionError());
@@ -721,6 +756,23 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
Assert.assertEquals(2L, queryDailyStatistic.get(0).getLt3sNum());
}
+ @Test
+ public void testLargeSplitToSmallTask() {
+ AtomicInteger executions = new AtomicInteger(0);
+ AtomicInteger actualSize = new AtomicInteger(0);
+ largeSplitToSmallTask(105, 10, currentCount -> {
+ executions.incrementAndGet();
+ actualSize.addAndGet(currentCount);
+ if (currentCount < 10) {
+ return currentCount - 1;
+ } else {
+ return currentCount;
+ }
+ }, "Test LargeSplitToSmall Task");
+ Assert.assertEquals(105, actualSize.get());
+ Assert.assertEquals(11, executions.get());
+ }
+
public static QueryMetrics createQueryMetrics(long queryTime, long
duration, boolean indexHit, String project,
boolean hitModel) {
QueryMetrics queryMetrics = new
QueryMetrics("6a9a151f-f992-4d52-a8ec-8ff3fd3de6b1", "192.168.1.6:7070");
@@ -753,7 +805,7 @@ public class RDBMSQueryHistoryDaoTest extends
NLocalFileMetadataTestCase {
realizationMetrics.setModelId("82fa7671-a935-45f5-8779-85703601f49a.json");
realizationMetrics.setSnapshots(
- Lists.newArrayList(new String[] {
"DEFAULT.TEST_KYLIN_ACCOUNT", "DEFAULT.TEST_COUNTRY" }));
+ Lists.newArrayList("DEFAULT.TEST_KYLIN_ACCOUNT",
"DEFAULT.TEST_COUNTRY"));
List<QueryMetrics.RealizationMetrics> realizationMetricsList =
Lists.newArrayList();
realizationMetricsList.add(realizationMetrics);