This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 03a44dc0c0da5030fdef47ca53abe3e6d77f6b23 Author: ChenliangLu <[email protected]> AuthorDate: Thu Mar 16 13:58:28 2023 +0800 KYLIN-5564 Support building BloomFilters for the desired columns --- pom.xml | 2 +- .../apache/kylin/rest/config/AppInitializer.java | 4 + .../org/apache/kylin/common/KylinConfigBase.java | 28 +++ .../org/apache/kylin/newten/BloomFilterTest.java | 218 +++++++++++++++++++++ .../org/apache/kylin/newten/LogicalViewTest.java | 2 +- .../metadata/_global/project/bloomfilter.json | 35 ++++ .../c41390c5-b93d-4db3-b167-029874b85a2c.json | 13 ++ .../c41390c5-b93d-4db3-b167-029874b85a2c.json | 63 ++++++ .../c41390c5-b93d-4db3-b167-029874b85a2c.json | 158 +++++++++++++++ .../metadata/bloomfilter/table/SSB.CUSTOMER.json | 68 +++++++ .../bloomfilter/table/SSB.P_LINEORDER.json | 118 +++++++++++ .../apache/kylin/rest/service/QueryService.java | 2 + .../kylin/engine/spark/job/SegmentExec.scala | 6 +- .../engine/spark/utils/TestJobMetricsUtils.scala | 5 +- .../kylin/query/runtime/plan/ResultPlan.scala | 11 +- .../scala/org/apache/spark/sql/SparderEnv.scala | 28 ++- .../spark/filter/BloomFilterSkipCollector.java | 122 ++++++++++++ .../engine/spark/filter/ParquetBloomFilter.java | 168 ++++++++++++++++ .../engine/spark/filter/QueryFiltersCollector.java | 151 ++++++++++++++ .../kylin/engine/spark/utils/Repartitioner.java | 8 +- .../kylin/engine/spark/utils/StorageUtils.scala | 14 +- .../org/apache/spark/sql/SparderConstants.scala | 1 + .../sql/execution/datasource/FilePruner.scala | 18 +- 23 files changed, 1217 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index 2f827a3c5f..f6663758f4 100644 --- a/pom.xml +++ b/pom.xml @@ -314,7 +314,7 @@ <zkclient.version>0.8</zkclient.version> <grpc.version>1.0.2</grpc.version> <fastPFOR.version>0.0.13</fastPFOR.version> - <parquet.version>1.12.2-kylin-r1</parquet.version> + <parquet.version>1.12.2-kylin-r3</parquet.version> <quartz.version>2.1.1</quartz.version> <janino.version>3.0.9</janino.version> diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java index 5e7cbd8814..c934e45bce 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java @@ -35,6 +35,7 @@ import org.apache.kylin.common.persistence.transaction.EventListenerRegistry; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.HostInfoFetcher; +import org.apache.kylin.engine.spark.filter.QueryFiltersCollector; import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils; import org.apache.kylin.metadata.epoch.EpochOrchestrator; import org.apache.kylin.metadata.project.NProjectLoader; @@ -138,6 +139,9 @@ public class AppInitializer { } else { context.publishEvent(new SparderStartEvent.SyncEvent(context)); } + if (kylinConfig.isBloomCollectFilterEnabled()) { + QueryFiltersCollector.initScheduler(); + } } EventBusFactory.getInstance().register(new ProcessStatusListener(), true); // register acl update listener diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 6d1e60608b..387433edd7 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3902,4 +3902,32 @@ public abstract class KylinConfigBase implements Serializable { public boolean isIndexEnableOperatorDesign() { return Boolean.parseBoolean(getOptional("kylin.index.enable-operator-design", FALSE)); } + + public int getQueryFilterCollectInterval() { + return Integer.parseInt(getOptional("kylin.query.filter.collect-interval", "1800")); + } + + public boolean isBloomCollectFilterEnabled() { + return Boolean.parseBoolean(getOptional("kylin.bloom.collect-filter.enabled", TRUE)); + } + + public boolean isCollectQueryMetricsEnabled() { + return Boolean.parseBoolean(getOptional("kylin.query.collect-metrics.enabled", TRUE)); + } + + public boolean isBloomBuildEnabled() { + return Boolean.parseBoolean(getOptional("kylin.bloom.build.enabled", FALSE)); + } + + public int getBloomBuildColumnMaxNum() { + return Integer.parseInt(getOptional("kylin.bloom.build.column.max-size", "3")); + } + + public String getBloomBuildColumn() { + return getOptional("kylin.bloom.build.column", ""); + } + + public int getBloomBuildColumnNvd() { + return Integer.parseInt(getOptional("kylin.bloom.build.column.nvd", "200000")); + } } diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java new file mode 100644 index 0000000000..3241711864 --- /dev/null +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.kylin.newten; + +import static org.apache.kylin.engine.spark.filter.QueryFiltersCollector.SERVER_HOST; +import static org.apache.kylin.engine.spark.filter.QueryFiltersCollector.getProjectFiltersFile; +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.common.util.TempMetadataBuilder; +import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; +import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector; +import org.apache.kylin.engine.spark.filter.ParquetBloomFilter; +import org.apache.kylin.engine.spark.filter.QueryFiltersCollector; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; +import org.apache.kylin.junit.TimeZoneTestRunner; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.util.ExecAndComp; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparderEnv; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper; +import org.apache.spark.sql.internal.StaticSQLConf; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.sparkproject.guava.collect.Sets; + +@RunWith(TimeZoneTestRunner.class) +public class BloomFilterTest extends NLocalWithSparkSessionTest implements AdaptiveSparkPlanHelper { + + private NDataflowManager dfMgr = null; + + @BeforeClass + public static void initSpark() { + if (Shell.MAC) { + overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", "libsnappyjava.jnilib");//for snappy + } + if (ss != null && !ss.sparkContext().isStopped()) { + ss.stop(); + } + sparkConf = new SparkConf().setAppName(RandomUtil.randomUUIDStr()).setMaster("local[2]"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer"); + sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), "in-memory"); + sparkConf.set("spark.sql.shuffle.partitions", "1"); + sparkConf.set("spark.memory.fraction", "0.1"); + sparkConf.set(StaticSQLConf.WAREHOUSE_PATH().key(), + TempMetadataBuilder.TEMP_TEST_METADATA + "/spark-warehouse"); + ss = SparkSession.builder().config(sparkConf).getOrCreate(); + SparderEnv.setSparkSession(ss); + } + + @Before + public void setup() throws Exception { + overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1"); + overwriteSystemProp("kylin.bloom.collect-filter.enabled", "true"); + overwriteSystemProp("kylin.bloom.build.enabled", "true"); + overwriteSystemProp("kylin.query.filter.collect-interval", "10"); + this.createTestMetadata("src/test/resources/ut_meta/bloomfilter"); + NDefaultScheduler scheduler = NDefaultScheduler.getInstance(getProject()); + scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv())); + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + QueryFiltersCollector.initScheduler(); + dfMgr = NDataflowManager.getInstance(getTestConfig(), getProject()); + } + + @After + public void after() throws Exception { + NDefaultScheduler.destroyInstance(); + QueryFiltersCollector.destoryScheduler(); + cleanupTestMetadata(); + } + + public String getProject() { + return "bloomfilter"; + } + + @Test + public void testBuildBloomFilter() throws Exception { + String dfID = "c41390c5-b93d-4db3-b167-029874b85a2c"; + NDataflow dataflow = dfMgr.getDataflow(dfID); + LayoutEntity layout = dataflow.getIndexPlan().getLayoutEntity(20000000001L); + Assert.assertNotNull(layout); + populateSSWithCSVData(getTestConfig(), getProject(), SparderEnv.getSparkSession()); + + overwriteSystemProp("kylin.bloom.build.column", "0#10000#1#10000"); + indexDataConstructor.buildIndex(dfID, SegmentRange.TimePartitionedSegmentRange.createInfinite(), + Sets.newHashSet( + dataflow.getIndexPlan().getLayoutEntity(20000000001L)), true); + // In our PR UT environment, building job may not be executed when there has cache, + // so we will ignore it if job is skipped + if (ParquetBloomFilter.isLoaded()) { + // set BloomFilter to build manually, see "kylin.bloom.build.column" + Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("0")); + Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("1")); + } + + List<Pair<String, String>> query = new ArrayList<>(); + String sql1 = "select * from SSB.P_LINEORDER where LO_CUSTKEY in (13,8) and LO_SHIPPRIOTITY = 0 "; + query.add(Pair.newPair("bloomfilter", sql1)); + ExecAndComp.execAndCompare(query, getProject(), ExecAndComp.CompareLevel.NONE, "inner"); + Path projectFilterPath = getProjectFiltersFile(SERVER_HOST, getProject()); + FileSystem fs = HadoopUtil.getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()); + // wait until `QueryFiltersCollector` record filter info + await().atMost(120, TimeUnit.SECONDS).until(() -> { + try { + if (!fs.exists(projectFilterPath)) { + return false; + } + } catch (Exception e) { + return false; + } + return true; + }); + Map<String, Map<String, Integer>> history = JsonUtil.readValue( + HadoopUtil.readStringFromHdfs(fs, projectFilterPath), Map.class); + Assert.assertTrue(history.get(dfID).keySet().contains("8")); + Assert.assertTrue(history.get(dfID).keySet().contains("9")); + Integer hitNum = history.get(dfID).get("8"); + Assert.assertTrue(fs.exists(projectFilterPath)); + String sql2 = "select * from SSB.P_LINEORDER where LO_CUSTKEY in (13,8)"; + query.add(Pair.newPair("bloomfilter", sql2)); + ExecAndComp.execAndCompare(query, getProject(), ExecAndComp.CompareLevel.NONE, "inner"); + + await().atMost(120, TimeUnit.SECONDS).until(() -> { + try { + Map<String, Map<String, Integer>> newHistory = JsonUtil.readValue( + HadoopUtil.readStringFromHdfs(fs, projectFilterPath), Map.class); + Integer newHitNum = newHistory.get(dfID).get("8"); + if (newHitNum <= hitNum) { + return false; + } + } catch (Exception e) { + return false; + } + return true; + }); + + overwriteSystemProp("kylin.bloom.build.column", ""); + overwriteSystemProp("kylin.bloom.build.column.max-size", "1"); + ParquetBloomFilter.resetParquetBloomFilter(); + indexDataConstructor.buildIndex(dfID, SegmentRange.TimePartitionedSegmentRange.createInfinite(), + Sets.newHashSet(dataflow.getIndexPlan().getLayoutEntity(20000000001L)), true); + // In our PR UT environment, building job may not be executed when there has cache, + // so we will ignore it if job is skipped + if (ParquetBloomFilter.isLoaded()) { + // build BloomFilter according to query statics + Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("8")); + // `kylin.bloom.build.column.max-size=1`, only build one column + Assert.assertFalse(ParquetBloomFilter.getBuildBloomColumns().contains("9")); + } + ExecAndComp.execAndCompare(query, getProject(), ExecAndComp.CompareLevel.SAME, "inner"); + + testBloomFilterSkipCollector(); + } + + private void testBloomFilterSkipCollector() { + String queryId1 = "query-id1"; + BloomFilterSkipCollector.addQueryMetrics(queryId1, 3, + 2, 20, 100, 1); + BloomFilterSkipCollector.addQueryMetrics(queryId1, 1, + 1, 10, 100, 1); + Assert.assertEquals(4L, BloomFilterSkipCollector.queryTotalBloomBlocks.getIfPresent(queryId1).get()); + Assert.assertEquals(3L, BloomFilterSkipCollector.querySkipBloomBlocks.getIfPresent(queryId1).get()); + Assert.assertEquals(30L, BloomFilterSkipCollector.querySkipBloomRows.getIfPresent(queryId1).get()); + Assert.assertEquals(200L, BloomFilterSkipCollector.queryFooterReadTime.getIfPresent(queryId1).get()); + Assert.assertEquals(2L, BloomFilterSkipCollector.queryFooterReadNumber.getIfPresent(queryId1).get()); + BloomFilterSkipCollector.logAndCleanStatus(queryId1); + BloomFilterSkipCollector.logAndCleanStatus("query-id2"); + Assert.assertNull(BloomFilterSkipCollector.queryTotalBloomBlocks.getIfPresent(queryId1)); + Assert.assertNull(BloomFilterSkipCollector.querySkipBloomBlocks.getIfPresent(queryId1)); + Assert.assertNull(BloomFilterSkipCollector.querySkipBloomRows.getIfPresent(queryId1)); + Assert.assertNull(BloomFilterSkipCollector.queryFooterReadTime.getIfPresent(queryId1)); + Assert.assertNull(BloomFilterSkipCollector.queryFooterReadNumber.getIfPresent(queryId1)); + for (int i = 0; i < 200; i++) { + BloomFilterSkipCollector.logAndCleanStatus("query-id2"); + } + Assert.assertTrue(BloomFilterSkipCollector.logCounter.get() <= 100); + } +} diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/LogicalViewTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/LogicalViewTest.java index 58d1109a55..9e58a2a4dc 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/LogicalViewTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/LogicalViewTest.java @@ -86,6 +86,6 @@ public class LogicalViewTest extends NLocalWithSparkSessionTest { + " INNER JOIN SSB.CUSTOMER t2 on t1.C_CUSTKEY = t2.C_CUSTKEY "; query.add(Pair.newPair("logical_view", sql1)); ExecAndComp.execAndCompare( - query, getProject(), ExecAndComp.CompareLevel.NONE, "inner"); + query, getProject(), ExecAndComp.CompareLevel.SAME, "inner"); } } diff --git a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/_global/project/bloomfilter.json b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/_global/project/bloomfilter.json new file mode 100644 index 0000000000..f20eac5496 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/_global/project/bloomfilter.json @@ -0,0 +1,35 @@ +{ + "uuid" : "24854c55-8298-623e-e64a-42ae6faf1f1a", + "last_modified" : 1677855913210, + "create_time" : 1677855913185, + "version" : "4.0.0.0", + "name" : "bloomfilter", + "owner" : "ADMIN", + "status" : "ENABLED", + "create_time_utc" : 1677855913185, + "default_database" : "DEFAULT", + "description" : "", + "principal" : null, + "keytab" : null, + "maintain_model_type" : "MANUAL_MAINTAIN", + "override_kylin_properties" : { + "kylin.metadata.semi-automatic-mode" : "false", + "kylin.query.metadata.expose-computed-column" : "true", + "kylin.source.default" : "9" + }, + "segment_config" : { + "auto_merge_enabled" : false, + "auto_merge_time_ranges" : [ "WEEK", "MONTH", "QUARTER", "YEAR" ], + "volatile_range" : { + "volatile_range_number" : 0, + "volatile_range_enabled" : false, + "volatile_range_type" : "DAY" + }, + "retention_range" : { + "retention_range_number" : 1, + "retention_range_enabled" : false, + "retention_range_type" : "MONTH" + }, + "create_empty_segment_enabled" : false + } +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/dataflow/c41390c5-b93d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/dataflow/c41390c5-b93d-4db3-b167-029874b85a2c.json new file mode 100644 index 0000000000..d107d4b0d6 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/dataflow/c41390c5-b93d-4db3-b167-029874b85a2c.json @@ -0,0 +1,13 @@ +{ + "uuid" : "c41390c5-b93d-4db3-b167-029874b85a2c", + "last_modified" : 0, + "create_time" : 1677856069390, + "version" : "4.0.0.0", + "status" : "OFFLINE", + "last_status" : null, + "cost" : 50, + "query_hit_count" : 0, + "last_query_time" : 0, + "layout_query_hit_count" : { }, + "segments" : [ ] +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/index_plan/c41390c5-b93d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/index_plan/c41390c5-b93d-4db3-b167-029874b85a2c.json new file mode 100644 index 0000000000..37895a2385 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/index_plan/c41390c5-b93d-4db3-b167-029874b85a2c.json @@ -0,0 +1,63 @@ +{ + "uuid" : "c41390c5-b93d-4db3-b167-029874b85a2c", + "last_modified" : 1677856069297, + "create_time" : 1677856069297, + "version" : "4.0.0.0", + "description" : null, + "rule_based_index" : null, + "indexes" : [ { + "id" : 0, + "dimensions" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ], + "measures" : [ 100000 ], + "layouts" : [ { + "id" : 1, + "name" : null, + "owner" : null, + "col_order" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 100000 ], + "shard_by_columns" : [ ], + "partition_by_columns" : [ ], + "sort_by_columns" : [ ], + "storage_type" : 20, + "update_time" : 1677856069309, + "manual" : false, + "auto" : false, + "base" : true, + "draft_version" : null, + "index_range" : null + } ], + "next_layout_offset" : 2 + }, { + "id" : 20000000000, + "dimensions" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ], + "measures" : [ ], + "layouts" : [ { + "id" : 20000000001, + "name" : null, + "owner" : null, + "col_order" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17 ], + "shard_by_columns" : [ ], + "partition_by_columns" : [ ], + "sort_by_columns" : [ ], + "storage_type" : 20, + "update_time" : 1677856069310, + "manual" : false, + "auto" : false, + "base" : true, + "draft_version" : null, + "index_range" : null + } ], + "next_layout_offset" : 2 + } ], + "override_properties" : { }, + "to_be_deleted_indexes" : [ ], + "auto_merge_time_ranges" : null, + "retention_range" : 0, + "engine_type" : 80, + "next_aggregation_index_id" : 10000, + "next_table_index_id" : 20000010000, + "agg_shard_by_columns" : [ ], + "extend_partition_columns" : [ ], + "layout_bucket_num" : { }, + "approved_additional_recs" : 0, + "approved_removal_recs" : 0 +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/model_desc/c41390c5-b93d-4db3-b167-029874b85a2c.json b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/model_desc/c41390c5-b93d-4db3-b167-029874b85a2c.json new file mode 100644 index 0000000000..8eb8d72b45 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/model_desc/c41390c5-b93d-4db3-b167-029874b85a2c.json @@ -0,0 +1,158 @@ +{ + "uuid" : "c41390c5-b93d-4db3-b167-029874b85a2c", + "last_modified" : 1677856069295, + "create_time" : 1677856068306, + "version" : "4.0.0.0", + "alias" : "p_lineorder", + "owner" : "ADMIN", + "config_last_modifier" : null, + "config_last_modified" : 0, + "description" : "", + "fact_table" : "SSB.P_LINEORDER", + "fact_table_alias" : null, + "management_type" : "MODEL_BASED", + "join_tables" : [ ], + "filter_condition" : "", + "partition_desc" : { + "partition_date_column" : "P_LINEORDER.LO_ORDERDATE", + "partition_date_start" : 0, + "partition_date_format" : "yyyy-MM-dd", + "partition_type" : "APPEND", + "partition_condition_builder" : "org.apache.kylin.metadata.model.PartitionDesc$DefaultPartitionConditionBuilder" + }, + "capacity" : "MEDIUM", + "segment_config" : { + "auto_merge_enabled" : null, + "auto_merge_time_ranges" : null, + "volatile_range" : null, + "retention_range" : null, + "create_empty_segment_enabled" : false + }, + "data_check_desc" : null, + "semantic_version" : 0, + "storage_type" : 0, + "model_type" : "BATCH", + "all_named_columns" : [ { + "id" : 0, + "name" : "LO_SHIPMODE", + "column" : "P_LINEORDER.LO_SHIPMODE", + "status" : "DIMENSION" + }, { + "id" : 1, + "name" : "LO_LINENUMBER", + "column" : "P_LINEORDER.LO_LINENUMBER", + "status" : "DIMENSION" + }, { + "id" : 2, + "name" : "LO_ORDTOTALPRICE", + "column" : "P_LINEORDER.LO_ORDTOTALPRICE", + "status" : "DIMENSION" + }, { + "id" : 3, + "name" : "LO_SUPPLYCOST", + "column" : "P_LINEORDER.LO_SUPPLYCOST", + "status" : "DIMENSION" + }, { + "id" : 4, + "name" : "LO_SUPPKEY", + "column" : "P_LINEORDER.LO_SUPPKEY", + "status" : "DIMENSION" + }, { + "id" : 5, + "name" : "LO_QUANTITY", + "column" : "P_LINEORDER.LO_QUANTITY", + "status" : "DIMENSION" + }, { + "id" : 6, + "name" : "LO_PARTKEY", + "column" : "P_LINEORDER.LO_PARTKEY", + "status" : "DIMENSION" + }, { + "id" : 7, + "name" : "LO_ORDERKEY", + "column" : "P_LINEORDER.LO_ORDERKEY", + "status" : "DIMENSION" + }, { + "id" : 8, + "name" : "LO_CUSTKEY", + "column" : "P_LINEORDER.LO_CUSTKEY", + "status" : "DIMENSION" + }, { + "id" : 9, + "name" : "LO_SHIPPRIOTITY", + "column" : "P_LINEORDER.LO_SHIPPRIOTITY", + "status" : "DIMENSION" + }, { + "id" : 10, + "name" : "LO_DISCOUNT", + "column" : "P_LINEORDER.LO_DISCOUNT", + "status" : "DIMENSION" + }, { + "id" : 11, + "name" : "LO_ORDERPRIOTITY", + "column" : "P_LINEORDER.LO_ORDERPRIOTITY", + "status" : "DIMENSION" + }, { + "id" : 12, + "name" : "LO_ORDERDATE", + "column" : "P_LINEORDER.LO_ORDERDATE", + "status" : "DIMENSION" + }, { + "id" : 13, + "name" : "LO_REVENUE", + "column" : "P_LINEORDER.LO_REVENUE", + "status" : "DIMENSION" + }, { + "id" : 14, + "name" : "V_REVENUE", + "column" : "P_LINEORDER.V_REVENUE", + "status" : "DIMENSION" + }, { + "id" : 15, + "name" : "LO_COMMITDATE", + "column" : "P_LINEORDER.LO_COMMITDATE", + "status" : "DIMENSION" + }, { + "id" : 16, + "name" : "LO_EXTENDEDPRICE", + "column" : "P_LINEORDER.LO_EXTENDEDPRICE", + "status" : "DIMENSION" + }, { + "id" : 17, + "name" : "LO_TAX", + "column" : "P_LINEORDER.LO_TAX", + "status" : "DIMENSION" + } ], + "all_measures" : [ { + "name" : "COUNT_ALL", + "function" : { + "expression" : "COUNT", + "parameters" : [ { + "type" : "constant", + "value" : "1" + } ], + "returntype" : "bigint" + }, + "column" : null, + "comment" : null, + "id" : 100000, + "type" : "NORMAL", + "internal_ids" : [ ] + } ], + "recommendations_count" : 0, + "computed_columns" : [ ], + "canvas" : { + "coordinate" : { + "P_LINEORDER" : { + "x" : 530.6666395399305, + "y" : 249.00000678168405, + "width" : 220.0, + "height" : 200.0 + } + }, + "zoom" : 9.0 + }, + "multi_partition_desc" : null, + "multi_partition_key_mapping" : null, + "fusion_id" : null +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.CUSTOMER.json b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.CUSTOMER.json new file mode 100644 index 0000000000..188df51654 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.CUSTOMER.json @@ -0,0 +1,68 @@ +{ + "uuid" : "9481374f-63be-5f0b-5a6d-22c7a561b1d9", + "last_modified" : 0, + "create_time" : 1677856024986, + "version" : "4.0.0.0", + "name" : "CUSTOMER", + "columns" : [ { + "id" : "1", + "name" : "C_CUSTKEY", + "datatype" : "integer", + "case_sensitive_name" : "c_custkey" + }, { + "id" : "2", + "name" : "C_NAME", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_name" + }, { + "id" : "3", + "name" : "C_ADDRESS", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_address" + }, { + "id" : "4", + "name" : "C_CITY", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_city" + }, { + "id" : "5", + "name" : "C_NATION", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_nation" + }, { + "id" : "6", + "name" : "C_REGION", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_region" + }, { + "id" : "7", + "name" : "C_PHONE", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_phone" + }, { + "id" : "8", + "name" : "C_MKTSEGMENT", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "c_mktsegment" + } ], + "source_type" : 9, + "table_type" : "EXTERNAL", + "top" : false, + "increment_loading" : false, + "last_snapshot_path" : null, + "last_snapshot_size" : 0, + "snapshot_last_modified" : 0, + "query_hit_count" : 0, + "partition_column" : null, + "snapshot_partitions" : { }, + "snapshot_partitions_info" : { }, + "snapshot_total_rows" : 0, + "snapshot_partition_col" : null, + "selected_snapshot_partition_col" : null, + "temp_snapshot_path" : null, + "snapshot_has_broken" : false, + "database" : "SSB", + "transactional" : false, + "rangePartition" : false, + "partition_desc" : null +} \ No newline at end of file diff --git a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.P_LINEORDER.json b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.P_LINEORDER.json new file mode 100644 index 0000000000..f17cb19cc6 --- /dev/null +++ b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.P_LINEORDER.json @@ -0,0 +1,118 @@ +{ + "uuid" : "f0dc3d45-0eb0-26c0-4681-b6c0d0c8e270", + "last_modified" : 0, + "create_time" : 1677856025283, + "version" : "4.0.0.0", + "name" : "P_LINEORDER", + "columns" : [ { + "id" : "1", + "name" : "LO_ORDERKEY", + "datatype" : "bigint", + "case_sensitive_name" : "lo_orderkey" + }, { + "id" : "2", + "name" : "LO_LINENUMBER", + "datatype" : "bigint", + "case_sensitive_name" : "lo_linenumber" + }, { + "id" : "3", + "name" : "LO_CUSTKEY", + "datatype" : "integer", + "case_sensitive_name" : "lo_custkey" + }, { + "id" : "4", + "name" : "LO_PARTKEY", + "datatype" : "integer", + "case_sensitive_name" : "lo_partkey" + }, { + "id" : "5", + "name" : "LO_SUPPKEY", + "datatype" : "integer", + "case_sensitive_name" : "lo_suppkey" + }, { + "id" : "6", + "name" : "LO_ORDERDATE", + "datatype" : "date", + "case_sensitive_name" : "lo_orderdate" + }, { + "id" : "7", + "name" : "LO_ORDERPRIOTITY", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "lo_orderpriotity" + }, { + "id" : "8", + "name" : "LO_SHIPPRIOTITY", + "datatype" : "integer", + "case_sensitive_name" : "lo_shippriotity" + }, { + "id" : "9", + "name" : "LO_QUANTITY", + "datatype" : "bigint", + "case_sensitive_name" : "lo_quantity" + }, { + "id" : "10", + "name" : "LO_EXTENDEDPRICE", + "datatype" : "bigint", + "case_sensitive_name" : "lo_extendedprice" + }, { + "id" : "11", + "name" : "LO_ORDTOTALPRICE", + "datatype" : "bigint", + "case_sensitive_name" : "lo_ordtotalprice" + }, { + "id" : "12", + "name" : "LO_DISCOUNT", + "datatype" : "bigint", + "case_sensitive_name" : "lo_discount" + }, { + "id" : "13", + "name" : "LO_REVENUE", + "datatype" : "bigint", + "case_sensitive_name" : "lo_revenue" + }, { + "id" : "14", + "name" : "LO_SUPPLYCOST", + "datatype" : "bigint", + "case_sensitive_name" : "lo_supplycost" + }, { + "id" : "15", + "name" : "LO_TAX", + "datatype" : "bigint", + "case_sensitive_name" : "lo_tax" + }, { + "id" : "16", + "name" : "LO_COMMITDATE", + "datatype" : "date", + "case_sensitive_name" : "lo_commitdate" + }, { + "id" : "17", + "name" : "LO_SHIPMODE", + "datatype" : "varchar(4096)", + "case_sensitive_name" : "lo_shipmode" + }, { + "id" : "18", + "name" : "V_REVENUE", + "datatype" : "bigint", + "case_sensitive_name" : "v_revenue" + } ], + "source_type" : 9, + "table_type" : "VIEW", + "top" : false, + "increment_loading" : false, + "last_snapshot_path" : null, + "last_snapshot_size" : 0, + "snapshot_last_modified" : 0, + "query_hit_count" : 0, + "partition_column" : null, + "snapshot_partitions" : { }, + "snapshot_partitions_info" : { }, + "snapshot_total_rows" : 0, + "snapshot_partition_col" : null, + "selected_snapshot_partition_col" : null, + "temp_snapshot_path" : null, + "snapshot_has_broken" : false, + "database" : "SSB", + "transactional" : false, + "rangePartition" : false, + "partition_desc" : null +} \ No newline at end of file diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index c22b2ee0d9..b542c6d77c 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -80,6 +80,7 @@ import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SetThreadName; +import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.acl.AclTCR; @@ -428,6 +429,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup errorMsg = errorMsg.length() > maxLength ? errorMsg.substring(0, maxLength) : errorMsg; } + BloomFilterSkipCollector.logAndCleanStatus(QueryContext.current().getQueryId()); LogReport report = new LogReport().put(LogReport.QUERY_ID, QueryContext.current().getQueryId()) .put(LogReport.SQL, sql).put(LogReport.USER, user) .put(LogReport.SUCCESS, null == response.getExceptionMessage()).put(LogReport.DURATION, duration) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala index 0b104eb42a..a6517769fd 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala @@ -32,10 +32,12 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.datasource.storage.{StorageListener, StorageStoreFactory, WriteTaskStats} import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} import org.apache.spark.tracker.BuildContext - import java.util import java.util.Objects import java.util.concurrent.{BlockingQueue, ForkJoinPool, LinkedBlockingQueue, TimeUnit} + +import org.apache.kylin.engine.spark.filter.ParquetBloomFilter + import scala.collection.JavaConverters._ import scala.collection.parallel.ForkJoinTaskSupport @@ -325,7 +327,7 @@ trait SegmentExec extends Logging { case Some(x) => store.setStorageListener(x) case None => } - + ParquetBloomFilter.registerBloomColumnIfNeed(project, dataflowId); val stats = store.save(layout, new Path(storagePath), KapConfig.wrap(config), layoutDS) sparkSession.sparkContext.setJobDescription(null) stats diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/utils/TestJobMetricsUtils.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/utils/TestJobMetricsUtils.scala index bd1bc65bea..50d3e89f96 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/utils/TestJobMetricsUtils.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/utils/TestJobMetricsUtils.scala @@ -20,7 +20,7 @@ package org.apache.kylin.engine.spark.utils import org.apache.commons.io.FileUtils import org.apache.kylin.common.util.RandomUtil -import org.apache.spark.sql.common.{SharedSparkSession, SparderBaseFunSuite} +import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode} @@ -30,7 +30,8 @@ import java.io.File import java.util.concurrent.{Executors, TimeUnit} -class TestJobMetricsUtils extends SparderBaseFunSuite with SharedSparkSession with BeforeAndAfterAll { +class TestJobMetricsUtils extends SparderBaseFunSuite with SharedSparkSession with BeforeAndAfterAll + with LocalMetadata { private val path1 = "./temp1" private val id1 = RandomUtil.randomUUIDStr diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala index 7815e981b0..fc8e6044e1 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala @@ -22,7 +22,8 @@ import java.io.{File, FileOutputStream, OutputStreamWriter} import java.nio.charset.StandardCharsets import java.util.concurrent.atomic.AtomicLong import java.{lang, util} - +import com.google.common.cache.{Cache, CacheBuilder} +import io.kyligence.kap.secondstorage.SecondStorageUtil import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.Path @@ -41,17 +42,13 @@ import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook} import org.apache.spark.SparkConf import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.QueryMetricUtils -import org.apache.spark.sql.util.SparderTypeUtil +import org.apache.spark.sql.util.{SparderConstants, SparderTypeUtil} import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv} import scala.collection.JavaConverters._ import scala.collection.convert.ImplicitConversions.`iterator asScala` import scala.collection.mutable -import com.google.common.cache.{Cache, CacheBuilder} - -import io.kyligence.kap.secondstorage.SecondStorageUtil - // scalastyle:off object ResultType extends Enumeration { type ResultType = Value @@ -505,7 +502,7 @@ object ResultPlan extends LogEx { } object QueryToExecutionIDCache extends LogEx { - val KYLIN_QUERY_ID_KEY = "kylin.query.id" + val KYLIN_QUERY_ID_KEY = SparderConstants.KYLIN_QUERY_ID_KEY val KYLIN_QUERY_EXECUTION_ID = "kylin.query.execution.id" private val queryID2ExecutionID: Cache[String, String] = diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala index cfb469d547..81934163c3 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala @@ -23,17 +23,20 @@ import java.security.PrivilegedAction import java.util.Map import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{Callable, ExecutorService} + +import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.UserGroupInformation import org.apache.kylin.common.exception.{KylinException, KylinTimeoutException, ServerErrorCode} import org.apache.kylin.common.msg.MsgPicker import org.apache.kylin.common.util.{DefaultHostInfoFetcher, HadoopUtil, S3AUtil} import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext} +import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector import org.apache.kylin.metadata.model.{NTableMetadataManager, TableExtDesc} import org.apache.kylin.metadata.project.NProjectManager import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerLogRollUp} +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerLogRollUp, SparkListenerTaskEnd} import org.apache.spark.sql.KylinSession._ import org.apache.spark.sql.catalyst.optimizer.ConvertInnerJoinToSemiJoin import org.apache.spark.sql.catalyst.parser.ParseException @@ -242,6 +245,7 @@ object SparderEnv extends Logging { .getContextClassLoader .toString) registerListener(sparkSession.sparkContext) + registerQueryMetrics(sparkSession.sparkContext) APP_MASTER_TRACK_URL = null startSparkFailureTimes = 0 lastStartSparkFailureTime = 0 @@ -289,6 +293,28 @@ object SparderEnv extends Logging { sc.addSparkListener(sparkListener) } + def registerQueryMetrics(sc: SparkContext): Unit = { + if (!KylinConfig.getInstanceFromEnv.isCollectQueryMetricsEnabled) { + return + } + val taskListener = new SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + try { + if (StringUtils.isNotBlank(taskEnd.queryId)) { + val inputMetrics = taskEnd.taskMetrics.inputMetrics + BloomFilterSkipCollector.addQueryMetrics(taskEnd.queryId, + inputMetrics.totalBloomBlocks, inputMetrics.totalSkipBloomBlocks, + inputMetrics.totalSkipBloomRows, inputMetrics.footerReadTime, + inputMetrics.footerReadNumber) + } + } catch { + case e: Throwable => logWarning("error when add metrics for query", e) + } + } + } + sc.addSparkListener(taskListener) + } + /** * @param sqlText SQL to be validated * @return The logical plan diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/BloomFilterSkipCollector.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/BloomFilterSkipCollector.java new file mode 100644 index 0000000000..65e9d41ae8 --- /dev/null +++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/BloomFilterSkipCollector.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.filter; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AutoReadWriteLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +public class BloomFilterSkipCollector { + public static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterSkipCollector.class); + + // use guava cache is for auto clean even if there are something went wrong in query + public static final Cache<String, AtomicLong> queryTotalBloomBlocks = CacheBuilder + .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + public static final Cache<String, AtomicLong> querySkipBloomBlocks = CacheBuilder + .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + public static final Cache<String, AtomicLong> querySkipBloomRows = CacheBuilder + .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + public static final Cache<String, AtomicLong> queryFooterReadTime = CacheBuilder + .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + public static final Cache<String, AtomicLong> queryFooterReadNumber = CacheBuilder + .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + + public static final AtomicLong logCounter = new AtomicLong(0); + private static final AtomicLong globalBloomBlocks = new AtomicLong(0); + private static final AtomicLong globalSkipBloomBlocks = new AtomicLong(0); + + private static final AutoReadWriteLock LOCK = new AutoReadWriteLock(new ReentrantReadWriteLock()); + + public static void addQueryMetrics( + String queryId, long totalBloomBlocks, + long skipBloomBlocks, long skipBloomRows, long footerReadTime, long footerReadNumber) { + long start = System.currentTimeMillis(); + try (AutoReadWriteLock.AutoLock writeLock = LOCK.lockForWrite()) { + addQueryCounter(queryId, queryTotalBloomBlocks, totalBloomBlocks); + addQueryCounter(queryId, querySkipBloomBlocks, skipBloomBlocks); + addQueryCounter(queryId, querySkipBloomRows, skipBloomRows); + addQueryCounter(queryId, queryFooterReadTime, footerReadTime); + addQueryCounter(queryId, queryFooterReadNumber, footerReadNumber); + globalBloomBlocks.addAndGet(totalBloomBlocks); + globalSkipBloomBlocks.addAndGet(skipBloomBlocks); + } catch (Exception e) { + LOGGER.error("Error when add query metrics.", e); + } + long end = System.currentTimeMillis(); + if ((end - start) > 100) { + LOGGER.warn("BloomFilter collector cost too much time: {} ms ", (end - start)); + } + } + + private static void addQueryCounter(String queryId, + Cache<String, AtomicLong> counter, long step) throws ExecutionException { + AtomicLong totalBlocks = counter.get(queryId, () -> new AtomicLong(0L)); + totalBlocks.addAndGet(step); + } + + public static void logAndCleanStatus(String queryId) { + try { + AtomicLong readTime = queryFooterReadTime.get(queryId, () -> new AtomicLong(0L)); + AtomicLong readNumber = queryFooterReadNumber.get(queryId, () -> new AtomicLong(1L)); + if (readNumber.get() > 0L && readTime.get() > 0L) { + LOGGER.info("Reading footer avg time is {}, total read time is {}, number of row groups is {}", + readTime.get() / readNumber.get(), readTime.get(), readNumber); + } + AtomicLong totalBloomBlocks = queryTotalBloomBlocks.getIfPresent(queryId); + if (KylinConfig.getInstanceFromEnv().isBloomCollectFilterEnabled() + && totalBloomBlocks != null && totalBloomBlocks.get() > 0) { + AtomicLong skipBlocks = querySkipBloomBlocks.get(queryId, () -> new AtomicLong(0L)); + AtomicLong skipRows = querySkipBloomRows.get(queryId, () -> new AtomicLong(0L)); + LOGGER.info("BloomFilter total bloom blocks is {}, skip bloom blocks is {}, skip rows is {}", + totalBloomBlocks.get(), skipBlocks.get(), skipRows.get()); + } + queryFooterReadTime.invalidate(queryId); + queryFooterReadNumber.invalidate(queryId); + queryTotalBloomBlocks.invalidate(queryId); + querySkipBloomBlocks.invalidate(queryId); + querySkipBloomRows.invalidate(queryId); + logCounter.incrementAndGet(); + if (logCounter.get() >= 100) { + LOGGER.info("Global BloomFilter total bloom blocks is {}, " + + " skip bloom blocks is {}", + globalBloomBlocks.get(), globalSkipBloomBlocks.get()); + logCounter.set(0); + } + if (globalBloomBlocks.get() < 0 || globalSkipBloomBlocks.get() < 0) { + // globalBloomBlocks number > Long.MAX_VALUE, almost impossible to get here + globalBloomBlocks.set(0); + globalSkipBloomBlocks.set(0); + } + } catch (ExecutionException e) { + LOGGER.error("Error when log query metrics.", e); + } + } + + private BloomFilterSkipCollector() { + } +} diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java new file mode 100644 index 0000000000..890edcfef1 --- /dev/null +++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.filter; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +public class ParquetBloomFilter { + + public static final Logger LOGGER = LoggerFactory.getLogger(ParquetBloomFilter.class); + + private static final SortedSet<ColumnFilter> columnFilters = new TreeSet<>(); + private static boolean loaded = false; + private static final List<String> buildBloomColumns = Lists.newArrayList(); + + public static void registerBloomColumnIfNeed(String project, String modelId) { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + if (!config.isBloomBuildEnabled()) { + return; + } + if (StringUtils.isBlank(project) || StringUtils.isBlank(modelId)) { + // won't happen + return; + } + if (loaded) { + return; + } + try { + project = project.toUpperCase(); + FileSystem fs = HadoopUtil.getFileSystem(config.getHdfsWorkingDirectory()); + Path filterInfo = new Path(QueryFiltersCollector.FILTER_STORAGE_PATH); + if (!fs.exists(filterInfo)) { + loaded = true; + return; + } + FileStatus[] hostsDir = fs.listStatus(new Path(QueryFiltersCollector.FILTER_STORAGE_PATH)); + HashMap<String, Integer> columnsHits = Maps.newHashMap(); + for (FileStatus host : hostsDir) { + String hostName = host.getPath().getName(); + Path projectFiltersFile = QueryFiltersCollector.getProjectFiltersFile(hostName, project); + Map<String, Map<String, Integer>> modelColumns = JsonUtil.readValue( + HadoopUtil.readStringFromHdfs(fs, projectFiltersFile), Map.class); + if (modelColumns.containsKey(modelId)) { + modelColumns.get(modelId).forEach((column, hit) -> { + int originHit = columnsHits.getOrDefault(column, 0); + columnsHits.put(column, originHit + hit); + }); + } + } + columnsHits.forEach((column, hit) -> columnFilters.add(new ColumnFilter(column, hit))); + String columnFiltersLog = Arrays.toString(columnFilters.toArray()); + LOGGER.info("register BloomFilter info : {}", columnFiltersLog); + } catch (Exception e) { + LOGGER.error("Error when register BloomFilter.", e); + } + loaded = true; + } + + public static void configBloomColumnIfNeed(Dataset<Row> data, DataFrameWriter<Row> dataWriter) { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + if (!config.isBloomBuildEnabled()) { + return; + } + String manualColumn = config.getBloomBuildColumn(); + if (StringUtils.isNotBlank(manualColumn)) { + String[] blooms = manualColumn.split("#"); + for (int i = 0; i < blooms.length; i += 2) { + String nvd = blooms[i + 1]; + dataWriter.option("parquet.bloom.filter.enabled#" + blooms[i], "true"); + dataWriter.option("parquet.bloom.filter.expected.ndv#" + blooms[i], nvd); + LOGGER.info("build BloomFilter info: columnId is {}, nvd is {}", blooms[i], nvd); + buildBloomColumns.add(blooms[i]); + } + return; + } + Set<String> columns = Arrays.stream(data.columns()).collect(Collectors.toSet()); + Set<ColumnFilter> dataColumns = columnFilters.stream() + .filter(column -> columns.contains(column.columnId)).collect(Collectors.toSet()); + int count = 0; + for (ColumnFilter columnFilter : dataColumns) { + if (count >= config.getBloomBuildColumnMaxNum()) { + break; + } + dataWriter.option("parquet.bloom.filter.enabled#" + columnFilter.columnId, "true"); + dataWriter.option("parquet.bloom.filter.expected.ndv#" + columnFilter.columnId, config.getBloomBuildColumnNvd()); + buildBloomColumns.add(columnFilter.columnId); + LOGGER.info("building BloomFilter : columnId is {}; nvd is {}", + columnFilter.columnId, config.getBloomBuildColumnNvd()); + count++; + } + } + + private ParquetBloomFilter() { + } + + // Only for Unit Test + public static void resetParquetBloomFilter() { + ParquetBloomFilter.loaded = false; + ParquetBloomFilter.buildBloomColumns.clear(); + ParquetBloomFilter.columnFilters.clear(); + } + + public static List<String> getBuildBloomColumns() { + return buildBloomColumns; + } + + public static boolean isLoaded() { + return loaded; + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class ColumnFilter implements Comparable<ColumnFilter> { + private String columnId; + private int hit; + + @Override + public int compareTo(ColumnFilter o) { + if (o.hit != this.hit) { + return Integer.compare(o.hit, this.hit); + } + return o.columnId.compareTo(this.columnId); + } + } +} diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java new file mode 100644 index 0000000000..9066321403 --- /dev/null +++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.filter; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AddressUtil; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.NamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class QueryFiltersCollector { + + public static final Logger LOGGER = LoggerFactory.getLogger(QueryFiltersCollector.class); + + // To reduce HDFS storage, only use map to record it. + // schema: <project, <modelId, <columnId, filter_hit_number>> + protected static final ConcurrentMap<String, Map<String, Map<String, Integer>>> currentQueryFilters = + Maps.newConcurrentMap(); + + public static final ScheduledExecutorService executor = + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("query-filter-collector")); + + public static final String SERVER_HOST = AddressUtil.getLocalServerInfo(); + + public static final String FILTER_STORAGE_PATH = + KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/query_filter/"; + + + public static void increaseHit(String project, String modelId, String columnId) { + if (!KylinConfig.getInstanceFromEnv().isBloomCollectFilterEnabled()) { + return; + } + project = project.toUpperCase(); + Map<String, Integer> modelFilters = getModelFilters(project, modelId); + int hit = modelFilters.getOrDefault(columnId, 0); + modelFilters.put(columnId, ++hit); + } + + public static void initScheduler() { + if (!KylinConfig.getInstanceFromEnv().isBloomCollectFilterEnabled()) { + return; + } + executor.scheduleAtFixedRate(() -> { + long startTime = System.currentTimeMillis(); + LOGGER.info("Start sync query filters, current query filters is " + currentQueryFilters); + try { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + FileSystem fs = HadoopUtil.getFileSystem(config.getHdfsWorkingDirectory()); + currentQueryFilters.forEach((project, currentFilters) -> { + try { + Path projectFilterPath = getProjectFiltersFile(SERVER_HOST, project); + Map<String, Map<String, Integer>> mergedHistory; + if (!fs.exists(projectFilterPath)) { + mergedHistory = currentFilters; + } else { + mergedHistory = mergeHistory(fs, currentFilters, projectFilterPath); + } + HadoopUtil.writeStringToHdfs(fs, JsonUtil.writeValueAsString(mergedHistory), projectFilterPath); + currentQueryFilters.remove(project); + } catch (IOException e) { + LOGGER.error("Error when sync query filters for project : " + project, e); + } + }); + long endTime = System.currentTimeMillis(); + LOGGER.info("Sync query filters success. cost time " + (endTime - startTime) + " ms." + + " the failed filters maybe " + currentQueryFilters); + } catch (Throwable e) { + LOGGER.error("Error when sync query filters...", e); + } + }, 0, KylinConfig.getInstanceFromEnv().getQueryFilterCollectInterval(), TimeUnit.SECONDS); + } + + private static Map<String, Map<String, Integer>> mergeHistory(FileSystem fs, + Map<String, Map<String, Integer>> currentFilters, Path projectFilterPath) throws IOException { + Map<String, Map<String, Integer>> history = JsonUtil.readValue( + HadoopUtil.readStringFromHdfs(fs, projectFilterPath), Map.class); + currentFilters.forEach((currentModel, currentColumns) -> { + if (!history.containsKey(currentModel)) { + history.put(currentModel, currentColumns); + } else { + Map<String, Integer> historyColumns = history.get(currentModel); + currentColumns.forEach((column, hit) -> { + Integer oriHit = historyColumns.getOrDefault(column, 0); + if (oriHit < 0) { + // hit number > Integer.MAX_VALUE, almost impossible to get here + oriHit = Integer.MAX_VALUE / 2; + } + historyColumns.put(column, oriHit + hit); + }); + } + }); + return history; + } + + public static void destoryScheduler() { + executor.shutdown(); + } + + // schema: <modelId, <columnId, filter_hit_number>> + private static Map<String, Map<String, Integer>> getProjectFilters(String project) { + project = project.toUpperCase(); + currentQueryFilters.computeIfAbsent(project, key -> Maps.newConcurrentMap()); + return currentQueryFilters.get(project); + } + + // schema: <columnId, filter_hit_number> + private static Map<String, Integer> getModelFilters(String project, String modelId) { + project = project.toUpperCase(); + Map<String, Map<String, Integer>> projectFilters = getProjectFilters(project); + projectFilters.computeIfAbsent(modelId, key -> Maps.newConcurrentMap()); + return projectFilters.get(modelId); + } + + public static Path getProjectFiltersFile(String host, String project) { + project = project.toUpperCase(); + return new Path(FILTER_STORAGE_PATH + host + "/" + project); + } + + private QueryFiltersCollector() { + + } +} diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java index 240c55b564..cd26d5c514 100644 --- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java +++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java @@ -24,10 +24,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kylin.common.util.HadoopUtil; import org.apache.spark.sql.Column; +import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; + +import org.apache.kylin.engine.spark.filter.ParquetBloomFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,8 +151,9 @@ public class Repartitioner { data = ss.read().parquet(inputPath).repartition(repartitionNum) .sortWithinPartitions(convertIntegerToColumns(sortByColumns)); } - - data.write().mode(SaveMode.Overwrite).parquet(outputPath); + DataFrameWriter<Row> writer = data.write().mode(SaveMode.Overwrite); + ParquetBloomFilter.configBloomColumnIfNeed(data, writer); + writer.parquet(outputPath); if (needRepartitionForShardByColumns()) { if (optimizeShardEnabled) ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.skewRepartition.enabled", null); diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/engine/spark/utils/StorageUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/engine/spark/utils/StorageUtils.scala index 6a2a5238d8..b6ec7d6969 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/engine/spark/utils/StorageUtils.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/engine/spark/utils/StorageUtils.scala @@ -17,18 +17,20 @@ package org.apache.kylin.engine.spark.utils -import org.apache.kylin.metadata.cube.model.LayoutEntity -import org.apache.kylin.metadata.model.NDataModel +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.kylin.common.KapConfig import org.apache.kylin.common.util.{HadoopUtil, JsonUtil, RandomUtil} +import org.apache.kylin.engine.spark.filter.ParquetBloomFilter import org.apache.kylin.measure.bitmap.BitmapMeasureType +import org.apache.kylin.metadata.cube.model.LayoutEntity +import org.apache.kylin.metadata.model.NDataModel + import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} -import scala.collection.JavaConverters._ - object StorageUtils extends Logging { val MB: Long = 1024 * 1024 @@ -72,7 +74,9 @@ object StorageUtils extends Logging { def writeWithMetrics(data: DataFrame, path: String): JobMetrics = { withMetrics(data.sparkSession) { - data.write.mode(SaveMode.Overwrite).parquet(path) + val writer = data.write.mode(SaveMode.Overwrite) + ParquetBloomFilter.configBloomColumnIfNeed(data, writer) + writer.parquet(path) } } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderConstants.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderConstants.scala index 025f9e785b..ac548e4bae 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderConstants.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderConstants.scala @@ -45,4 +45,5 @@ object SparderConstants { val KYLIN_CONF = "kylin_conf" val PARQUET_FILE_CUBE_TYPE = "cube" val DATE_DICT = "date_dict" + val KYLIN_QUERY_ID_KEY = "kylin.query.id" } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala index b97d402217..e99a17a2fb 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala @@ -18,12 +18,15 @@ package org.apache.spark.sql.execution.datasource -import io.kyligence.kap.guava20.shaded.common.collect.Sets +import com.google.common.collect.Sets +import java.sql.{Date, Timestamp} +import java.util import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.kylin.common.exception.TargetSegmentNotFoundException import org.apache.kylin.common.util.{DateFormat, HadoopUtil} import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext} +import org.apache.kylin.engine.spark.filter.QueryFiltersCollector.increaseHit import org.apache.kylin.engine.spark.utils.{LogEx, LogUtils} import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity, NDataflow, NDataflowManager} import org.apache.kylin.metadata.datatype.DataType @@ -39,8 +42,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.util.collection.BitSet -import java.sql.{Date, Timestamp} -import java.util import scala.collection.JavaConverters._ import scala.collection.mutable @@ -390,6 +391,8 @@ class FilePruner(val session: SparkSession, private def pruneSegmentsDimRange(filters: Seq[Expression], segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = { + val hitColumns = Sets.newHashSet[String]() + val project = options.getOrElse("project", "") val filteredStatuses = if (filters.isEmpty) { segDirs } else { @@ -400,7 +403,8 @@ class FilePruner(val session: SparkSession, e => { val dimRange = dataflow.getSegment(e.segmentID).getDimensionRangeInfoMap if (dimRange != null && !dimRange.isEmpty) { - SegDimFilters(dimRange, dataflow.getIndexPlan.getEffectiveDimCols).foldFilter(reducedFilter) match { + SegDimFilters(dimRange, dataflow.getIndexPlan.getEffectiveDimCols, dataflow.getId, project, hitColumns) + .foldFilter(reducedFilter) match { case Trivial(true) => true case Trivial(false) => false } @@ -410,6 +414,7 @@ class FilePruner(val session: SparkSession, } } } + hitColumns.forEach(col => increaseHit(project, dataflow.getId, col)) filteredStatuses } @@ -729,7 +734,8 @@ abstract class PushableColumnBase { } -case class SegDimFilters(dimRange: java.util.Map[String, DimensionRangeInfo], dimCols: java.util.Map[Integer, TblColRef]) extends Logging { +case class SegDimFilters(dimRange: java.util.Map[String, DimensionRangeInfo], dimCols: java.util.Map[Integer, TblColRef], + dataflowId: String, project: String, hitColumns: java.util.Set[String]) extends Logging { private def insurance(id: String, value: Any) (func: Any => Filter): Filter = { @@ -759,6 +765,7 @@ case class SegDimFilters(dimRange: java.util.Map[String, DimensionRangeInfo], di filter match { case EqualTo(id, value: Any) => val col = escapeQuote(id) + hitColumns.add(col) insurance(col, value) { ts => { val dataType = getDataType(col, value) @@ -768,6 +775,7 @@ case class SegDimFilters(dimRange: java.util.Map[String, DimensionRangeInfo], di } case In(id, values: Array[Any]) => val col = escapeQuote(id) + hitColumns.add(col) val satisfied = values.map(v => insurance(col, v) { ts => { val dataType = getDataType(col, v)
