This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 03a44dc0c0da5030fdef47ca53abe3e6d77f6b23
Author: ChenliangLu <[email protected]>
AuthorDate: Thu Mar 16 13:58:28 2023 +0800

    KYLIN-5564 Support building BloomFilters for the desired columns
---
 pom.xml                                            |   2 +-
 .../apache/kylin/rest/config/AppInitializer.java   |   4 +
 .../org/apache/kylin/common/KylinConfigBase.java   |  28 +++
 .../org/apache/kylin/newten/BloomFilterTest.java   | 218 +++++++++++++++++++++
 .../org/apache/kylin/newten/LogicalViewTest.java   |   2 +-
 .../metadata/_global/project/bloomfilter.json      |  35 ++++
 .../c41390c5-b93d-4db3-b167-029874b85a2c.json      |  13 ++
 .../c41390c5-b93d-4db3-b167-029874b85a2c.json      |  63 ++++++
 .../c41390c5-b93d-4db3-b167-029874b85a2c.json      | 158 +++++++++++++++
 .../metadata/bloomfilter/table/SSB.CUSTOMER.json   |  68 +++++++
 .../bloomfilter/table/SSB.P_LINEORDER.json         | 118 +++++++++++
 .../apache/kylin/rest/service/QueryService.java    |   2 +
 .../kylin/engine/spark/job/SegmentExec.scala       |   6 +-
 .../engine/spark/utils/TestJobMetricsUtils.scala   |   5 +-
 .../kylin/query/runtime/plan/ResultPlan.scala      |  11 +-
 .../scala/org/apache/spark/sql/SparderEnv.scala    |  28 ++-
 .../spark/filter/BloomFilterSkipCollector.java     | 122 ++++++++++++
 .../engine/spark/filter/ParquetBloomFilter.java    | 168 ++++++++++++++++
 .../engine/spark/filter/QueryFiltersCollector.java | 151 ++++++++++++++
 .../kylin/engine/spark/utils/Repartitioner.java    |   8 +-
 .../kylin/engine/spark/utils/StorageUtils.scala    |  14 +-
 .../org/apache/spark/sql/SparderConstants.scala    |   1 +
 .../sql/execution/datasource/FilePruner.scala      |  18 +-
 23 files changed, 1217 insertions(+), 26 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2f827a3c5f..f6663758f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -314,7 +314,7 @@
         <zkclient.version>0.8</zkclient.version>
         <grpc.version>1.0.2</grpc.version>
         <fastPFOR.version>0.0.13</fastPFOR.version>
-        <parquet.version>1.12.2-kylin-r1</parquet.version>
+        <parquet.version>1.12.2-kylin-r3</parquet.version>
         <quartz.version>2.1.1</quartz.version>
         <janino.version>3.0.9</janino.version>
 
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
index 5e7cbd8814..c934e45bce 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/AppInitializer.java
@@ -35,6 +35,7 @@ import 
org.apache.kylin.common.persistence.transaction.EventListenerRegistry;
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.HostInfoFetcher;
+import org.apache.kylin.engine.spark.filter.QueryFiltersCollector;
 import org.apache.kylin.engine.spark.utils.SparkJobFactoryUtils;
 import org.apache.kylin.metadata.epoch.EpochOrchestrator;
 import org.apache.kylin.metadata.project.NProjectLoader;
@@ -138,6 +139,9 @@ public class AppInitializer {
             } else {
                 context.publishEvent(new SparderStartEvent.SyncEvent(context));
             }
+            if (kylinConfig.isBloomCollectFilterEnabled()) {
+                QueryFiltersCollector.initScheduler();
+            }
         }
         EventBusFactory.getInstance().register(new ProcessStatusListener(), 
true);
         // register acl update listener
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6d1e60608b..387433edd7 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3902,4 +3902,32 @@ public abstract class KylinConfigBase implements 
Serializable {
     public boolean isIndexEnableOperatorDesign() {
         return 
Boolean.parseBoolean(getOptional("kylin.index.enable-operator-design", FALSE));
     }
+
+    public int getQueryFilterCollectInterval() {
+        return 
Integer.parseInt(getOptional("kylin.query.filter.collect-interval", "1800"));
+    }
+
+    public boolean isBloomCollectFilterEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.bloom.collect-filter.enabled", TRUE));
+    }
+
+    public boolean isCollectQueryMetricsEnabled() {
+        return 
Boolean.parseBoolean(getOptional("kylin.query.collect-metrics.enabled", TRUE));
+    }
+
+    public boolean isBloomBuildEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.bloom.build.enabled", 
FALSE));
+    }
+
+    public int getBloomBuildColumnMaxNum() {
+        return 
Integer.parseInt(getOptional("kylin.bloom.build.column.max-size", "3"));
+    }
+
+    public String getBloomBuildColumn() {
+        return getOptional("kylin.bloom.build.column", "");
+    }
+
+    public int getBloomBuildColumnNvd() {
+        return Integer.parseInt(getOptional("kylin.bloom.build.column.nvd", 
"200000"));
+    }
 }
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
new file mode 100644
index 0000000000..3241711864
--- /dev/null
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/BloomFilterTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.newten;
+
+import static 
org.apache.kylin.engine.spark.filter.QueryFiltersCollector.SERVER_HOST;
+import static 
org.apache.kylin.engine.spark.filter.QueryFiltersCollector.getProjectFiltersFile;
+import static org.awaitility.Awaitility.await;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.common.util.TempMetadataBuilder;
+import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
+import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector;
+import org.apache.kylin.engine.spark.filter.ParquetBloomFilter;
+import org.apache.kylin.engine.spark.filter.QueryFiltersCollector;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.junit.TimeZoneTestRunner;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.util.ExecAndComp;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper;
+import org.apache.spark.sql.internal.StaticSQLConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.sparkproject.guava.collect.Sets;
+
+@RunWith(TimeZoneTestRunner.class)
+public class BloomFilterTest extends NLocalWithSparkSessionTest implements 
AdaptiveSparkPlanHelper {
+
+    private NDataflowManager dfMgr = null;
+
+    @BeforeClass
+    public static void initSpark() {
+        if (Shell.MAC) {
+            overwriteSystemPropBeforeClass("org.xerial.snappy.lib.name", 
"libsnappyjava.jnilib");//for snappy
+        }
+        if (ss != null && !ss.sparkContext().isStopped()) {
+            ss.stop();
+        }
+        sparkConf = new 
SparkConf().setAppName(RandomUtil.randomUUIDStr()).setMaster("local[2]");
+        sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.JavaSerializer");
+        sparkConf.set(StaticSQLConf.CATALOG_IMPLEMENTATION().key(), 
"in-memory");
+        sparkConf.set("spark.sql.shuffle.partitions", "1");
+        sparkConf.set("spark.memory.fraction", "0.1");
+        sparkConf.set(StaticSQLConf.WAREHOUSE_PATH().key(),
+                TempMetadataBuilder.TEMP_TEST_METADATA + "/spark-warehouse");
+        ss = SparkSession.builder().config(sparkConf).getOrCreate();
+        SparderEnv.setSparkSession(ss);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1");
+        overwriteSystemProp("kylin.bloom.collect-filter.enabled", "true");
+        overwriteSystemProp("kylin.bloom.build.enabled", "true");
+        overwriteSystemProp("kylin.query.filter.collect-interval", "10");
+        this.createTestMetadata("src/test/resources/ut_meta/bloomfilter");
+        NDefaultScheduler scheduler = 
NDefaultScheduler.getInstance(getProject());
+        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        QueryFiltersCollector.initScheduler();
+        dfMgr = NDataflowManager.getInstance(getTestConfig(), getProject());
+    }
+
+    @After
+    public void after() throws Exception {
+        NDefaultScheduler.destroyInstance();
+        QueryFiltersCollector.destoryScheduler();
+        cleanupTestMetadata();
+    }
+
+    public String getProject() {
+        return "bloomfilter";
+    }
+
+    @Test
+    public void testBuildBloomFilter() throws Exception {
+        String dfID = "c41390c5-b93d-4db3-b167-029874b85a2c";
+        NDataflow dataflow = dfMgr.getDataflow(dfID);
+        LayoutEntity layout = 
dataflow.getIndexPlan().getLayoutEntity(20000000001L);
+        Assert.assertNotNull(layout);
+        populateSSWithCSVData(getTestConfig(), getProject(), 
SparderEnv.getSparkSession());
+
+        overwriteSystemProp("kylin.bloom.build.column", "0#10000#1#10000");
+        indexDataConstructor.buildIndex(dfID, 
SegmentRange.TimePartitionedSegmentRange.createInfinite(),
+                Sets.newHashSet(
+                        
dataflow.getIndexPlan().getLayoutEntity(20000000001L)), true);
+        // In our PR UT environment, the building job may not be executed when 
there is a cache,
+        // so we will ignore it if the job is skipped
+        if (ParquetBloomFilter.isLoaded()) {
+            // set BloomFilter to build manually, see 
"kylin.bloom.build.column"
+            
Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("0"));
+            
Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("1"));
+        }
+
+        List<Pair<String, String>> query = new ArrayList<>();
+        String sql1 = "select * from SSB.P_LINEORDER where LO_CUSTKEY in 
(13,8) and LO_SHIPPRIOTITY = 0 ";
+        query.add(Pair.newPair("bloomfilter", sql1));
+        ExecAndComp.execAndCompare(query, getProject(), 
ExecAndComp.CompareLevel.NONE, "inner");
+        Path projectFilterPath = getProjectFiltersFile(SERVER_HOST, 
getProject());
+        FileSystem fs = 
HadoopUtil.getFileSystem(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
+        // wait until `QueryFiltersCollector` records the filter info
+        await().atMost(120, TimeUnit.SECONDS).until(() -> {
+            try {
+                if (!fs.exists(projectFilterPath)) {
+                    return false;
+                }
+            } catch (Exception e) {
+                return false;
+            }
+            return true;
+        });
+        Map<String, Map<String, Integer>> history = JsonUtil.readValue(
+                HadoopUtil.readStringFromHdfs(fs, projectFilterPath), 
Map.class);
+        Assert.assertTrue(history.get(dfID).keySet().contains("8"));
+        Assert.assertTrue(history.get(dfID).keySet().contains("9"));
+        Integer hitNum = history.get(dfID).get("8");
+        Assert.assertTrue(fs.exists(projectFilterPath));
+        String sql2 = "select * from SSB.P_LINEORDER where LO_CUSTKEY in 
(13,8)";
+        query.add(Pair.newPair("bloomfilter", sql2));
+        ExecAndComp.execAndCompare(query, getProject(), 
ExecAndComp.CompareLevel.NONE, "inner");
+
+        await().atMost(120, TimeUnit.SECONDS).until(() -> {
+            try {
+                Map<String, Map<String, Integer>> newHistory = 
JsonUtil.readValue(
+                        HadoopUtil.readStringFromHdfs(fs, projectFilterPath), 
Map.class);
+                Integer newHitNum = newHistory.get(dfID).get("8");
+                if (newHitNum <= hitNum) {
+                    return false;
+                }
+            } catch (Exception e) {
+                return false;
+            }
+            return true;
+        });
+
+        overwriteSystemProp("kylin.bloom.build.column", "");
+        overwriteSystemProp("kylin.bloom.build.column.max-size", "1");
+        ParquetBloomFilter.resetParquetBloomFilter();
+        indexDataConstructor.buildIndex(dfID, 
SegmentRange.TimePartitionedSegmentRange.createInfinite(),
+                
Sets.newHashSet(dataflow.getIndexPlan().getLayoutEntity(20000000001L)), true);
+        // In our PR UT environment, the building job may not be executed when 
there is a cache,
+        // so we will ignore it if the job is skipped
+        if (ParquetBloomFilter.isLoaded()) {
+            // build BloomFilter according to query statistics
+            
Assert.assertTrue(ParquetBloomFilter.getBuildBloomColumns().contains("8"));
+            // `kylin.bloom.build.column.max-size=1`, only build one column
+            
Assert.assertFalse(ParquetBloomFilter.getBuildBloomColumns().contains("9"));
+        }
+        ExecAndComp.execAndCompare(query, getProject(), 
ExecAndComp.CompareLevel.SAME, "inner");
+
+        testBloomFilterSkipCollector();
+    }
+
+    private void testBloomFilterSkipCollector() {
+        String queryId1 = "query-id1";
+        BloomFilterSkipCollector.addQueryMetrics(queryId1, 3,
+                2, 20, 100, 1);
+        BloomFilterSkipCollector.addQueryMetrics(queryId1, 1,
+                1, 10, 100, 1);
+        Assert.assertEquals(4L, 
BloomFilterSkipCollector.queryTotalBloomBlocks.getIfPresent(queryId1).get());
+        Assert.assertEquals(3L, 
BloomFilterSkipCollector.querySkipBloomBlocks.getIfPresent(queryId1).get());
+        Assert.assertEquals(30L, 
BloomFilterSkipCollector.querySkipBloomRows.getIfPresent(queryId1).get());
+        Assert.assertEquals(200L, 
BloomFilterSkipCollector.queryFooterReadTime.getIfPresent(queryId1).get());
+        Assert.assertEquals(2L, 
BloomFilterSkipCollector.queryFooterReadNumber.getIfPresent(queryId1).get());
+        BloomFilterSkipCollector.logAndCleanStatus(queryId1);
+        BloomFilterSkipCollector.logAndCleanStatus("query-id2");
+        
Assert.assertNull(BloomFilterSkipCollector.queryTotalBloomBlocks.getIfPresent(queryId1));
+        
Assert.assertNull(BloomFilterSkipCollector.querySkipBloomBlocks.getIfPresent(queryId1));
+        
Assert.assertNull(BloomFilterSkipCollector.querySkipBloomRows.getIfPresent(queryId1));
+        
Assert.assertNull(BloomFilterSkipCollector.queryFooterReadTime.getIfPresent(queryId1));
+        
Assert.assertNull(BloomFilterSkipCollector.queryFooterReadNumber.getIfPresent(queryId1));
+        for (int i = 0; i < 200; i++) {
+            BloomFilterSkipCollector.logAndCleanStatus("query-id2");
+        }
+        Assert.assertTrue(BloomFilterSkipCollector.logCounter.get() <= 100);
+    }
+}
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/LogicalViewTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/LogicalViewTest.java
index 58d1109a55..9e58a2a4dc 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/newten/LogicalViewTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/LogicalViewTest.java
@@ -86,6 +86,6 @@ public class LogicalViewTest extends 
NLocalWithSparkSessionTest {
         + " INNER JOIN SSB.CUSTOMER t2 on t1.C_CUSTKEY = t2.C_CUSTKEY ";
     query.add(Pair.newPair("logical_view", sql1));
     ExecAndComp.execAndCompare(
-        query, getProject(), ExecAndComp.CompareLevel.NONE, "inner");
+        query, getProject(), ExecAndComp.CompareLevel.SAME, "inner");
   }
 }
diff --git 
a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/_global/project/bloomfilter.json
 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/_global/project/bloomfilter.json
new file mode 100644
index 0000000000..f20eac5496
--- /dev/null
+++ 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/_global/project/bloomfilter.json
@@ -0,0 +1,35 @@
+{
+  "uuid" : "24854c55-8298-623e-e64a-42ae6faf1f1a",
+  "last_modified" : 1677855913210,
+  "create_time" : 1677855913185,
+  "version" : "4.0.0.0",
+  "name" : "bloomfilter",
+  "owner" : "ADMIN",
+  "status" : "ENABLED",
+  "create_time_utc" : 1677855913185,
+  "default_database" : "DEFAULT",
+  "description" : "",
+  "principal" : null,
+  "keytab" : null,
+  "maintain_model_type" : "MANUAL_MAINTAIN",
+  "override_kylin_properties" : {
+    "kylin.metadata.semi-automatic-mode" : "false",
+    "kylin.query.metadata.expose-computed-column" : "true",
+    "kylin.source.default" : "9"
+  },
+  "segment_config" : {
+    "auto_merge_enabled" : false,
+    "auto_merge_time_ranges" : [ "WEEK", "MONTH", "QUARTER", "YEAR" ],
+    "volatile_range" : {
+      "volatile_range_number" : 0,
+      "volatile_range_enabled" : false,
+      "volatile_range_type" : "DAY"
+    },
+    "retention_range" : {
+      "retention_range_number" : 1,
+      "retention_range_enabled" : false,
+      "retention_range_type" : "MONTH"
+    },
+    "create_empty_segment_enabled" : false
+  }
+}
\ No newline at end of file
diff --git 
a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/dataflow/c41390c5-b93d-4db3-b167-029874b85a2c.json
 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/dataflow/c41390c5-b93d-4db3-b167-029874b85a2c.json
new file mode 100644
index 0000000000..d107d4b0d6
--- /dev/null
+++ 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/dataflow/c41390c5-b93d-4db3-b167-029874b85a2c.json
@@ -0,0 +1,13 @@
+{
+  "uuid" : "c41390c5-b93d-4db3-b167-029874b85a2c",
+  "last_modified" : 0,
+  "create_time" : 1677856069390,
+  "version" : "4.0.0.0",
+  "status" : "OFFLINE",
+  "last_status" : null,
+  "cost" : 50,
+  "query_hit_count" : 0,
+  "last_query_time" : 0,
+  "layout_query_hit_count" : { },
+  "segments" : [ ]
+}
\ No newline at end of file
diff --git 
a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/index_plan/c41390c5-b93d-4db3-b167-029874b85a2c.json
 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/index_plan/c41390c5-b93d-4db3-b167-029874b85a2c.json
new file mode 100644
index 0000000000..37895a2385
--- /dev/null
+++ 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/index_plan/c41390c5-b93d-4db3-b167-029874b85a2c.json
@@ -0,0 +1,63 @@
+{
+  "uuid" : "c41390c5-b93d-4db3-b167-029874b85a2c",
+  "last_modified" : 1677856069297,
+  "create_time" : 1677856069297,
+  "version" : "4.0.0.0",
+  "description" : null,
+  "rule_based_index" : null,
+  "indexes" : [ {
+    "id" : 0,
+    "dimensions" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
17 ],
+    "measures" : [ 100000 ],
+    "layouts" : [ {
+      "id" : 1,
+      "name" : null,
+      "owner" : null,
+      "col_order" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 
16, 17, 100000 ],
+      "shard_by_columns" : [ ],
+      "partition_by_columns" : [ ],
+      "sort_by_columns" : [ ],
+      "storage_type" : 20,
+      "update_time" : 1677856069309,
+      "manual" : false,
+      "auto" : false,
+      "base" : true,
+      "draft_version" : null,
+      "index_range" : null
+    } ],
+    "next_layout_offset" : 2
+  }, {
+    "id" : 20000000000,
+    "dimensions" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
17 ],
+    "measures" : [ ],
+    "layouts" : [ {
+      "id" : 20000000001,
+      "name" : null,
+      "owner" : null,
+      "col_order" : [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 
16, 17 ],
+      "shard_by_columns" : [ ],
+      "partition_by_columns" : [ ],
+      "sort_by_columns" : [ ],
+      "storage_type" : 20,
+      "update_time" : 1677856069310,
+      "manual" : false,
+      "auto" : false,
+      "base" : true,
+      "draft_version" : null,
+      "index_range" : null
+    } ],
+    "next_layout_offset" : 2
+  } ],
+  "override_properties" : { },
+  "to_be_deleted_indexes" : [ ],
+  "auto_merge_time_ranges" : null,
+  "retention_range" : 0,
+  "engine_type" : 80,
+  "next_aggregation_index_id" : 10000,
+  "next_table_index_id" : 20000010000,
+  "agg_shard_by_columns" : [ ],
+  "extend_partition_columns" : [ ],
+  "layout_bucket_num" : { },
+  "approved_additional_recs" : 0,
+  "approved_removal_recs" : 0
+}
\ No newline at end of file
diff --git 
a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/model_desc/c41390c5-b93d-4db3-b167-029874b85a2c.json
 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/model_desc/c41390c5-b93d-4db3-b167-029874b85a2c.json
new file mode 100644
index 0000000000..8eb8d72b45
--- /dev/null
+++ 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/model_desc/c41390c5-b93d-4db3-b167-029874b85a2c.json
@@ -0,0 +1,158 @@
+{
+  "uuid" : "c41390c5-b93d-4db3-b167-029874b85a2c",
+  "last_modified" : 1677856069295,
+  "create_time" : 1677856068306,
+  "version" : "4.0.0.0",
+  "alias" : "p_lineorder",
+  "owner" : "ADMIN",
+  "config_last_modifier" : null,
+  "config_last_modified" : 0,
+  "description" : "",
+  "fact_table" : "SSB.P_LINEORDER",
+  "fact_table_alias" : null,
+  "management_type" : "MODEL_BASED",
+  "join_tables" : [ ],
+  "filter_condition" : "",
+  "partition_desc" : {
+    "partition_date_column" : "P_LINEORDER.LO_ORDERDATE",
+    "partition_date_start" : 0,
+    "partition_date_format" : "yyyy-MM-dd",
+    "partition_type" : "APPEND",
+    "partition_condition_builder" : 
"org.apache.kylin.metadata.model.PartitionDesc$DefaultPartitionConditionBuilder"
+  },
+  "capacity" : "MEDIUM",
+  "segment_config" : {
+    "auto_merge_enabled" : null,
+    "auto_merge_time_ranges" : null,
+    "volatile_range" : null,
+    "retention_range" : null,
+    "create_empty_segment_enabled" : false
+  },
+  "data_check_desc" : null,
+  "semantic_version" : 0,
+  "storage_type" : 0,
+  "model_type" : "BATCH",
+  "all_named_columns" : [ {
+    "id" : 0,
+    "name" : "LO_SHIPMODE",
+    "column" : "P_LINEORDER.LO_SHIPMODE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 1,
+    "name" : "LO_LINENUMBER",
+    "column" : "P_LINEORDER.LO_LINENUMBER",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 2,
+    "name" : "LO_ORDTOTALPRICE",
+    "column" : "P_LINEORDER.LO_ORDTOTALPRICE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 3,
+    "name" : "LO_SUPPLYCOST",
+    "column" : "P_LINEORDER.LO_SUPPLYCOST",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 4,
+    "name" : "LO_SUPPKEY",
+    "column" : "P_LINEORDER.LO_SUPPKEY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 5,
+    "name" : "LO_QUANTITY",
+    "column" : "P_LINEORDER.LO_QUANTITY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 6,
+    "name" : "LO_PARTKEY",
+    "column" : "P_LINEORDER.LO_PARTKEY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 7,
+    "name" : "LO_ORDERKEY",
+    "column" : "P_LINEORDER.LO_ORDERKEY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 8,
+    "name" : "LO_CUSTKEY",
+    "column" : "P_LINEORDER.LO_CUSTKEY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 9,
+    "name" : "LO_SHIPPRIOTITY",
+    "column" : "P_LINEORDER.LO_SHIPPRIOTITY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 10,
+    "name" : "LO_DISCOUNT",
+    "column" : "P_LINEORDER.LO_DISCOUNT",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 11,
+    "name" : "LO_ORDERPRIOTITY",
+    "column" : "P_LINEORDER.LO_ORDERPRIOTITY",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 12,
+    "name" : "LO_ORDERDATE",
+    "column" : "P_LINEORDER.LO_ORDERDATE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 13,
+    "name" : "LO_REVENUE",
+    "column" : "P_LINEORDER.LO_REVENUE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 14,
+    "name" : "V_REVENUE",
+    "column" : "P_LINEORDER.V_REVENUE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 15,
+    "name" : "LO_COMMITDATE",
+    "column" : "P_LINEORDER.LO_COMMITDATE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 16,
+    "name" : "LO_EXTENDEDPRICE",
+    "column" : "P_LINEORDER.LO_EXTENDEDPRICE",
+    "status" : "DIMENSION"
+  }, {
+    "id" : 17,
+    "name" : "LO_TAX",
+    "column" : "P_LINEORDER.LO_TAX",
+    "status" : "DIMENSION"
+  } ],
+  "all_measures" : [ {
+    "name" : "COUNT_ALL",
+    "function" : {
+      "expression" : "COUNT",
+      "parameters" : [ {
+        "type" : "constant",
+        "value" : "1"
+      } ],
+      "returntype" : "bigint"
+    },
+    "column" : null,
+    "comment" : null,
+    "id" : 100000,
+    "type" : "NORMAL",
+    "internal_ids" : [ ]
+  } ],
+  "recommendations_count" : 0,
+  "computed_columns" : [ ],
+  "canvas" : {
+    "coordinate" : {
+      "P_LINEORDER" : {
+        "x" : 530.6666395399305,
+        "y" : 249.00000678168405,
+        "width" : 220.0,
+        "height" : 200.0
+      }
+    },
+    "zoom" : 9.0
+  },
+  "multi_partition_desc" : null,
+  "multi_partition_key_mapping" : null,
+  "fusion_id" : null
+}
\ No newline at end of file
diff --git 
a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.CUSTOMER.json
 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.CUSTOMER.json
new file mode 100644
index 0000000000..188df51654
--- /dev/null
+++ 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.CUSTOMER.json
@@ -0,0 +1,68 @@
+{
+  "uuid" : "9481374f-63be-5f0b-5a6d-22c7a561b1d9",
+  "last_modified" : 0,
+  "create_time" : 1677856024986,
+  "version" : "4.0.0.0",
+  "name" : "CUSTOMER",
+  "columns" : [ {
+    "id" : "1",
+    "name" : "C_CUSTKEY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "c_custkey"
+  }, {
+    "id" : "2",
+    "name" : "C_NAME",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_name"
+  }, {
+    "id" : "3",
+    "name" : "C_ADDRESS",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_address"
+  }, {
+    "id" : "4",
+    "name" : "C_CITY",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_city"
+  }, {
+    "id" : "5",
+    "name" : "C_NATION",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_nation"
+  }, {
+    "id" : "6",
+    "name" : "C_REGION",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_region"
+  }, {
+    "id" : "7",
+    "name" : "C_PHONE",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_phone"
+  }, {
+    "id" : "8",
+    "name" : "C_MKTSEGMENT",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "c_mktsegment"
+  } ],
+  "source_type" : 9,
+  "table_type" : "EXTERNAL",
+  "top" : false,
+  "increment_loading" : false,
+  "last_snapshot_path" : null,
+  "last_snapshot_size" : 0,
+  "snapshot_last_modified" : 0,
+  "query_hit_count" : 0,
+  "partition_column" : null,
+  "snapshot_partitions" : { },
+  "snapshot_partitions_info" : { },
+  "snapshot_total_rows" : 0,
+  "snapshot_partition_col" : null,
+  "selected_snapshot_partition_col" : null,
+  "temp_snapshot_path" : null,
+  "snapshot_has_broken" : false,
+  "database" : "SSB",
+  "transactional" : false,
+  "rangePartition" : false,
+  "partition_desc" : null
+}
\ No newline at end of file
diff --git 
a/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.P_LINEORDER.json
 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.P_LINEORDER.json
new file mode 100644
index 0000000000..f17cb19cc6
--- /dev/null
+++ 
b/src/kylin-it/src/test/resources/ut_meta/bloomfilter/metadata/bloomfilter/table/SSB.P_LINEORDER.json
@@ -0,0 +1,118 @@
+{
+  "uuid" : "f0dc3d45-0eb0-26c0-4681-b6c0d0c8e270",
+  "last_modified" : 0,
+  "create_time" : 1677856025283,
+  "version" : "4.0.0.0",
+  "name" : "P_LINEORDER",
+  "columns" : [ {
+    "id" : "1",
+    "name" : "LO_ORDERKEY",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_orderkey"
+  }, {
+    "id" : "2",
+    "name" : "LO_LINENUMBER",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_linenumber"
+  }, {
+    "id" : "3",
+    "name" : "LO_CUSTKEY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "lo_custkey"
+  }, {
+    "id" : "4",
+    "name" : "LO_PARTKEY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "lo_partkey"
+  }, {
+    "id" : "5",
+    "name" : "LO_SUPPKEY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "lo_suppkey"
+  }, {
+    "id" : "6",
+    "name" : "LO_ORDERDATE",
+    "datatype" : "date",
+    "case_sensitive_name" : "lo_orderdate"
+  }, {
+    "id" : "7",
+    "name" : "LO_ORDERPRIOTITY",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "lo_orderpriotity"
+  }, {
+    "id" : "8",
+    "name" : "LO_SHIPPRIOTITY",
+    "datatype" : "integer",
+    "case_sensitive_name" : "lo_shippriotity"
+  }, {
+    "id" : "9",
+    "name" : "LO_QUANTITY",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_quantity"
+  }, {
+    "id" : "10",
+    "name" : "LO_EXTENDEDPRICE",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_extendedprice"
+  }, {
+    "id" : "11",
+    "name" : "LO_ORDTOTALPRICE",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_ordtotalprice"
+  }, {
+    "id" : "12",
+    "name" : "LO_DISCOUNT",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_discount"
+  }, {
+    "id" : "13",
+    "name" : "LO_REVENUE",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_revenue"
+  }, {
+    "id" : "14",
+    "name" : "LO_SUPPLYCOST",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_supplycost"
+  }, {
+    "id" : "15",
+    "name" : "LO_TAX",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "lo_tax"
+  }, {
+    "id" : "16",
+    "name" : "LO_COMMITDATE",
+    "datatype" : "date",
+    "case_sensitive_name" : "lo_commitdate"
+  }, {
+    "id" : "17",
+    "name" : "LO_SHIPMODE",
+    "datatype" : "varchar(4096)",
+    "case_sensitive_name" : "lo_shipmode"
+  }, {
+    "id" : "18",
+    "name" : "V_REVENUE",
+    "datatype" : "bigint",
+    "case_sensitive_name" : "v_revenue"
+  } ],
+  "source_type" : 9,
+  "table_type" : "VIEW",
+  "top" : false,
+  "increment_loading" : false,
+  "last_snapshot_path" : null,
+  "last_snapshot_size" : 0,
+  "snapshot_last_modified" : 0,
+  "query_hit_count" : 0,
+  "partition_column" : null,
+  "snapshot_partitions" : { },
+  "snapshot_partitions_info" : { },
+  "snapshot_total_rows" : 0,
+  "snapshot_partition_col" : null,
+  "selected_snapshot_partition_col" : null,
+  "temp_snapshot_path" : null,
+  "snapshot_has_broken" : false,
+  "database" : "SSB",
+  "transactional" : false,
+  "rangePartition" : false,
+  "partition_desc" : null
+}
\ No newline at end of file
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index c22b2ee0d9..b542c6d77c 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -80,6 +80,7 @@ import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.acl.AclTCR;
@@ -428,6 +429,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
             errorMsg = errorMsg.length() > maxLength ? errorMsg.substring(0, 
maxLength) : errorMsg;
         }
 
+        
BloomFilterSkipCollector.logAndCleanStatus(QueryContext.current().getQueryId());
         LogReport report = new LogReport().put(LogReport.QUERY_ID, 
QueryContext.current().getQueryId())
                 .put(LogReport.SQL, sql).put(LogReport.USER, user)
                 .put(LogReport.SUCCESS, null == 
response.getExceptionMessage()).put(LogReport.DURATION, duration)
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
index 0b104eb42a..a6517769fd 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/SegmentExec.scala
@@ -32,10 +32,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.datasource.storage.{StorageListener, 
StorageStoreFactory, WriteTaskStats}
 import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
 import org.apache.spark.tracker.BuildContext
-
 import java.util
 import java.util.Objects
 import java.util.concurrent.{BlockingQueue, ForkJoinPool, LinkedBlockingQueue, 
TimeUnit}
+
+import org.apache.kylin.engine.spark.filter.ParquetBloomFilter
+
 import scala.collection.JavaConverters._
 import scala.collection.parallel.ForkJoinTaskSupport
 
@@ -325,7 +327,7 @@ trait SegmentExec extends Logging {
       case Some(x) => store.setStorageListener(x)
       case None =>
     }
-
+    ParquetBloomFilter.registerBloomColumnIfNeed(project, dataflowId);
     val stats = store.save(layout, new Path(storagePath), 
KapConfig.wrap(config), layoutDS)
     sparkSession.sparkContext.setJobDescription(null)
     stats
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/utils/TestJobMetricsUtils.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/utils/TestJobMetricsUtils.scala
index bd1bc65bea..50d3e89f96 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/utils/TestJobMetricsUtils.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/utils/TestJobMetricsUtils.scala
@@ -20,7 +20,7 @@ package org.apache.kylin.engine.spark.utils
 
 import org.apache.commons.io.FileUtils
 import org.apache.kylin.common.util.RandomUtil
-import org.apache.spark.sql.common.{SharedSparkSession, SparderBaseFunSuite}
+import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, 
SparderBaseFunSuite}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode}
@@ -30,7 +30,8 @@ import java.io.File
 import java.util.concurrent.{Executors, TimeUnit}
 
 
-class TestJobMetricsUtils extends SparderBaseFunSuite with SharedSparkSession 
with BeforeAndAfterAll {
+class TestJobMetricsUtils extends SparderBaseFunSuite with SharedSparkSession 
with BeforeAndAfterAll
+  with LocalMetadata {
 
   private val path1 = "./temp1"
   private val id1 = RandomUtil.randomUUIDStr
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 7815e981b0..fc8e6044e1 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -22,7 +22,8 @@ import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicLong
 import java.{lang, util}
-
+import com.google.common.cache.{Cache, CacheBuilder}
+import io.kyligence.kap.secondstorage.SecondStorageUtil
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
@@ -41,17 +42,13 @@ import org.apache.poi.xssf.usermodel.{XSSFSheet, 
XSSFWorkbook}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.QueryMetricUtils
-import org.apache.spark.sql.util.SparderTypeUtil
+import org.apache.spark.sql.util.{SparderConstants, SparderTypeUtil}
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
 
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`iterator asScala`
 import scala.collection.mutable
 
-import com.google.common.cache.{Cache, CacheBuilder}
-
-import io.kyligence.kap.secondstorage.SecondStorageUtil
-
 // scalastyle:off
 object ResultType extends Enumeration {
   type ResultType = Value
@@ -505,7 +502,7 @@ object ResultPlan extends LogEx {
 }
 
 object QueryToExecutionIDCache extends LogEx {
-  val KYLIN_QUERY_ID_KEY = "kylin.query.id"
+  val KYLIN_QUERY_ID_KEY = SparderConstants.KYLIN_QUERY_ID_KEY
   val KYLIN_QUERY_EXECUTION_ID = "kylin.query.execution.id"
 
   private val queryID2ExecutionID: Cache[String, String] =
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index cfb469d547..81934163c3 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -23,17 +23,20 @@ import java.security.PrivilegedAction
 import java.util.Map
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{Callable, ExecutorService}
+
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kylin.common.exception.{KylinException, 
KylinTimeoutException, ServerErrorCode}
 import org.apache.kylin.common.msg.MsgPicker
 import org.apache.kylin.common.util.{DefaultHostInfoFetcher, HadoopUtil, 
S3AUtil}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
+import org.apache.kylin.engine.spark.filter.BloomFilterSkipCollector
 import org.apache.kylin.metadata.model.{NTableMetadataManager, TableExtDesc}
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerLogRollUp}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerLogRollUp, SparkListenerTaskEnd}
 import org.apache.spark.sql.KylinSession._
 import org.apache.spark.sql.catalyst.optimizer.ConvertInnerJoinToSemiJoin
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -242,6 +245,7 @@ object SparderEnv extends Logging {
           .getContextClassLoader
           .toString)
       registerListener(sparkSession.sparkContext)
+      registerQueryMetrics(sparkSession.sparkContext)
       APP_MASTER_TRACK_URL = null
       startSparkFailureTimes = 0
       lastStartSparkFailureTime = 0
@@ -289,6 +293,28 @@ object SparderEnv extends Logging {
     sc.addSparkListener(sparkListener)
   }
 
+  def registerQueryMetrics(sc: SparkContext): Unit = {
+    if (!KylinConfig.getInstanceFromEnv.isCollectQueryMetricsEnabled) {
+      return
+    }
+    val taskListener = new SparkListener {
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        try {
+          if (StringUtils.isNotBlank(taskEnd.queryId)) {
+            val inputMetrics = taskEnd.taskMetrics.inputMetrics
+            BloomFilterSkipCollector.addQueryMetrics(taskEnd.queryId,
+              inputMetrics.totalBloomBlocks, inputMetrics.totalSkipBloomBlocks,
+              inputMetrics.totalSkipBloomRows, inputMetrics.footerReadTime,
+              inputMetrics.footerReadNumber)
+          }
+        } catch {
+          case e: Throwable => logWarning("error when add metrics for query", 
e)
+        }
+      }
+    }
+    sc.addSparkListener(taskListener)
+  }
+
   /**
    * @param sqlText SQL to be validated
    * @return The logical plan
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/BloomFilterSkipCollector.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/BloomFilterSkipCollector.java
new file mode 100644
index 0000000000..65e9d41ae8
--- /dev/null
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/BloomFilterSkipCollector.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.filter;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+public class BloomFilterSkipCollector {
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(BloomFilterSkipCollector.class);
+
+    // use a guava cache for automatic cleanup even if something goes 
wrong in query
+    public static final Cache<String, AtomicLong> queryTotalBloomBlocks = 
CacheBuilder
+            .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
+    public static final Cache<String, AtomicLong> querySkipBloomBlocks = 
CacheBuilder
+            .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
+    public static final Cache<String, AtomicLong> querySkipBloomRows = 
CacheBuilder
+            .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
+    public static final Cache<String, AtomicLong> queryFooterReadTime = 
CacheBuilder
+            .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
+    public static final Cache<String, AtomicLong> queryFooterReadNumber = 
CacheBuilder
+            .newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
+
+    public static final AtomicLong logCounter = new AtomicLong(0);
+    private static final AtomicLong globalBloomBlocks = new AtomicLong(0);
+    private static final AtomicLong globalSkipBloomBlocks = new AtomicLong(0);
+
+    private static final AutoReadWriteLock LOCK = new AutoReadWriteLock(new 
ReentrantReadWriteLock());
+
+    public static void addQueryMetrics(
+            String queryId, long totalBloomBlocks,
+            long skipBloomBlocks, long skipBloomRows, long footerReadTime, 
long footerReadNumber) {
+        long start = System.currentTimeMillis();
+        try (AutoReadWriteLock.AutoLock writeLock = LOCK.lockForWrite()) {
+            addQueryCounter(queryId, queryTotalBloomBlocks, totalBloomBlocks);
+            addQueryCounter(queryId, querySkipBloomBlocks, skipBloomBlocks);
+            addQueryCounter(queryId, querySkipBloomRows, skipBloomRows);
+            addQueryCounter(queryId, queryFooterReadTime, footerReadTime);
+            addQueryCounter(queryId, queryFooterReadNumber, footerReadNumber);
+            globalBloomBlocks.addAndGet(totalBloomBlocks);
+            globalSkipBloomBlocks.addAndGet(skipBloomBlocks);
+        } catch (Exception e) {
+            LOGGER.error("Error when add query metrics.", e);
+        }
+        long end = System.currentTimeMillis();
+        if ((end - start) > 100) {
+            LOGGER.warn("BloomFilter collector cost too much time: {} ms ", 
(end - start));
+        }
+    }
+
+    private static void addQueryCounter(String queryId,
+            Cache<String, AtomicLong> counter, long step) throws 
ExecutionException {
+        AtomicLong totalBlocks = counter.get(queryId, () -> new 
AtomicLong(0L));
+        totalBlocks.addAndGet(step);
+    }
+
+    public static void logAndCleanStatus(String queryId) {
+        try {
+            AtomicLong readTime = queryFooterReadTime.get(queryId, () -> new 
AtomicLong(0L));
+            AtomicLong readNumber = queryFooterReadNumber.get(queryId, () -> 
new AtomicLong(1L));
+            if (readNumber.get() > 0L && readTime.get() > 0L) {
+                LOGGER.info("Reading footer avg time is {}, total read time is 
{}, number of row groups is {}",
+                        readTime.get() / readNumber.get(), readTime.get(), 
readNumber);
+            }
+            AtomicLong totalBloomBlocks = 
queryTotalBloomBlocks.getIfPresent(queryId);
+            if (KylinConfig.getInstanceFromEnv().isBloomCollectFilterEnabled()
+                    && totalBloomBlocks != null && totalBloomBlocks.get() > 0) 
{
+                AtomicLong skipBlocks = querySkipBloomBlocks.get(queryId, () 
-> new AtomicLong(0L));
+                AtomicLong skipRows = querySkipBloomRows.get(queryId, () -> 
new AtomicLong(0L));
+                LOGGER.info("BloomFilter total bloom blocks is {}, skip bloom 
blocks is {}, skip rows is {}",
+                        totalBloomBlocks.get(), skipBlocks.get(), 
skipRows.get());
+            }
+            queryFooterReadTime.invalidate(queryId);
+            queryFooterReadNumber.invalidate(queryId);
+            queryTotalBloomBlocks.invalidate(queryId);
+            querySkipBloomBlocks.invalidate(queryId);
+            querySkipBloomRows.invalidate(queryId);
+            logCounter.incrementAndGet();
+            if (logCounter.get() >= 100) {
+                LOGGER.info("Global BloomFilter total bloom blocks is {}, "
+                                + " skip bloom blocks is {}",
+                        globalBloomBlocks.get(), globalSkipBloomBlocks.get());
+                logCounter.set(0);
+            }
+            if (globalBloomBlocks.get() < 0 || globalSkipBloomBlocks.get() < 
0) {
+                // globalBloomBlocks number > Long.MAX_VALUE, almost 
impossible to get here
+                globalBloomBlocks.set(0);
+                globalSkipBloomBlocks.set(0);
+            }
+        } catch (ExecutionException e) {
+            LOGGER.error("Error when log query metrics.", e);
+        }
+    }
+
+    private BloomFilterSkipCollector() {
+    }
+}
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
new file mode 100644
index 0000000000..890edcfef1
--- /dev/null
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/ParquetBloomFilter.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.filter;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+public class ParquetBloomFilter {
+
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(ParquetBloomFilter.class);
+
+    private static final SortedSet<ColumnFilter> columnFilters = new 
TreeSet<>();
+    private static boolean loaded = false;
+    private static final List<String> buildBloomColumns = Lists.newArrayList();
+
+    public static void registerBloomColumnIfNeed(String project, String 
modelId) {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        if (!config.isBloomBuildEnabled()) {
+            return;
+        }
+        if (StringUtils.isBlank(project) || StringUtils.isBlank(modelId)) {
+            // won't happen
+            return;
+        }
+        if (loaded) {
+            return;
+        }
+        try {
+            project = project.toUpperCase();
+            FileSystem fs = 
HadoopUtil.getFileSystem(config.getHdfsWorkingDirectory());
+            Path filterInfo = new 
Path(QueryFiltersCollector.FILTER_STORAGE_PATH);
+            if (!fs.exists(filterInfo)) {
+                loaded = true;
+                return;
+            }
+            FileStatus[] hostsDir = fs.listStatus(new 
Path(QueryFiltersCollector.FILTER_STORAGE_PATH));
+            HashMap<String, Integer> columnsHits = Maps.newHashMap();
+            for (FileStatus host : hostsDir) {
+                String hostName = host.getPath().getName();
+                Path projectFiltersFile = 
QueryFiltersCollector.getProjectFiltersFile(hostName, project);
+                Map<String, Map<String, Integer>> modelColumns = 
JsonUtil.readValue(
+                        HadoopUtil.readStringFromHdfs(fs, projectFiltersFile), 
Map.class);
+                if (modelColumns.containsKey(modelId)) {
+                    modelColumns.get(modelId).forEach((column, hit) -> {
+                        int originHit = columnsHits.getOrDefault(column, 0);
+                        columnsHits.put(column, originHit + hit);
+                    });
+                }
+            }
+            columnsHits.forEach((column, hit) -> columnFilters.add(new 
ColumnFilter(column, hit)));
+            String columnFiltersLog = Arrays.toString(columnFilters.toArray());
+            LOGGER.info("register BloomFilter info : {}", columnFiltersLog);
+        } catch (Exception e) {
+            LOGGER.error("Error when register BloomFilter.", e);
+        }
+        loaded = true;
+    }
+
+    public static void configBloomColumnIfNeed(Dataset<Row> data, 
DataFrameWriter<Row> dataWriter) {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        if (!config.isBloomBuildEnabled()) {
+            return;
+        }
+        String manualColumn = config.getBloomBuildColumn();
+        if (StringUtils.isNotBlank(manualColumn)) {
+            String[] blooms = manualColumn.split("#");
+            for (int i = 0; i < blooms.length; i += 2) {
+                String nvd = blooms[i + 1];
+                dataWriter.option("parquet.bloom.filter.enabled#" + blooms[i], 
"true");
+                dataWriter.option("parquet.bloom.filter.expected.ndv#" + 
blooms[i], nvd);
+                LOGGER.info("build BloomFilter info: columnId is {}, nvd is 
{}", blooms[i], nvd);
+                buildBloomColumns.add(blooms[i]);
+            }
+            return;
+        }
+        Set<String> columns = 
Arrays.stream(data.columns()).collect(Collectors.toSet());
+        Set<ColumnFilter> dataColumns = columnFilters.stream()
+                .filter(column -> 
columns.contains(column.columnId)).collect(Collectors.toSet());
+        int count = 0;
+        for (ColumnFilter columnFilter : dataColumns) {
+            if (count >= config.getBloomBuildColumnMaxNum()) {
+                break;
+            }
+            dataWriter.option("parquet.bloom.filter.enabled#" + 
columnFilter.columnId, "true");
+            dataWriter.option("parquet.bloom.filter.expected.ndv#" + 
columnFilter.columnId, config.getBloomBuildColumnNvd());
+            buildBloomColumns.add(columnFilter.columnId);
+            LOGGER.info("building BloomFilter : columnId is {}; nvd is {}",
+                    columnFilter.columnId, config.getBloomBuildColumnNvd());
+            count++;
+        }
+    }
+
+    private ParquetBloomFilter() {
+    }
+
+    // Only for Unit Test
+    public static void resetParquetBloomFilter() {
+        ParquetBloomFilter.loaded = false;
+        ParquetBloomFilter.buildBloomColumns.clear();
+        ParquetBloomFilter.columnFilters.clear();
+    }
+
+    public static List<String> getBuildBloomColumns() {
+        return buildBloomColumns;
+    }
+
+    public static boolean isLoaded() {
+        return loaded;
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class ColumnFilter implements Comparable<ColumnFilter> {
+        private String columnId;
+        private int hit;
+
+        @Override
+        public int compareTo(ColumnFilter o) {
+            if (o.hit != this.hit) {
+                return Integer.compare(o.hit, this.hit);
+            }
+            return o.columnId.compareTo(this.columnId);
+        }
+    }
+}
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
new file mode 100644
index 0000000000..9066321403
--- /dev/null
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/filter/QueryFiltersCollector.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.filter;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AddressUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class QueryFiltersCollector {
+
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(QueryFiltersCollector.class);
+
+    // To reduce HDFS storage, only use map to record it.
+    // schema: <project, <modelId, <columnId, 
filter_hit_number>>>
+    protected static final ConcurrentMap<String, Map<String, Map<String, 
Integer>>> currentQueryFilters =
+            Maps.newConcurrentMap();
+
+    public static final ScheduledExecutorService executor =
+            Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("query-filter-collector"));
+
+    public static final String SERVER_HOST = AddressUtil.getLocalServerInfo();
+
+    public static final String FILTER_STORAGE_PATH =
+            KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + 
"/query_filter/";
+
+
+    public static void increaseHit(String project, String modelId, String 
columnId) {
+        if (!KylinConfig.getInstanceFromEnv().isBloomCollectFilterEnabled()) {
+            return;
+        }
+        project = project.toUpperCase();
+        Map<String, Integer> modelFilters = getModelFilters(project, modelId);
+        int hit = modelFilters.getOrDefault(columnId, 0);
+        modelFilters.put(columnId, ++hit);
+    }
+
+    public static void initScheduler() {
+        if (!KylinConfig.getInstanceFromEnv().isBloomCollectFilterEnabled()) {
+            return;
+        }
+        executor.scheduleAtFixedRate(() -> {
+            long startTime = System.currentTimeMillis();
+            LOGGER.info("Start sync query filters, current query filters is " 
+ currentQueryFilters);
+            try {
+                KylinConfig config = KylinConfig.getInstanceFromEnv();
+                FileSystem fs = 
HadoopUtil.getFileSystem(config.getHdfsWorkingDirectory());
+                currentQueryFilters.forEach((project, currentFilters) -> {
+                    try {
+                        Path projectFilterPath = 
getProjectFiltersFile(SERVER_HOST, project);
+                        Map<String, Map<String, Integer>> mergedHistory;
+                        if (!fs.exists(projectFilterPath)) {
+                            mergedHistory = currentFilters;
+                        } else {
+                            mergedHistory = mergeHistory(fs, currentFilters, 
projectFilterPath);
+                        }
+                        HadoopUtil.writeStringToHdfs(fs, 
JsonUtil.writeValueAsString(mergedHistory), projectFilterPath);
+                        currentQueryFilters.remove(project);
+                    } catch (IOException e) {
+                        LOGGER.error("Error when sync query filters for 
project : " + project, e);
+                    }
+                });
+                long endTime = System.currentTimeMillis();
+                LOGGER.info("Sync query filters success. cost time " + 
(endTime - startTime) + " ms."
+                        + " the failed filters maybe " + currentQueryFilters);
+            } catch (Throwable e) {
+                LOGGER.error("Error when sync query filters...", e);
+            }
+        }, 0, 
KylinConfig.getInstanceFromEnv().getQueryFilterCollectInterval(), 
TimeUnit.SECONDS);
+    }
+
+    private static Map<String, Map<String, Integer>> mergeHistory(FileSystem 
fs,
+            Map<String, Map<String, Integer>> currentFilters, Path 
projectFilterPath) throws IOException {
+        Map<String, Map<String, Integer>> history = JsonUtil.readValue(
+                HadoopUtil.readStringFromHdfs(fs, projectFilterPath), 
Map.class);
+        currentFilters.forEach((currentModel, currentColumns) -> {
+            if (!history.containsKey(currentModel)) {
+                history.put(currentModel, currentColumns);
+            } else {
+                Map<String, Integer> historyColumns = 
history.get(currentModel);
+                currentColumns.forEach((column, hit) -> {
+                    Integer oriHit = historyColumns.getOrDefault(column, 0);
+                    if (oriHit < 0) {
+                        // hit number > Integer.MAX_VALUE, almost impossible 
to get here
+                        oriHit = Integer.MAX_VALUE / 2;
+                    }
+                    historyColumns.put(column, oriHit + hit);
+                });
+            }
+        });
+        return history;
+    }
+
+    public static void destoryScheduler() {
+        executor.shutdown();
+    }
+
+    // schema:  <modelId, <columnId, filter_hit_number>>
+    private static Map<String, Map<String, Integer>> getProjectFilters(String 
project) {
+        project = project.toUpperCase();
+        currentQueryFilters.computeIfAbsent(project, key -> 
Maps.newConcurrentMap());
+        return currentQueryFilters.get(project);
+    }
+
+    // schema: <columnId, filter_hit_number>
+    private static Map<String, Integer> getModelFilters(String project, String 
modelId) {
+        project = project.toUpperCase();
+        Map<String, Map<String, Integer>> projectFilters = 
getProjectFilters(project);
+        projectFilters.computeIfAbsent(modelId, key -> 
Maps.newConcurrentMap());
+        return projectFilters.get(modelId);
+    }
+
+    public static Path getProjectFiltersFile(String host, String project) {
+        project = project.toUpperCase();
+        return new Path(FILTER_STORAGE_PATH + host + "/" + project);
+    }
+
+    private QueryFiltersCollector() {
+
+    }
+}
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
index 240c55b564..cd26d5c514 100644
--- 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -24,10 +24,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+
+import org.apache.kylin.engine.spark.filter.ParquetBloomFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,8 +151,9 @@ public class Repartitioner {
                 data = ss.read().parquet(inputPath).repartition(repartitionNum)
                         
.sortWithinPartitions(convertIntegerToColumns(sortByColumns));
             }
-
-            data.write().mode(SaveMode.Overwrite).parquet(outputPath);
+            DataFrameWriter<Row> writer = 
data.write().mode(SaveMode.Overwrite);
+            ParquetBloomFilter.configBloomColumnIfNeed(data, writer);
+            writer.parquet(outputPath);
             if (needRepartitionForShardByColumns()) {
                 if (optimizeShardEnabled)
                     
ss.sessionState().conf().setLocalProperty("spark.sql.adaptive.skewRepartition.enabled",
 null);
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/engine/spark/utils/StorageUtils.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/engine/spark/utils/StorageUtils.scala
index 6a2a5238d8..b6ec7d6969 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/engine/spark/utils/StorageUtils.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/engine/spark/utils/StorageUtils.scala
@@ -17,18 +17,20 @@
 
 package org.apache.kylin.engine.spark.utils
 
-import org.apache.kylin.metadata.cube.model.LayoutEntity
-import org.apache.kylin.metadata.model.NDataModel
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.kylin.common.KapConfig
 import org.apache.kylin.common.util.{HadoopUtil, JsonUtil, RandomUtil}
+import org.apache.kylin.engine.spark.filter.ParquetBloomFilter
 import org.apache.kylin.measure.bitmap.BitmapMeasureType
+import org.apache.kylin.metadata.cube.model.LayoutEntity
+import org.apache.kylin.metadata.model.NDataModel
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 
-import scala.collection.JavaConverters._
-
 object StorageUtils extends Logging {
   val MB: Long = 1024 * 1024
 
@@ -72,7 +74,9 @@ object StorageUtils extends Logging {
 
   def writeWithMetrics(data: DataFrame, path: String): JobMetrics = {
     withMetrics(data.sparkSession) {
-      data.write.mode(SaveMode.Overwrite).parquet(path)
+      val writer = data.write.mode(SaveMode.Overwrite)
+      ParquetBloomFilter.configBloomColumnIfNeed(data, writer)
+      writer.parquet(path)
     }
   }
 
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderConstants.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderConstants.scala
index 025f9e785b..ac548e4bae 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderConstants.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderConstants.scala
@@ -45,4 +45,5 @@ object SparderConstants {
   val KYLIN_CONF = "kylin_conf"
   val PARQUET_FILE_CUBE_TYPE = "cube"
   val DATE_DICT = "date_dict"
+  val KYLIN_QUERY_ID_KEY = "kylin.query.id"
 }
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index b97d402217..e99a17a2fb 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -18,12 +18,15 @@
 
 package org.apache.spark.sql.execution.datasource
 
-import io.kyligence.kap.guava20.shaded.common.collect.Sets
+import com.google.common.collect.Sets
 
+import java.sql.{Date, Timestamp}
+import java.util
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.kylin.common.exception.TargetSegmentNotFoundException
 import org.apache.kylin.common.util.{DateFormat, HadoopUtil}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
+import org.apache.kylin.engine.spark.filter.QueryFiltersCollector.increaseHit
 import org.apache.kylin.engine.spark.utils.{LogEx, LogUtils}
 import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity, 
NDataflow, NDataflowManager}
 import org.apache.kylin.metadata.datatype.DataType
@@ -39,8 +42,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.util.collection.BitSet
 
-import java.sql.{Date, Timestamp}
-import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -390,6 +391,8 @@ class FilePruner(val session: SparkSession,
 
   private def pruneSegmentsDimRange(filters: Seq[Expression],
                                     segDirs: Seq[SegmentDirectory]): 
Seq[SegmentDirectory] = {
+    val hitColumns = Sets.newHashSet[String]()
+    val project = options.getOrElse("project", "")
     val filteredStatuses = if (filters.isEmpty) {
       segDirs
     } else {
@@ -400,7 +403,8 @@ class FilePruner(val session: SparkSession,
         e => {
           val dimRange = 
dataflow.getSegment(e.segmentID).getDimensionRangeInfoMap
           if (dimRange != null && !dimRange.isEmpty) {
-            SegDimFilters(dimRange, 
dataflow.getIndexPlan.getEffectiveDimCols).foldFilter(reducedFilter) match {
+            SegDimFilters(dimRange, dataflow.getIndexPlan.getEffectiveDimCols, 
dataflow.getId, project, hitColumns)
+              .foldFilter(reducedFilter) match {
               case Trivial(true) => true
               case Trivial(false) => false
             }
@@ -410,6 +414,7 @@ class FilePruner(val session: SparkSession,
         }
       }
     }
+    hitColumns.forEach(col => increaseHit(project, dataflow.getId, col))
     filteredStatuses
   }
 
@@ -729,7 +734,8 @@ abstract class PushableColumnBase {
 
 }
 
-case class SegDimFilters(dimRange: java.util.Map[String, DimensionRangeInfo], 
dimCols: java.util.Map[Integer, TblColRef]) extends Logging {
+case class SegDimFilters(dimRange: java.util.Map[String, DimensionRangeInfo], 
dimCols: java.util.Map[Integer, TblColRef],
+                         dataflowId: String, project: String, hitColumns: 
java.util.Set[String]) extends Logging {
 
   private def insurance(id: String, value: Any)
                        (func: Any => Filter): Filter = {
@@ -759,6 +765,7 @@ case class SegDimFilters(dimRange: java.util.Map[String, 
DimensionRangeInfo], di
     filter match {
       case EqualTo(id, value: Any) =>
         val col = escapeQuote(id)
+        hitColumns.add(col)
         insurance(col, value) {
           ts => {
             val dataType = getDataType(col, value)
@@ -768,6 +775,7 @@ case class SegDimFilters(dimRange: java.util.Map[String, 
DimensionRangeInfo], di
         }
       case In(id, values: Array[Any]) =>
         val col = escapeQuote(id)
+        hitColumns.add(col)
         val satisfied = values.map(v => insurance(col, v) {
           ts => {
             val dataType = getDataType(col, v)

Reply via email to