[ https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14499428#comment-14499428 ]
ASF GitHub Bot commented on FLINK-1297: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/605#discussion_r28575964 --- Diff: flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statistics; + +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; +import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import com.clearspring.analytics.stream.cardinality.ICardinality; +import com.clearspring.analytics.stream.cardinality.LinearCounting; +import org.apache.flink.statistics.heavyhitters.IHeavyHitter; +import org.apache.flink.statistics.heavyhitters.LossyCounting; +import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter; +import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException; + +import java.io.Serializable; +import java.util.Map; + +/** + * Data structure that encapsulates statistical information of data that has only been processed by one pass + * This statistical information is meant to help determine the distribution of the data that has been processed + * in an operator so that we can determine if it is necessary to repartition the data + * + * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object. + * + * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct and a + * structure holding the heavy hitters along with their frequency. 
+ * + */ +public class OperatorStatistics implements Serializable { + + OperatorStatisticsConfig config; + + Object min; + Object max; + ICardinality countDistinct; + IHeavyHitter heavyHitter; + long cardinality = 0; + + public OperatorStatistics(OperatorStatisticsConfig config) { + this.config = config; + if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) { + countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE); + } + if(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){ + countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M); + } + if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){ + heavyHitter = + new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR); + } + if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)){ + heavyHitter = + new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, + OperatorStatisticsConfig.HEAVY_HITTER_ERROR, + OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE, + OperatorStatisticsConfig.HEAVY_HITTER_SEED); + } + } + + public void process(Object tupleObject){ + if (tupleObject instanceof Comparable) { + if (config.collectMin && (min == null || ((Comparable) tupleObject).compareTo(min) < 0)) { + min = tupleObject; + } + if (config.collectMax && (max == null || ((Comparable) tupleObject).compareTo(max) > 0)) { + max = tupleObject; + } + } + if (config.collectCountDistinct){ + countDistinct.offer(tupleObject); + } + if (config.collectHeavyHitters){ + heavyHitter.addObject(tupleObject); + } + cardinality+=1; + } + + public void merge(OperatorStatistics other){ + + if (this.config.collectMin && ((Comparable)this.min).compareTo(other.min) > 0 ) { + this.min = other.min; + } + if (this.config.collectMax && 
((Comparable)this.max).compareTo(other.max) < 0 ) { + this.max = other.max; + } + + try { + this.heavyHitter.merge(other.heavyHitter); + } catch (HeavyHitterMergeException e) { + e.printStackTrace(); --- End diff -- rethrow > Add support for tracking statistics of intermediate results > ----------------------------------------------------------- > > Key: FLINK-1297 > URL: https://issues.apache.org/jira/browse/FLINK-1297 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime > Reporter: Alexander Alexandrov > Assignee: Alexander Alexandrov > Fix For: 0.9 > > Original Estimate: 1,008h > Remaining Estimate: 1,008h > > One of the major problems related to the optimizer at the moment is the lack > of proper statistics. > With the introduction of staged execution, it is possible to instrument the > runtime code with a statistics facility that collects the required > information for optimizing the next execution stage. > I would therefore like to contribute code that can be used to gather basic > statistics for the (intermediate) result of dataflows (e.g. min, max, count, > count distinct) and make them available to the job manager. > Before I start, I would like to hear some feedback from the other users. > In particular, to handle skew (e.g. on grouping) it might be good to have > some sort of detailed sketch about the key distribution of an intermediate > result. I am not sure whether a simple histogram is the most effective way to > go. Maybe somebody would propose another lightweight sketch that provides > better accuracy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)