[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14499428#comment-14499428
 ] 

ASF GitHub Bot commented on FLINK-1297:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/605#discussion_r28575964
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
    @@ -0,0 +1,154 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.statistics;
    +
    +import 
com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
    +import com.clearspring.analytics.stream.cardinality.HyperLogLog;
    +import com.clearspring.analytics.stream.cardinality.ICardinality;
    +import com.clearspring.analytics.stream.cardinality.LinearCounting;
    +import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
    +import org.apache.flink.statistics.heavyhitters.LossyCounting;
    +import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
    +import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
    +
    +import java.io.Serializable;
    +import java.util.Map;
    +
    +/**
    + * Data structure that encapsulates statistical information of data that 
has only been processed by one pass
    + * This statistical information is meant to help determine the 
distribution of the data that has been processed
    + * in an operator so that we can determine if it is necessary to 
repartition the data
    + *
    + * The statistics to be gathered are configurable and represented by a 
{@link OperatorStatisticsConfig} object.
    + *
    + * The information encapsulated in this class is min, max, a structure 
enabling estimation of count distinct and a
    + * structure holding the heavy hitters along with their frequency.
    + *
    + */
    +public class OperatorStatistics implements Serializable {
    +
    +   OperatorStatisticsConfig config;
    +
    +   Object min;
    +   Object max;
    +   ICardinality countDistinct;
    +   IHeavyHitter heavyHitter;
    +   long cardinality = 0;
    +
    +   public OperatorStatistics(OperatorStatisticsConfig config) {
    +           this.config = config;
    +           if 
(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING))
 {
    +                   countDistinct = new 
LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
    +           }
    +           
if(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){
    +                   countDistinct = new 
HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
    +           }
    +           if 
(config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){
    +                   heavyHitter =
    +                                   new 
LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, 
OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
    +           }
    +           if 
(config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)){
    +                   heavyHitter =
    +                                   new 
CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
    +                                                                           
        OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
    +                                                                           
        OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
    +                                                                           
        OperatorStatisticsConfig.HEAVY_HITTER_SEED);
    +           }
    +   }
    +
    +   public void process(Object tupleObject){
    +           if (tupleObject instanceof Comparable) {
    +                   if (config.collectMin && (min == null || ((Comparable) 
tupleObject).compareTo(min) < 0)) {
    +                           min = tupleObject;
    +                   }
    +                   if (config.collectMax && (max == null || ((Comparable) 
tupleObject).compareTo(max) > 0)) {
    +                           max = tupleObject;
    +                   }
    +           }
    +           if (config.collectCountDistinct){
    +                   countDistinct.offer(tupleObject);
    +           }
    +           if (config.collectHeavyHitters){
    +                   heavyHitter.addObject(tupleObject);
    +           }
    +           cardinality+=1;
    +   }
    +
    +   public void merge(OperatorStatistics other){
    +
    +           if (this.config.collectMin && 
((Comparable)this.min).compareTo(other.min) > 0 ) {
    +                   this.min = other.min;
    +           }
    +           if (this.config.collectMax && 
((Comparable)this.max).compareTo(other.max) < 0 ) {
    +                   this.max = other.max;
    +           }
    +
    +           try {
    +                   this.heavyHitter.merge(other.heavyHitter);
    +           } catch (HeavyHitterMergeException e) {
    +                   e.printStackTrace();
    --- End diff --
    
    Rethrow the exception here instead of only printing the stack trace.


> Add support for tracking statistics of intermediate results
> -----------------------------------------------------------
>
>                 Key: FLINK-1297
>                 URL: https://issues.apache.org/jira/browse/FLINK-1297
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>            Reporter: Alexander Alexandrov
>            Assignee: Alexander Alexandrov
>             Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback from the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to