[ https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14501182#comment-14501182 ]
ASF GitHub Bot commented on FLINK-1297: --------------------------------------- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/605#discussion_r28642379 --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/OperatorStatsAccumulatorsTest.java --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.accumulators; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.OperatorStatsAccumulator; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.statistics.OperatorStatistics; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Random; + +public class OperatorStatsAccumulatorsTest extends AbstractTestBase { + + private static final String ACCUMULATOR_NAME = "op-stats-accumulator"; + + public OperatorStatsAccumulatorsTest(){ + super(new Configuration()); + } + + @Test + public void testAccumulator() { + + try { + String input = ""; + + Random rand = new Random(); + + for (int i = 1; i < 1000; i++) { + if(rand.nextDouble()<0.2){ + input+=String.valueOf(rand.nextInt(5))+"\n"; + }else{ + input+=String.valueOf(rand.nextInt(100))+"\n"; + } + } + + String inputFile = createTempFile("datapoints.txt", input); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + env.readTextFile(inputFile). + flatMap(new StringToInt()). 
+ output(new DiscardingOutputFormat<Tuple1<Integer>>()); + + JobExecutionResult result = env.execute(); + System.out.println("Accumulator results:"); + + OperatorStatistics globalStats = result.getOperatorStatisticsResult(ACCUMULATOR_NAME); + System.out.println("Global Stats"); + System.out.println(globalStats.toString()); + + OperatorStatistics[] localStats = result.getLocalOperatorStatisticsResults(ACCUMULATOR_NAME); + System.out.println("Local stats: 0"); + System.out.println(localStats[0].toString()); + + OperatorStatistics merged = localStats[0].clone(); + for (int i=1;i<localStats.length;i++) { + merged.merge(localStats[i]); + System.out.println("Local stats: "+i); + System.out.println(localStats[i].toString()); + } + Assert.assertEquals(merged.getMin(),globalStats.getMin()); + Assert.assertEquals(merged.getMax(),globalStats.getMax()); + Assert.assertEquals(merged.estimateCountDistinct(),globalStats.estimateCountDistinct()); + Assert.assertEquals(merged.getHeavyHitters().size(),globalStats.getHeavyHitters().size()); + + } + catch (Exception e) { --- End diff -- Tests fail anyways if an Exception is thrown. No need for the catch => fail. > Add support for tracking statistics of intermediate results > ----------------------------------------------------------- > > Key: FLINK-1297 > URL: https://issues.apache.org/jira/browse/FLINK-1297 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime > Reporter: Alexander Alexandrov > Assignee: Alexander Alexandrov > Fix For: 0.9 > > Original Estimate: 1,008h > Remaining Estimate: 1,008h > > One of the major problems related to the optimizer at the moment is the lack > of proper statistics. > With the introduction of staged execution, it is possible to instrument the > runtime code with a statistics facility that collects the required > information for optimizing the next execution stage. 
> I would therefore like to contribute code that can be used to gather basic > statistics for the (intermediate) result of dataflows (e.g. min, max, count, > count distinct) and make them available to the job manager. > Before I start, I would like to hear some feedback from the other users. > In particular, to handle skew (e.g. on grouping) it might be good to have > some sort of detailed sketch about the key distribution of an intermediate > result. I am not sure whether a simple histogram is the most effective way to > go. Maybe somebody would propose another lightweight sketch that provides > better accuracy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)