kangkaisen commented on a change in pull request #4330:
URL: https://github.com/apache/incubator-doris/pull/4330#discussion_r474720148



##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/qe/cache/PartitionCache.java
##########
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InlineViewRef;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.RowBatch;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class PartitionCache extends Cache {
+    private static final Logger LOG = 
LogManager.getLogger(PartitionCache.class);
+    private SelectStmt nokeyStmt;
+    private SelectStmt rewriteStmt;
+    private CompoundPredicate partitionPredicate;
+    private OlapTable olapTable;
+    private RangePartitionInfo partitionInfo;
+    private Column partColumn;
+
+    private PartitionRange range;
+    private List<PartitionRange.PartitionSingle> newRangeList;
+
+    public SelectStmt getRewriteStmt() {
+        return rewriteStmt;
+    }
+
+    public SelectStmt getNokeyStmt() {
+        return nokeyStmt;
+    }
+
+    public PartitionCache(TUniqueId queryId, SelectStmt selectStmt) {
+        super(queryId, selectStmt);
+    }
+
+    public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, 
RangePartitionInfo partitionInfo, Column partColumn,
+                             CompoundPredicate partitionPredicate) {
+        this.latestTable = latestTable;
+        this.olapTable = latestTable.olapTable;
+        this.partitionInfo = partitionInfo;
+        this.partColumn = partColumn;
+        this.partitionPredicate = partitionPredicate;
+        this.newRangeList = Lists.newArrayList();
+    }
+
+    public CacheProxy.FetchCacheResult getCacheData(Status status) {
+        CacheProxy.FetchCacheRequest request;
+        rewriteSelectStmt(null);
+        request = new CacheBeProxy.FetchCacheRequest(nokeyStmt.toSql());
+        range = new PartitionRange(this.partitionPredicate, this.olapTable,
+                this.partitionInfo);
+        if (!range.analytics()) {
+            status.setStatus("analytics range error");
+            return null;
+        }
+
+        for (PartitionRange.PartitionSingle single : 
range.getPartitionSingleList()) {
+            request.addParam(single.getCacheKey().realValue(),
+                    single.getPartition().getVisibleVersion(),
+                    single.getPartition().getVisibleVersionTime()
+            );
+        }
+
+        CacheProxy.FetchCacheResult cacheResult = proxy.fetchCache(request, 
10000, status);
+        if (status.ok() && cacheResult != null) {
+            cacheResult.all_count = range.getPartitionSingleList().size();
+            for (CacheBeProxy.CacheValue value : cacheResult.getValueList()) {
+                range.setCacheFlag(value.param.partition_key);
+            }
+            MetricRepo.COUNTER_CACHE_HIT_PARTITION.increase(1L);
+            MetricRepo.COUNTER_CACHE_PARTITION_ALL.increase((long) 
range.getPartitionSingleList().size());
+            MetricRepo.COUNTER_CACHE_PARTITION_HIT.increase((long) 
cacheResult.getValueList().size());
+        }
+
+        range.setTooNewByID(latestTable.latestPartitionId);
+        //build rewrite sql
+        this.hitRange = range.buildDiskPartitionRange(newRangeList);
+        if (newRangeList != null && newRangeList.size() > 0) {
+            rewriteSelectStmt(newRangeList);
+        }
+        return cacheResult;
+    }
+
+    public void copyRowBatch(RowBatch rowBatch) {
+        if (rowBatchBuilder == null) {
+            rowBatchBuilder = new 
RowBatchBuilder(CacheAnalyzer.CacheMode.Partition);
+            rowBatchBuilder.buildPartitionIndex(selectStmt.getResultExprs(), 
selectStmt.getColLabels(),
+                    partColumn, range.buildUpdatePartitionRange());
+        }
+        rowBatchBuilder.copyRowData(rowBatch);
+    }
+
+    public void updateCache() {
+        if (!super.checkRowLimit()) {
+            return;
+        }
+
+        CacheBeProxy.UpdateCacheRequest updateRequest = 
rowBatchBuilder.buildPartitionUpdateRequest(nokeyStmt.toSql());
+        if (updateRequest.value_count > 0) {
+            CacheBeProxy proxy = new CacheBeProxy();
+            Status status = new Status();
+            proxy.updateCache(updateRequest, CacheProxy.UPDATE_TIMEOUT, 
status);
+            LOG.info("update cache model {}, queryid {}, sqlkey {}, value 
count {}, row count {}, data size {}",
+                    CacheAnalyzer.CacheMode.Partition, 
DebugUtil.printId(queryId),
+                    DebugUtil.printId(updateRequest.sql_key),
+                    updateRequest.value_count, updateRequest.row_count, 
updateRequest.data_size);
+        }
+    }
+
+    /**
+     * Set the predicate containing partition key to null
+     */
+    public void rewriteSelectStmt(List<PartitionRange.PartitionSingle> 
newRangeList) {
+        if (newRangeList == null || newRangeList.size() == 0) {
+            this.nokeyStmt = (SelectStmt) this.selectStmt.clone();
+            rewriteSelectStmt(nokeyStmt, this.partitionPredicate, null);
+        } else {
+            this.rewriteStmt = (SelectStmt) this.selectStmt.clone();
+            rewriteSelectStmt(rewriteStmt, this.partitionPredicate, 
newRangeList);
+        }
+    }
+
+    private void rewriteSelectStmt(SelectStmt newStmt, CompoundPredicate 
predicate,
+                                   List<PartitionRange.PartitionSingle> 
newRangeList) {
+        newStmt.setWhereClause(
+                rewriteWhereClause(newStmt.getWhereClause(), predicate, 
newRangeList)
+        );
+        List<TableRef> tableRefs = newStmt.getTableRefs();
+        for (TableRef tblRef : tableRefs) {
+            if (tblRef instanceof InlineViewRef) {
+                InlineViewRef viewRef = (InlineViewRef) tblRef;
+                QueryStmt queryStmt = viewRef.getViewStmt();
+                if (queryStmt instanceof SelectStmt) {
+                    rewriteSelectStmt((SelectStmt) queryStmt, predicate, 
newRangeList);
+                }
+            }
+        }
+    }
+
+    /**
+     * P1 And P2 And P3 And P4

Review comment:
       Please add a comment explaining which Expr format is rewritten into which other Expr format.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
##########
@@ -0,0 +1,451 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import org.apache.doris.analysis.AggregateInfo;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InlineViewRef;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.SelectStmt;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.TableRef;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.RowBatch;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Analyze which caching mode a SQL is suitable for
+ * 1. T + 1 update is suitable for SQL mode
+ * 2. Partition by date, update the data of the day in near real time, which 
is suitable for Partition mode
+ */
+public class CacheAnalyzer {
+    private static final Logger LOG = 
LogManager.getLogger(CacheAnalyzer.class);
+
+    /**
+     * NoNeed : disable config or variable, not query, not scan table etc.
+     */
+    public enum CacheMode {
+        NoNeed,
+        None,
+        TTL,
+        Sql,
+        Partition
+    }
+
+    private ConnectContext context;
+    private boolean enableSqlCache = false;
+    private boolean enablePartitionCache = false;
+    private TUniqueId queryId;
+    private CacheMode cacheMode;
+    private CacheTable latestTable;
+    private StatementBase parsedStmt;
+    private SelectStmt selectStmt;
+    private List<ScanNode> scanNodes;
+    private OlapTable olapTable;
+    private RangePartitionInfo partitionInfo;
+    private Column partColumn;
+    private CompoundPredicate partitionPredicate;
+    private Cache cache;
+
+    public Cache getCache() {
+        return cache;
+    }
+
+    public CacheAnalyzer(ConnectContext context, StatementBase parsedStmt, 
Planner planner) {
+        this.context = context;
+        this.queryId = context.queryId();
+        this.parsedStmt = parsedStmt;
+        scanNodes = planner.getScanNodes();
+        latestTable = new CacheTable();
+        checkCacheConfig();
+    }
+
+    //for unit test
+    public CacheAnalyzer(ConnectContext context, StatementBase parsedStmt, 
List<ScanNode> scanNodes) {
+        this.context = context;
+        this.parsedStmt = parsedStmt;
+        this.scanNodes = scanNodes;
+        checkCacheConfig();
+    }
+
+    private void checkCacheConfig() {
+        if (Config.cache_enable_sql_mode) {
+            if (context.getSessionVariable().isEnableSqlCache()) {
+                enableSqlCache = true;
+            }
+        }
+        if (Config.cache_enable_partition_mode) {
+            if (context.getSessionVariable().isEnablePartitionCache()) {
+                enablePartitionCache = true;
+            }
+        }
+    }
+
+    public CacheMode getCacheMode() {
+        return cacheMode;
+    }
+
+    public class CacheTable implements Comparable<CacheTable> {
+        public OlapTable olapTable;
+        public long latestPartitionId;
+        public long latestVersion;
+        public long latestTime;
+
+        public CacheTable() {
+            olapTable = null;
+            latestPartitionId = 0;
+            latestVersion = 0;
+            latestTime = 0;
+        }
+
+        @Override
+        public int compareTo(CacheTable table) {
+            return (int) (table.latestTime - this.latestTime);
+        }
+
+        public void Debug() {
+            LOG.info("table {}, partition id {}, ver {}, time {}", 
olapTable.getName(), latestPartitionId, latestVersion, latestTime);
+        }
+    }
+
+    public boolean enableCache() {
+        return enableSqlCache || enablePartitionCache;
+    }
+
+    public boolean enableSqlCache() {
+        return enableSqlCache;
+    }
+
+    public boolean enablePartitionCache() {
+        return enablePartitionCache;
+    }
+
+    /**
+     * Check cache mode with SQL and table
+     * 1、Only Olap table
+     * 2、The update time of the table is before 
Config.last_version_interval_time
+     * 2、PartitionType is PartitionType.RANGE, and partition key has only one 
column
+     * 4、Partition key must be included in the group by clause
+     * 5、Where clause must contain only one partition key predicate
+     * CacheMode.Sql
+     * xxx FROM user_profile, updated before Config.last_version_interval_time
+     * CacheMode.Partition, partition by event_date, only the partition of 
today will be updated.
+     * SELECT xxx FROM app_event WHERE event_date >= 20191201 AND event_date 
<= 20191207 GROUP BY event_date
+     * SELECT xxx FROM app_event INNER JOIN user_Profile ON app_event.user_id 
= user_profile.user_id xxx
+     * SELECT xxx FROM app_event INNER JOIN user_profile ON xxx INNER JOIN 
site_channel ON xxx
+     */
+    public void checkCacheMode(long now) {
+        cacheMode = innerCheckCacheMode(now);
+    }
+
+    private CacheMode innerCheckCacheMode(long now) {
+        if (!enableCache()) {
+            return CacheMode.NoNeed;
+        }
+        if (!(parsedStmt instanceof SelectStmt) || scanNodes.size() == 0) {
+            return CacheMode.NoNeed;
+        }
+        MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
+
+        this.selectStmt = (SelectStmt) parsedStmt;
+        //Check the last version time of the table
+        List<CacheTable> tblTimeList = Lists.newArrayList();
+        for (int i = 0; i < scanNodes.size(); i++) {
+            ScanNode node = scanNodes.get(i);
+            if (!(node instanceof OlapScanNode)) {
+                return CacheMode.None;
+            }
+            OlapScanNode oNode = (OlapScanNode) node;
+            OlapTable oTable = oNode.getOlapTable();
+            CacheTable cTable = getLastUpdateTime(oTable);
+            tblTimeList.add(cTable);
+        }
+        MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
+        Collections.sort(tblTimeList);
+        latestTable = tblTimeList.get(0);
+        latestTable.Debug();
+
+        if (now == 0) {
+            now = nowtime();
+        }
+        if (enableSqlCache() &&
+                (now - latestTable.latestTime) >= 
Config.cache_last_version_interval_second * 1000) {
+            LOG.info("TIME:{},{},{}", now, latestTable.latestTime, 
Config.cache_last_version_interval_second*1000);
+            cache = new SqlCache(this.queryId, this.selectStmt);
+            ((SqlCache) cache).setCacheInfo(this.latestTable);
+            MetricRepo.COUNTER_CACHE_MODE_SQL.increase(1L);
+            return CacheMode.Sql;
+        }
+
+        if (!enablePartitionCache()) {
+            return CacheMode.None;
+        }
+
+        //Check if selectStmt matches partition key
+        //Only one table can be updated in 
Config.cache_last_version_interval_second range
+        for (int i = 1; i < tblTimeList.size(); i++) {
+            if ((now - tblTimeList.get(i).latestTime) < 
Config.cache_last_version_interval_second * 1000) {
+                LOG.info("the time of other tables is newer than {}", 
Config.cache_last_version_interval_second);
+                return CacheMode.None;
+            }
+        }
+        olapTable = latestTable.olapTable;
+        if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) {
+            LOG.info("the partition of OlapTable not RANGE type");

Review comment:
       Too many info-level logs. Please change some of them to debug level.

##########
File path: 
fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
##########
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.cache;
+
+import com.google.common.collect.Lists;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.qe.RowBatch;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+public class RowBatchBuilder {

Review comment:
       Add a comment for this class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to