[ https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498302#comment-14498302 ]
ASF GitHub Bot commented on FLINK-1297:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/605#discussion_r28528360

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/OperatorStatsAccumulator.java ---
    @@ -0,0 +1,98 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.common.accumulators;
    +
    +import org.apache.flink.statistics.OperatorStatistics;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +
    +/**
    + * This accumulator wraps the class {@link org.apache.flink.api.common.accumulators.OperatorStatisticsResult} to track
    + * estimated values for count distinct and heavy hitters. It has lower memory requirements than
    + * {@link org.apache.flink.api.common.accumulators.Histogram}.
    + * Our goal in accumulating stats per operator is to not only track global stats, but also know the local stats of the
    + * subtasks of a task. For this purpose, {@link org.apache.flink.api.common.accumulators.OperatorStatisticsResult}
    + * encapsulates one global accumulator and an array of local accumulators.
    + *
    + * The constructor of the class receives a subTaskIndex, and the total number of subtasks.
    + * Both parameters are found in the
    + * {@link org.apache.flink.api.common.functions.RuntimeContext} from where the constructor is called.
    + */
    +public class OperatorStatsAccumulator implements Accumulator<Object, OperatorStatisticsResult> {
    +
    +
    +    private OperatorStatisticsResult localValue;
    +
    +    public OperatorStatsAccumulator(int subTaskIndex, int numSubtasks){
    +        localValue = new OperatorStatisticsResult(subTaskIndex, numSubtasks);
    +    }
    +
    +    @Override
    +    public void add(Object value) {
    +        localValue.add(value);
    +    }
    +
    +    @Override
    +    public OperatorStatisticsResult getLocalValue() {
    +        return localValue;
    +    }
    +
    +    @Override
    +    public void resetLocal() {
    +        localValue.resetLocal();
    +    }
    +
    +    @Override
    +    public void merge(Accumulator<Object, OperatorStatisticsResult> other) {
    +        localValue.merge(other.getLocalValue());
    +    }
    +
    +    //todo this has not been tested
    +    public void write(ObjectOutputStream out) throws IOException {
    +        out.writeObject(localValue.getLocal());
    +
    +    }
    +
    +    //todo this has not been tested
    +    public void read(ObjectInputStream in) throws IOException {
    +        try {
    +            localValue.setLocal((OperatorStatistics) in.readObject());
    +        } catch (ClassNotFoundException e) {
    +            e.printStackTrace();
    --- End diff --

    Just doing e.printStackTrace() will make it extremely hard to debug the error. You'll find a stack trace without any message in the stderr file ... The Flink program will continue running in an erroneous state.
    I would recommend re-throwing the exception as a runtime exception.

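    For illustration, the recommendation above could look roughly like the following. This is only a minimal sketch, not the code from the pull request: `RethrowSketch`, `Stats`, `count()` and `roundTrip` are hypothetical stand-ins (for `OperatorStatsAccumulator` and `OperatorStatistics`) so that the example compiles without Flink on the classpath, and the exception message is an assumption. The only point is that the `ClassNotFoundException` is wrapped with a message and its cause instead of being printed and swallowed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for OperatorStatsAccumulator (not the PR's class).
class RethrowSketch {

    // Hypothetical stand-in for the OperatorStatistics payload.
    static class Stats implements Serializable {
        private static final long serialVersionUID = 1L;
        final long count;

        Stats(long count) {
            this.count = count;
        }
    }

    private Stats local;

    RethrowSketch(long count) {
        this.local = new Stats(count);
    }

    long count() {
        return local.count;
    }

    public void write(ObjectOutputStream out) throws IOException {
        out.writeObject(local);
    }

    public void read(ObjectInputStream in) throws IOException {
        try {
            local = (Stats) in.readObject();
        } catch (ClassNotFoundException e) {
            // Fail fast with a message and the original cause, rather than
            // printing the stack trace and continuing with stale state.
            throw new RuntimeException(
                "Could not deserialize operator statistics: class not found", e);
        }
    }

    // Serializes an instance to a byte array and reads it back into a fresh one.
    static long roundTrip(long count) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                new RethrowSketch(count).write(out);
            }
            RethrowSketch copy = new RethrowSketch(0);
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.read(in);
            }
            return copy.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(42L));
    }
}
```

    With the cause attached, the failure surfaces at the point of deserialization, and the resulting stack trace carries both the message and the original ClassNotFoundException.
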
> Add support for tracking statistics of intermediate results
> -----------------------------------------------------------
>
>                 Key: FLINK-1297
>                 URL: https://issues.apache.org/jira/browse/FLINK-1297
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Runtime
>            Reporter: Alexander Alexandrov
>            Assignee: Alexander Alexandrov
>             Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the
> runtime code with a statistics facility that collects the required
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic
> statistics for the (intermediate) result of dataflows (e.g. min, max, count,
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback from the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have
> some sort of detailed sketch about the key distribution of an intermediate
> result. I am not sure whether a simple histogram is the most effective way to
> go. Maybe somebody would propose another lightweight sketch that provides
> better accuracy.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)