[ https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505292#comment-14505292 ]
ASF GitHub Bot commented on FLINK-1297: --------------------------------------- Github user aalexandrov commented on a diff in the pull request: https://github.com/apache/flink/pull/605#discussion_r28799879 --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.operatorstatistics; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Map; +import java.util.Random; + +public class OperatorStatsAccumulatorsTest extends AbstractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorsTest.class); + + private static final String ACCUMULATOR_NAME = "op-stats"; + + public OperatorStatsAccumulatorsTest(){ + super(new Configuration()); + } + + @Test + public void testAccumulator() throws Exception { + + String input = ""; + + Random rand = new Random(); + + for (int i = 1; i < 1000; i++) { + if(rand.nextDouble()<0.2){ + input+=String.valueOf(rand.nextInt(5))+"\n"; + }else{ + input+=String.valueOf(rand.nextInt(100))+"\n"; + } + } + + String inputFile = createTempFile("datapoints.txt", input); + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + env.readTextFile(inputFile). + flatMap(new StringToInt()). 
+ output(new DiscardingOutputFormat<Tuple1<Integer>>()); + + JobExecutionResult result = env.execute(); + + OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME); + LOG.debug("Global Stats"); + LOG.debug(globalStats.toString()); + + OperatorStatistics merged = null; + + Map<String,Object> accResults = result.getAllAccumulatorResults(); + for (String accumulatorName:accResults.keySet()){ + if (accumulatorName.contains(ACCUMULATOR_NAME+"-")){ + OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName); + if (merged == null){ + merged = localStats.clone(); + }else { + merged.merge(localStats); + } + LOG.debug("Local Stats: " + accumulatorName); + LOG.debug(localStats.toString()); + } + } + + Assert.assertEquals(globalStats.cardinality,999); + Assert.assertEquals(globalStats.estimateCountDistinct(),100); + Assert.assertTrue(globalStats.getHeavyHitters().size()>0 && globalStats.getHeavyHitters().size()<=5); + Assert.assertEquals(merged.getMin(),globalStats.getMin()); + Assert.assertEquals(merged.getMax(),globalStats.getMax()); + Assert.assertEquals(merged.estimateCountDistinct(),globalStats.estimateCountDistinct()); + Assert.assertEquals(merged.getHeavyHitters().size(),globalStats.getHeavyHitters().size()); + + } + + public static class StringToInt extends RichFlatMapFunction<String, Tuple1<Integer>> { + + // Is instantiated later since the runtime context is not yet initialized + private Accumulator<Object, Serializable> globalAccumulator; + private Accumulator<Object,Serializable>[] localAccumulators; --- End diff -- Does it make sense to use some event notification logic for pluggable components at critical points (e.g. open/close method of UDFs)? 
> Add support for tracking statistics of intermediate results > ----------------------------------------------------------- > > Key: FLINK-1297 > URL: https://issues.apache.org/jira/browse/FLINK-1297 > Project: Flink > Issue Type: Improvement > Components: Distributed Runtime > Reporter: Alexander Alexandrov > Assignee: Alexander Alexandrov > Fix For: 0.9 > > Original Estimate: 1,008h > Remaining Estimate: 1,008h > > One of the major problems related to the optimizer at the moment is the lack > of proper statistics. > With the introduction of staged execution, it is possible to instrument the > runtime code with a statistics facility that collects the required > information for optimizing the next execution stage. > I would therefore like to contribute code that can be used to gather basic > statistics for the (intermediate) result of dataflows (e.g. min, max, count, > count distinct) and make them available to the job manager. > Before I start, I would like to hear some feedback from the other users. > In particular, to handle skew (e.g. on grouping) it might be good to have > some sort of detailed sketch about the key distribution of an intermediate > result. I am not sure whether a simple histogram is the most effective way to > go. Maybe somebody would propose another lightweight sketch that provides > better accuracy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)