This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 2e29cffe56 KYLIN-5367 fix update layoutHits error in fusion model
2e29cffe56 is described below
commit 2e29cffe56bf8a9511cc86fabe6bcea6b8d8a260
Author: binbin.zheng <[email protected]>
AuthorDate: Tue Oct 25 22:26:21 2022 +0800
KYLIN-5367 fix update layoutHits error in fusion model
---
.../service/task/QueryHistoryTaskScheduler.java | 30 +++--
.../task/QueryHistoryTaskSchedulerTest.java | 122 +++++++++++++++++++++
.../kylin/metadata/cube/model/NDataflow.java | 5 +
.../apache/kylin/metadata/model/FusionModel.java | 2 +-
.../kylin/metadata/model/FusionModelManager.java | 15 +++
.../apache/kylin/metadata/query/QueryHistory.java | 12 +-
.../metadata/cube/model/NDataflowManagerTest.java | 20 ++++
.../metadata/streaming/FusionModelManagerTest.java | 24 ++++
.../kylin/rest/service/AsyncTaskService.java | 2 +-
.../kylin/rest/service/QueryCacheManager.java | 2 +-
.../kylin/rest/service/QueryHistoryService.java | 21 ++--
.../apache/kylin/rest/service/QueryService.java | 7 ++
.../rest/service/QueryHistoryServiceTest.java | 13 ++-
.../kylin/rest/service/QueryServiceTest.java | 87 ++++++++++++---
14 files changed, 318 insertions(+), 44 deletions(-)
diff --git
a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
index 1d3664e4ec..eae1ed351b 100644
---
a/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
+++
b/src/common-service/src/main/java/org/apache/kylin/rest/service/task/QueryHistoryTaskScheduler.java
@@ -28,8 +28,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.common.util.NamedThreadFactory;
@@ -224,20 +226,19 @@ public class QueryHistoryTaskScheduler {
}
private Map<String, DataflowHitCount>
collectDataflowHitCount(List<QueryHistory> queryHistories) {
- val dfManager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
val result = Maps.<String, DataflowHitCount> newHashMap();
for (QueryHistory queryHistory : queryHistories) {
- val realizations = queryHistory.transformRealizations();
+ val realizations = queryHistory.transformRealizations(project);
if (CollectionUtils.isEmpty(realizations)) {
continue;
}
- for (val realization : realizations) {
- if (dfManager.getDataflow(realization.getModelId()) ==
null || realization.getLayoutId() == null) {
- continue;
- }
- result.computeIfAbsent(realization.getModelId(), k -> new
DataflowHitCount());
- result.get(realization.getModelId()).dataflowHit += 1;
- val layoutHits =
result.get(realization.getModelId()).getLayoutHits();
+ val realizationList =
realizations.stream().filter(this::isValidRealization)
+ .collect(Collectors.toList());
+ for (val realization : realizationList) {
+ String modelId = realization.getModelId();
+ result.computeIfAbsent(modelId, k -> new
DataflowHitCount());
+ result.get(modelId).dataflowHit += 1;
+ val layoutHits = result.get(modelId).getLayoutHits();
layoutHits.computeIfAbsent(realization.getLayoutId(), k ->
new FrequencyMap());
layoutHits.get(realization.getLayoutId()).incFrequency(queryHistory.getQueryTime());
}
@@ -245,6 +246,12 @@ public class QueryHistoryTaskScheduler {
return result;
}
+ private boolean isValidRealization(NativeQueryRealization realization)
{
+ val config = KylinConfig.getInstanceFromEnv();
+ val dfManager = NDataflowManager.getInstance(config, project);
+ return dfManager.getDataflow(realization.getModelId()) != null &&
realization.getLayoutId() != null;
+ }
+
private Map<TableExtDesc, Integer>
collectSnapshotHitCount(List<QueryHistory> queryHistories) {
val tableManager =
NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
val results = Maps.<TableExtDesc, Integer> newHashMap();
@@ -263,10 +270,13 @@ public class QueryHistoryTaskScheduler {
}
private void collectModelLastQueryTime(QueryHistory queryHistory,
Map<String, Long> modelsLastQueryTime) {
- List<NativeQueryRealization> realizations =
queryHistory.transformRealizations();
+ List<NativeQueryRealization> realizations =
queryHistory.transformRealizations(project);
long queryTime = queryHistory.getQueryTime();
for (NativeQueryRealization realization : realizations) {
String modelId = realization.getModelId();
+ if (StringUtils.isEmpty(modelId)) {
+ continue;
+ }
modelsLastQueryTime.put(modelId, queryTime);
}
}
diff --git
a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
index dc4dba454e..18e2a86842 100644
---
a/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
+++
b/src/common-service/src/test/java/org/apache/kylin/rest/service/task/QueryHistoryTaskSchedulerTest.java
@@ -67,6 +67,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.val;
+import lombok.var;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TimeZoneTestRunner.class)
@@ -77,6 +78,8 @@ public class QueryHistoryTaskSchedulerTest extends
NLocalFileMetadataTestCase {
private static final String DATAFLOW =
"89af4ee2-2cdb-4b07-b39e-4c29856309aa";
private static final String LAYOUT1 = "20000000001";
private static final String LAYOUT2 = "1000001";
+ private static final String LAYOUT3 = "30001";
+ private static final String LAYOUT4 = "40001";
private static final Long QUERY_TIME = 1586760398338L;
private QueryHistoryTaskScheduler qhAccelerateScheduler;
@@ -343,6 +346,46 @@ public class QueryHistoryTaskSchedulerTest extends
NLocalFileMetadataTestCase {
Assert.assertEquals(16,
idOffsetManager.get().getStatMetaUpdateOffset());
}
+ @Test
+ public void testUpdateStatMeta() {
+ QueryHistoryTaskScheduler taskScheduler = new
QueryHistoryTaskScheduler("streaming_test");
+ QueryHistoryTaskScheduler.QueryHistoryMetaUpdateRunner
metaUpdateRunner = taskScheduler.new QueryHistoryMetaUpdateRunner();
+ NDataflowManager manager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(),
"streaming_test");
+ {
+ var dataflow =
manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a");
+ Assert.assertTrue(dataflow.getLayoutHitCount().isEmpty());
+ ReflectionTestUtils.invokeMethod(metaUpdateRunner,
"updateStatMeta", batchModelQueryHistory());
+ dataflow =
manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a");
+ Assert.assertEquals(1, countDateFrequency(dataflow, LAYOUT3));
+ }
+ {
+ var batchDataflow =
manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a");
+ var streamingDataflow =
manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40");
+
+
Assert.assertFalse(batchDataflow.getLayoutHitCount().containsKey(Long.parseLong(LAYOUT4)));
+
Assert.assertFalse(streamingDataflow.getLayoutHitCount().containsKey(Long.parseLong(LAYOUT3)));
+
+ ReflectionTestUtils.invokeMethod(metaUpdateRunner,
"updateStatMeta", fusionModelQueryHistory());
+ batchDataflow =
manager.getDataflow("334671fd-e383-4fc9-b5c2-94fce832f77a");
+ streamingDataflow =
manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40");
+
+ Assert.assertEquals(1, countDateFrequency(batchDataflow, LAYOUT4));
+ Assert.assertEquals(1, countDateFrequency(streamingDataflow,
LAYOUT3));
+ }
+ {
+ var streamingDataflow =
manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40");
+ Assert.assertEquals(1, countDateFrequency(streamingDataflow,
LAYOUT3));
+ ReflectionTestUtils.invokeMethod(metaUpdateRunner,
"updateStatMeta", streamingModelQueryHistory());
+ streamingDataflow =
manager.getDataflow("b05034a8-c037-416b-aa26-9e6b4a41ee40");
+ Assert.assertEquals(2, countDateFrequency(streamingDataflow,
LAYOUT3));
+ }
+ }
+
+ private int countDateFrequency(NDataflow dataflow, String layout) {
+ return
dataflow.getLayoutHitCount().get(Long.parseLong(layout)).getDateFrequency().values().stream()
+ .mapToInt(Integer::intValue).sum();
+ }
+
private List<QueryHistory> queryHistories() {
QueryHistory queryHistory1 = new QueryHistory();
queryHistory1.setSqlPattern("select * from sql1");
@@ -517,6 +560,85 @@ public class QueryHistoryTaskSchedulerTest extends
NLocalFileMetadataTestCase {
return histories;
}
+ /**
+ * sql match batch model of fusion model
+ * @return
+ */
+ private List<QueryHistory> batchModelQueryHistory() {
+ String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40";
+ QueryHistory queryHistory1 = new QueryHistory();
+ queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM
SSB.KAFKA_FUSION");
+ queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED);
+ queryHistory1.setDuration(1000L);
+ queryHistory1.setQueryTime(QUERY_TIME);
+ queryHistory1.setEngineType("NATIVE");
+ QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo();
+ queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList(
+ new QueryMetrics.RealizationMetrics(LAYOUT3, "Agg Index",
fusionModelId, Lists.newArrayList())));
+ queryHistory1.setQueryHistoryInfo(queryHistoryInfo1);
+ queryHistory1.setId(10);
+ return Lists.newArrayList(queryHistory1);
+ }
+
+ /**
+ * sql match both batch model and streaming model in fusion model
+ * @return
+ */
+ private List<QueryHistory> fusionModelQueryHistory() {
+ String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40";
+ String batchModelId = "334671fd-e383-4fc9-b5c2-94fce832f77a";
+ QueryHistory queryHistory1 = new QueryHistory();
+ queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM
SSB.KAFKA_FUSION");
+ queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED);
+ queryHistory1.setDuration(1000L);
+ queryHistory1.setQueryTime(QUERY_TIME);
+ queryHistory1.setEngineType("NATIVE");
+ QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo();
+ QueryMetrics.RealizationMetrics streamingRealizationMetric = new
QueryMetrics.RealizationMetrics(LAYOUT3,
+ "Agg Index", fusionModelId, Lists.newArrayList());
+ streamingRealizationMetric.setStreamingLayout(true);
+
queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList(streamingRealizationMetric,
+ new QueryMetrics.RealizationMetrics(LAYOUT4, "Agg Index",
batchModelId, Lists.newArrayList())));
+ queryHistory1.setQueryHistoryInfo(queryHistoryInfo1);
+ queryHistory1.setId(11);
+ return Lists.newArrayList(queryHistory1);
+ }
+
+ /**
+ * sql match streaming model in fusion model
+ * @return
+ */
+ private List<QueryHistory> streamingModelQueryHistory() {
+ String fusionModelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40";
+ QueryHistory queryHistory1 = new QueryHistory();
+ queryHistory1.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM
SSB.KAFKA_FUSION");
+ queryHistory1.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED);
+ queryHistory1.setDuration(1000L);
+ queryHistory1.setQueryTime(QUERY_TIME);
+ queryHistory1.setEngineType("NATIVE");
+ QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo();
+ QueryMetrics.RealizationMetrics streamingRealizationMetric = new
QueryMetrics.RealizationMetrics(LAYOUT3,
+ "Agg Index", fusionModelId, Lists.newArrayList());
+ streamingRealizationMetric.setStreamingLayout(true);
+
queryHistoryInfo1.setRealizationMetrics(Lists.newArrayList(streamingRealizationMetric));
+ queryHistory1.setQueryHistoryInfo(queryHistoryInfo1);
+ queryHistory1.setId(10);
+
+ QueryHistory queryHistory2 = new QueryHistory();
+ queryHistory2.setSqlPattern("SELECT MAX(LO_ORDERKEY) FROM
SSB.KAFKA_FUSION");
+ queryHistory2.setQueryStatus(QueryHistory.QUERY_HISTORY_SUCCEEDED);
+ queryHistory2.setDuration(1000L);
+ queryHistory2.setQueryTime(QUERY_TIME);
+ queryHistory2.setEngineType("NATIVE");
+ QueryHistoryInfo queryHistoryInfo2 = new QueryHistoryInfo();
+ QueryMetrics.RealizationMetrics realizationMetric = new
QueryMetrics.RealizationMetrics(LAYOUT3,
+ "Agg Index", "error", Lists.newArrayList());
+
queryHistoryInfo2.setRealizationMetrics(Lists.newArrayList(realizationMetric));
+ queryHistory2.setQueryHistoryInfo(queryHistoryInfo2);
+ queryHistory2.setId(11);
+ return Lists.newArrayList(queryHistory1, queryHistory2);
+ }
+
int startOffset = 0;
}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
index c72b4842e5..7a5d81b145 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataflow.java
@@ -242,6 +242,11 @@ public class NDataflow extends RootPersistentEntity
implements Serializable, IRe
return model == null ? null : model.getAlias();
}
+ public String getFusionModelAlias() {
+ NDataModel model = getModel();
+ return model == null ? null : model.getFusionModelAlias();
+ }
+
@Override
public Set<TblColRef> getAllColumns() {
return getIndexPlan().listAllTblColRefs();
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
index d1ce44915d..28719ab1e1 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModel.java
@@ -120,7 +120,7 @@ public class FusionModel extends RootPersistentEntity
implements Serializable {
NDataModelManager modelManager = NDataModelManager.getInstance(config,
project);
for (String modelId : getModelsId()) {
NDataModel model = modelManager.getDataModelDesc(modelId);
- if (model.getModelType() == modelType) {
+ if (model != null && model.getModelType() == modelType) {
return model;
}
}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java
index f0623f5fa2..45fdf65aae 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/FusionModelManager.java
@@ -18,9 +18,14 @@
package org.apache.kylin.metadata.model;
+import java.util.Objects;
+import java.util.Optional;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.query.NativeQueryRealization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,4 +98,14 @@ public class FusionModelManager {
return crud.save(desc);
}
+
+ public String getModelId(NativeQueryRealization realization) {
+ String modelId = realization.getModelId();
+ FusionModel fusionModel = getFusionModel(modelId);
+ if (!realization.isStreamingLayout() && !Objects.isNull(fusionModel)) {
+ NDataModel dataModel = fusionModel.getBatchModel();
+ modelId =
Optional.ofNullable(dataModel).map(RootPersistentEntity::getId).orElse("");
+ }
+ return modelId;
+ }
}
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java
index c24f1ae908..4414ff2fb4 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/query/QueryHistory.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metadata.model.FusionModelManager;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
@@ -31,7 +33,7 @@ import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import lombok.val;
-import org.apache.kylin.common.util.JsonUtil;
+import lombok.extern.slf4j.Slf4j;
@SuppressWarnings("serial")
@Getter
@@ -192,7 +194,7 @@ public class QueryHistory {
return queryStatus.equals(QUERY_HISTORY_FAILED);
}
- public List<NativeQueryRealization> transformRealizations() {
+ public List<NativeQueryRealization> transformRealizations(String project) {
List<NativeQueryRealization> realizations = Lists.newArrayList();
if (queryHistoryInfo == null ||
queryHistoryInfo.getRealizationMetrics() == null
|| queryHistoryInfo.getRealizationMetrics().isEmpty()) {
@@ -200,7 +202,7 @@ public class QueryHistory {
}
List<QueryMetrics.RealizationMetrics> realizationMetrics =
queryHistoryInfo.realizationMetrics;
-
+ val fusionModelManager =
FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
for (QueryMetrics.RealizationMetrics metrics : realizationMetrics) {
val realization = new NativeQueryRealization(metrics.modelId,
metrics.layoutId == null ||
metrics.layoutId.equals("null") ? null
@@ -210,6 +212,8 @@ public class QueryHistory {
: metrics.snapshots);
realization.setSecondStorage(metrics.isSecondStorage);
realization.setStreamingLayout(metrics.isStreamingLayout);
+ String modelId = fusionModelManager.getModelId(realization);
+ realization.setModelId(modelId);
realizations.add(realization);
}
return realizations;
diff --git
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java
index b83682d724..67796a1eb5 100644
---
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java
+++
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/model/NDataflowManagerTest.java
@@ -989,4 +989,24 @@ public class NDataflowManagerTest extends
NLocalFileMetadataTestCase {
Assert.assertEquals(5, flatTableDesc2.getUsedColumns().size());
}
+ @Test
+ public void testGetFusionModelAlias() {
+ String streamingModelId = "14e00a6f-d910-14b6-ee67-e0a5775012c4";
+ String batchModelId = "3d69e1c0-0165-c144-7dae-8ae5dc0cf16c";
+ NDataflowManager mgr = NDataflowManager.getInstance(getTestConfig(),
"streaming_test");
+ Assert.assertEquals("fusion_model",
mgr.getDataflow(streamingModelId).getFusionModelAlias());
+ Assert.assertEquals("fusion_model",
mgr.getDataflow(batchModelId).getFusionModelAlias());
+
+ Assert.assertEquals("stream_merge",
+
mgr.getDataflow("e78a89dd-847f-4574-8afa-8768b4228b72").getFusionModelAlias());
+
+ mgr = NDataflowManager.getInstance(getTestConfig(), projectDefault);
+ Assert.assertEquals("nmodel_basic_inner",
+
mgr.getDataflow("741ca86a-1f13-46da-a59f-95fb68615e3a").getFusionModelAlias());
+
+ val modelManager =
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), projectDefault);
+ modelManager.dropModel("741ca86a-1f13-46da-a59f-95fb68615e3a");
+
Assert.assertNull(mgr.getDataflow("741ca86a-1f13-46da-a59f-95fb68615e3a").getFusionModelAlias());
+
+ }
}
diff --git
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java
index 54866b4fdc..c44fdec332 100644
---
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java
+++
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/streaming/FusionModelManagerTest.java
@@ -22,6 +22,7 @@ import
org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.metadata.model.FusionModel;
import org.apache.kylin.metadata.model.FusionModelManager;
import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.query.NativeQueryRealization;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -106,4 +107,27 @@ public class FusionModelManagerTest extends
NLocalFileMetadataTestCase {
Assert.assertTrue(streamingModel.fusionModelStreamingPart());
}
+ @Test
+ public void testGetModelId() {
+ String streamingModelId = "14e00a6f-d910-14b6-ee67-e0a5775012c4";
+ String batchModelId = "3d69e1c0-0165-c144-7dae-8ae5dc0cf16c";
+
+ val realization = new NativeQueryRealization();
+ realization.setModelId(streamingModelId);
+ Assert.assertEquals(batchModelId, mgr.getModelId(realization));
+
+ realization.setModelId(streamingModelId);
+ realization.setStreamingLayout(true);
+ Assert.assertEquals(streamingModelId, mgr.getModelId(realization));
+
+ realization.setModelId(batchModelId);
+ realization.setStreamingLayout(false);
+ Assert.assertEquals(batchModelId, mgr.getModelId(realization));
+
+ realization.setModelId(streamingModelId);
+ val modelMgr = NDataModelManager.getInstance(getTestConfig(), PROJECT);
+ modelMgr.dropModel(batchModelId);
+ Assert.assertEquals("", mgr.getModelId(realization));
+ }
+
}
diff --git
a/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
b/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
index 1dcad68b23..1b26150623 100644
---
a/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
+++
b/src/job-service/src/main/java/org/apache/kylin/rest/service/AsyncTaskService.java
@@ -128,7 +128,7 @@ public class AsyncTaskService implements
AsyncTaskServiceSupporter {
val noBrokenModels = NDataflowManager.getInstance(kylinConfig,
project).listUnderliningDataModels().stream()
.collect(Collectors.toMap(NDataModel::getAlias,
RootPersistentEntity::getUuid));
val dataModelManager = NDataModelManager.getInstance(kylinConfig,
project);
- List<NativeQueryRealization> realizations = qh.transformRealizations();
+ List<NativeQueryRealization> realizations =
qh.transformRealizations(project);
realizations.forEach(realization -> {
NDataModel nDataModel =
dataModelManager.getDataModelDesc(realization.getModelId());
diff --git
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
index bc38e2c99a..6f7ecff94b 100644
---
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
+++
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryCacheManager.java
@@ -195,7 +195,7 @@ public class QueryCacheManager implements
CommonQueryCacheSupporter {
val modelId = nativeQueryRealization.getModelId();
val dataflow =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
.getDataflow(modelId);
- nativeQueryRealization.setModelAlias(dataflow.getModelAlias());
+
nativeQueryRealization.setModelAlias(dataflow.getFusionModelAlias());
}
return cached;
diff --git
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java
index f7c3bfc747..7479631821 100644
---
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java
+++
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryHistoryService.java
@@ -40,7 +40,6 @@ import java.util.stream.Collectors;
import javax.servlet.http.HttpServletResponse;
-import io.kyligence.kap.guava20.shaded.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.kylin.common.KylinConfig;
@@ -48,9 +47,6 @@ import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.rest.exception.ForbiddenException;
-import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
@@ -59,6 +55,7 @@ import
org.apache.kylin.metadata.favorite.QueryHistoryIdOffsetManager;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.query.NativeQueryRealization;
import org.apache.kylin.metadata.query.QueryHistory;
import org.apache.kylin.metadata.query.QueryHistoryDAO;
@@ -66,8 +63,10 @@ import org.apache.kylin.metadata.query.QueryHistoryInfo;
import org.apache.kylin.metadata.query.QueryHistoryRequest;
import org.apache.kylin.metadata.query.QueryStatistics;
import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO;
+import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.response.NDataModelResponse;
import org.apache.kylin.rest.response.QueryStatisticsResponse;
+import org.apache.kylin.rest.util.AclEvaluate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -79,6 +78,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import io.kyligence.kap.guava20.shaded.common.collect.ImmutableMap;
import lombok.val;
@Component("queryHistoryService")
@@ -191,13 +191,16 @@ public class QueryHistoryService extends BasicService
implements AsyncTaskQueryH
.listUnderliningDataModels().stream()
.collect(Collectors.toMap(NDataModel::getAlias,
RootPersistentEntity::getUuid));
- val dataModelManager =
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- val indexPlanManager =
NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- List<NativeQueryRealization> realizations =
query.transformRealizations();
+ val config = KylinConfig.getInstanceFromEnv();
+ val indexPlanManager = NIndexPlanManager.getInstance(config, project);
+ val modelManager = NDataModelManager.getInstance(config, project);
+
+ List<NativeQueryRealization> realizations =
query.transformRealizations(project);
realizations.forEach(realization -> {
- NDataModel nDataModel =
dataModelManager.getDataModelDesc(realization.getModelId());
- if (noBrokenModels.containsValue(realization.getModelId())) {
+ String modelId = realization.getModelId();
+ NDataModel nDataModel = modelManager.getDataModelDesc(modelId);
+ if (noBrokenModels.containsValue(modelId)) {
NDataModelResponse model = (NDataModelResponse) modelService
.updateResponseAcl(new NDataModelResponse(nDataModel),
project);
realization.setModelAlias(model.getFusionModelAlias());
diff --git
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 69d75265df..b1361132f9 100644
---
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -83,6 +83,7 @@ import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.acl.AclTCR;
import org.apache.kylin.metadata.acl.AclTCRManager;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
+import org.apache.kylin.metadata.model.FusionModelManager;
import org.apache.kylin.metadata.model.JoinDesc;
import org.apache.kylin.metadata.model.JoinTableDesc;
import org.apache.kylin.metadata.model.NDataModel;
@@ -636,6 +637,12 @@ public class QueryService extends BasicService implements
CacheSignatureQuerySup
QueryContext.currentMetrics().getTotalScanRows());
}
+ val fusionManager =
FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(),
+ sqlRequest.getProject());
+ if
(CollectionUtils.isNotEmpty(sqlResponse.getNativeRealizations())) {
+ sqlResponse.getNativeRealizations().stream()
+ .forEach(realization ->
realization.setModelId(fusionManager.getModelId(realization)));
+ }
//check query result row count
NCircuitBreaker.verifyQueryResultRowCount(sqlResponse.getResultRowCount());
diff --git
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
index da763292c8..a53f9af8e7 100644
---
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
+++
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryHistoryServiceTest.java
@@ -902,12 +902,17 @@ public class QueryHistoryServiceTest extends
NLocalFileMetadataTestCase {
QueryHistory query2 = new QueryHistory();
query2.setSql("select * from test_table_2");
+ QueryHistory query3 = new QueryHistory();
+ query3.setSql("select * from test_table_3");
+
QueryMetrics.RealizationMetrics metrics1 = new
QueryMetrics.RealizationMetrics("1", "Agg Index",
"b05034a8-c037-416b-aa26-9e6b4a41ee40", Lists.newArrayList(new
String[] {}));
QueryMetrics.RealizationMetrics metrics2 = new
QueryMetrics.RealizationMetrics("1", "Agg Index",
"334671fd-e383-4fc9-b5c2-94fce832f77a", Lists.newArrayList(new
String[] {}));
QueryMetrics.RealizationMetrics metrics3 = new
QueryMetrics.RealizationMetrics("1", "Agg Index",
"554671fd-e383-4fc9-b5c2-94fce832f77a", Lists.newArrayList(new
String[] {}));
+ QueryMetrics.RealizationMetrics metrics4 = new
QueryMetrics.RealizationMetrics("1", "Agg Index",
+ "b05034a8-c037-416b-aa26-9e6b4a41ee40", Lists.newArrayList(new
String[] {}));
QueryHistoryInfo queryHistoryInfo1 = new QueryHistoryInfo();
queryHistoryInfo1.setRealizationMetrics(
@@ -917,8 +922,12 @@ public class QueryHistoryServiceTest extends
NLocalFileMetadataTestCase {
QueryHistoryInfo queryHistoryInfo2 = new QueryHistoryInfo();
queryHistoryInfo2.setRealizationMetrics(Lists.newArrayList(new
QueryMetrics.RealizationMetrics[] { metrics3 }));
query2.setQueryHistoryInfo(queryHistoryInfo2);
+
+ QueryHistoryInfo queryHistoryInfo3 = new QueryHistoryInfo();
+ queryHistoryInfo3.setRealizationMetrics(Lists.newArrayList(new
QueryMetrics.RealizationMetrics[] { metrics4 }));
+ query3.setQueryHistoryInfo(queryHistoryInfo3);
RDBMSQueryHistoryDAO queryHistoryDAO =
Mockito.mock(RDBMSQueryHistoryDAO.class);
- Mockito.doReturn(Lists.newArrayList(query1,
query2)).when(queryHistoryDAO)
+ Mockito.doReturn(Lists.newArrayList(query1, query2,
query3)).when(queryHistoryDAO)
.getQueryHistoriesByConditions(Mockito.any(),
Mockito.anyInt(), Mockito.anyInt());
Mockito.doReturn(10L).when(queryHistoryDAO).getQueryHistoriesSize(Mockito.any(),
Mockito.anyString());
Mockito.doReturn(queryHistoryDAO).when(queryHistoryService).getQueryHistoryDao();
@@ -930,6 +939,8 @@ public class QueryHistoryServiceTest extends
NLocalFileMetadataTestCase {
Assert.assertEquals("streaming_test",
queryHistories.get(0).getNativeQueryRealizations().get(1).getModelAlias());
Assert.assertEquals("batch",
queryHistories.get(1).getNativeQueryRealizations().get(0).getModelAlias());
+ Assert.assertEquals("334671fd-e383-4fc9-b5c2-94fce832f77a",
+
queryHistories.get(2).getNativeQueryRealizations().get(0).getModelId());
}
@Test
diff --git
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 3a6f6670da..fec084ee69 100644
---
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -586,6 +586,39 @@ public class QueryServiceTest extends
NLocalFileMetadataTestCase {
mockQueryWithSqlMassage();
}
+ private void mockOLAPContextWithBatchPart() throws Exception {
+ val modelManager = Mockito
+
.spy(NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(),
"streaming_test"));
+
+
Mockito.doReturn(modelManager).when(queryService).getManager(NDataModelManager.class,
"streaming_test");
+ // mock agg index realization
+ OLAPContext aggMock = new OLAPContext(1);
+ NDataModel mockModel1 = Mockito.spy(new NDataModel());
+
Mockito.when(mockModel1.getUuid()).thenReturn("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+ Mockito.when(mockModel1.getAlias()).thenReturn("model_streaming");
+
Mockito.doReturn(mockModel1).when(modelManager).getDataModelDesc("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+
+ IRealization batchRealization = Mockito.mock(IRealization.class);
+
Mockito.when(batchRealization.getUuid()).thenReturn("cd2b9a23-699c-4699-b0dd-38c9412b3dfd");
+
+ HybridRealization hybridRealization =
Mockito.mock(HybridRealization.class);
+ Mockito.when(hybridRealization.getModel()).thenReturn(mockModel1);
+
Mockito.when(hybridRealization.getBatchRealization()).thenReturn(batchRealization);
+
+ aggMock.realization = hybridRealization;
+ IndexEntity mockIndexEntity1 = new IndexEntity();
+ mockIndexEntity1.setId(1);
+ LayoutEntity mockLayout1 = new LayoutEntity();
+ mockLayout1.setIndex(mockIndexEntity1);
+ aggMock.storageContext.setCandidate(new NLayoutCandidate(mockLayout1));
+ aggMock.storageContext.setLayoutId(20001L);
+ aggMock.storageContext.setPrunedSegments(Lists.newArrayList(new
NDataSegment()));
+ OLAPContext.registerContext(aggMock);
+
+ Mockito.doNothing().when(queryService).clearThreadLocalContexts();
+ mockQueryWithSqlMassage();
+ }
+
private void mockOLAPContextWithStreaming() throws Exception {
val modelManager =
Mockito.spy(NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(),
"demo"));
@@ -1991,28 +2024,48 @@ public class QueryServiceTest extends
NLocalFileMetadataTestCase {
@Test
public void testQueryContextWithFusionModel() throws Exception {
final String project = "streaming_test";
- final String sql = "select count(*) from SSB_STREAMING";
- stubQueryConnection(sql, project);
- mockOLAPContextWithHybrid();
+ {
+ final String sql = "select count(*) from SSB_STREAMING";
- final SQLRequest request = new SQLRequest();
- request.setProject(project);
- request.setSql(sql);
-
Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService);
- SQLResponse sqlResponse = queryService.doQueryWithCache(request);
+ stubQueryConnection(sql, project);
+ mockOLAPContextWithHybrid();
- Assert.assertEquals(2, sqlResponse.getNativeRealizations().size());
+ final SQLRequest request = new SQLRequest();
+ request.setProject(project);
+ request.setSql(sql);
+
Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService);
+ SQLResponse sqlResponse = queryService.doQueryWithCache(request);
- Assert.assertEquals("4965c827-fbb4-4ea1-a744-3f341a3b030d",
- sqlResponse.getNativeRealizations().get(0).getModelId());
- Assert.assertEquals((Long) 10001L,
sqlResponse.getNativeRealizations().get(0).getLayoutId());
- Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd",
- sqlResponse.getNativeRealizations().get(1).getModelId());
- Assert.assertEquals((Long) 20001L,
sqlResponse.getNativeRealizations().get(1).getLayoutId());
+ Assert.assertEquals(2, sqlResponse.getNativeRealizations().size());
-
Assert.assertTrue(sqlResponse.getNativeRealizations().get(0).isStreamingLayout());
-
Assert.assertFalse(sqlResponse.getNativeRealizations().get(1).isStreamingLayout());
+ Assert.assertEquals("4965c827-fbb4-4ea1-a744-3f341a3b030d",
+ sqlResponse.getNativeRealizations().get(0).getModelId());
+ Assert.assertEquals((Long) 10001L,
sqlResponse.getNativeRealizations().get(0).getLayoutId());
+ Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd",
+ sqlResponse.getNativeRealizations().get(1).getModelId());
+ Assert.assertEquals((Long) 20001L,
sqlResponse.getNativeRealizations().get(1).getLayoutId());
+
+
Assert.assertTrue(sqlResponse.getNativeRealizations().get(0).isStreamingLayout());
+
Assert.assertFalse(sqlResponse.getNativeRealizations().get(1).isStreamingLayout());
+ }
+ {
+ final String sql = "select count(1) from SSB_STREAMING";
+
+ stubQueryConnection(sql, project);
+ mockOLAPContextWithBatchPart();
+
+ final SQLRequest request = new SQLRequest();
+ request.setProject(project);
+ request.setSql(sql);
+
Mockito.when(SpringContext.getBean(QueryService.class)).thenReturn(queryService);
+ SQLResponse sqlResponse = queryService.doQueryWithCache(request);
+
+ Assert.assertEquals(1, sqlResponse.getNativeRealizations().size());
+
Assert.assertFalse(sqlResponse.getNativeRealizations().get(0).isStreamingLayout());
+ Assert.assertEquals("cd2b9a23-699c-4699-b0dd-38c9412b3dfd",
+ sqlResponse.getNativeRealizations().get(0).getModelId());
+ }
}
@Test