Hi,

Sorry it took so long to respond; I needed to check whether I was actually
allowed to share the code, since it uses internal datasets.

Attached to this email you will find the main class of the job, without
the supporting classes or the actual dataset. If you want to run it, you
need to replace the dataset with something else, but that should be
trivial.
If you just want to see the problem itself, have a look at the attached log
in conjunction with the code. Each ERROR printout in the log relates to an
accumulator receiving wrong values.
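
For anyone without access to the dataset, the symptom can be sketched in
plain Java with no Flink involved. This is only an illustration under
assumptions, not how Flink implements fold: Acc is a hypothetical stand-in
for TimeDrift, the host names "a" and "b" are made up, and the host check
mirrors the one in the attached fold function. With a single shared
accumulator the check fires whenever the host changes; with one accumulator
per key it never fires.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerKeyFoldSketch {

    /** Hypothetical stand-in for TimeDrift: only a host and an event count. */
    static final class Acc {
        String host;
        long totalNbEvents;
    }

    /** Same host check as in the attached fold; returns true on a mismatch. */
    static boolean fold(Acc acc, String host) {
        boolean mismatch = acc.host != null && !acc.host.equalsIgnoreCase(host);
        acc.host = host;
        acc.totalNbEvents += 1;
        return mismatch;
    }

    /** One accumulator shared by all groups: the behavior the log suggests. */
    static int mismatchesWithSharedAccumulator(List<String> hosts) {
        Acc shared = new Acc();
        int mismatches = 0;
        for (String host : hosts) {
            if (fold(shared, host)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    /** One accumulator per key: the behavior expected from a grouped fold. */
    static int mismatchesWithPerKeyAccumulator(List<String> hosts) {
        Map<String, Acc> perKey = new HashMap<>();
        int mismatches = 0;
        for (String host : hosts) {
            Acc acc = perKey.computeIfAbsent(host, k -> new Acc());
            if (fold(acc, host)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("a", "b", "a", "b");
        System.out.println("shared:  " + mismatchesWithSharedAccumulator(hosts));  // shared:  3
        System.out.println("per key: " + mismatchesWithPerKeyAccumulator(hosts));  // per key: 0
    }
}
```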

cheers Martin

On Sat, Oct 3, 2015 at 11:29 AM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> Hey,
>
> Thanks for reporting the problem, Martin. I have not merged the PR Stephan
> is referring to yet. [1] There I am cleaning up some of the internals too.
> Just out of curiosity, could you share the code for the failing test
> please?
>
> [1] https://github.com/apache/flink/pull/1155
>
> On Fri, Oct 2, 2015 at 8:26 PM, Martin Neumann <mneum...@sics.se> wrote:
>
> > One of my colleagues found it today while we were hunting bugs. We
> > were using the latest 0.10 version pulled from Maven this morning.
> > The program we were testing is new code, so I can't tell you whether the
> > behavior has changed or if it was always like this.
> >
> > On Fri, Oct 2, 2015 at 7:46 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > I think these operations were recently moved to the internal state
> > > interface. Did the behavior change then?
> > >
> > > @Marton or Gyula, can you comment? Is it per chance not mapped to the
> > > partitioned state?
> > >
> > > On Fri, Oct 2, 2015 at 6:37 PM, Martin Neumann <mneum...@sics.se> wrote:
> > >
> > > > Hi,
> > > >
> > > > In one of my programs I run a fold on a GroupedDataStream. The aim is
> > > > to aggregate the values in each group.
> > > > It seems the aggregator in the fold function is shared at the operator
> > > > level, so all groups that end up on the same operator get mashed
> > > > together.
> > > >
> > > > Is this the intended behavior? If so, what do I have to do to separate
> > > > them?
> > > >
> > > >
> > > > cheers Martin
> > > >
> > >
> >
>
import com.ericsson.config.Configuration;
import com.ericsson.pojos.DataPojo;
import com.ericsson.pojos.out.TimeDrift;
import com.ericsson.timedrift.TimeShiftWindowMapFunction;
import com.ericsson.utils.DataPojoSerializer;
import com.ericsson.utils.PojoSerializer;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedDataStream;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.concurrent.TimeUnit;


/**
 * Computes per-host time drift over sliding time windows.
 */
public class TimeDriftKafkaExampleTst {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> in = env.readTextFile("./datasets/first10000.json");
        DataStream<DataPojo> events = in.map(new DeserialiseMap()).returns(DataPojo.class);

        KeyedDataStream<DataPojo, String> groupedEvents = events.groupBy(t -> {
            return t.getHost();
        });
        WindowedDataStream<TimeDrift> drift = groupedEvents
                //.window(Count.of(50)).every(Count.of(10))
                .window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(20, TimeUnit.SECONDS))
                .foldWindow(new TimeDrift(), new TimeShiftFold());
                //.mapWindow(new TimeShiftWindowMapFunction());

        DataStream<TimeDrift> out = drift.flatten();
        out.print();

        env.execute();
    }

    private static class DeserialiseMap implements MapFunction<String, DataPojo> {
        private static final DataPojoSerializer deserializer = new DataPojoSerializer();

        @Override
        public DataPojo map(String value) throws Exception {
            return deserializer.deserialize(value.getBytes());
        }
    }
    private static class TimeShiftFold implements FoldFunction<DataPojo, TimeDrift> {

        @Override
        public TimeDrift fold(TimeDrift accumulator, DataPojo value) throws Exception {
            long ts = value.getTimestampInMillis();
            long highest = accumulator.highest;
            // Sanity check: within one group the accumulator should only ever see one host.
            if (accumulator.host != null && !accumulator.host.equalsIgnoreCase(value.getHost())) {
                System.err.println(accumulator.host + " != " + value.getHost());
            }
            accumulator.host = value.getHost();
            accumulator.totalNbEvents += 1;

            if (highest > ts) {
                // Out-of-order event: count it and add how far behind the highest timestamp it is.
                accumulator.totalOoOrderEvents += 1;
                accumulator.accumulatedDelta += highest - ts;
                return accumulator;
            }

            // In-order event: advance the highest timestamp seen so far.
            accumulator.highest = ts;
            return accumulator;
        }
    }
}
/usr/lib/jvm/java-8-oracle/bin/java -Didea.launcher.port=7533 -Didea.launcher.bin.path=/home/martin/Software/idea-IC-141.1532.4/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-8-oracle/jre/lib/javaws.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jsse.jar:/usr/lib/jvm/java-8-oracle/jre/lib/deploy.jar:/usr/lib/jvm/java-8-oracle/jre/lib/management-agent.jar:/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jce.jar:/usr/lib/jvm/java-8-oracle/jre/lib/charsets.jar:/usr/lib/jvm/java-8-oracle/jre/lib/plugin.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jfxswt.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jfr.jar:/usr/lib/jvm/java-8-oracle/jre/lib/resources.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/jfxrt.jar:/home/martin/Workspaces/summerschool/ericsson-flink/flink-preprocessing/target/classes:/home/martin/.m2/repository/org/apache/flink/flink-java/0.10-SNAPSHOT/flink-java-0.10-20150929.231545-244.jar:/home/martin/.m2/repository/org/apache/flink/flink-core/0.10-SNAPSHOT/flink-core-0.10-20150929.231451-244.jar:/home/martin/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:/home/martin/.m2/repository/org/apache/flink/flink-shaded-hadoop2/0.10-SNAPSHOT/flink-shaded-hadoop2-0.10-20150929.231241-33.jar:/home/martin/.m2/repository/org/apache/avro/avro/1.7.6/avro-1.7.6.jar:/home/martin/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar:/home/martin/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.1
3.jar:/home/martin/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/home/martin/.m2/repository/org/xerial/snappy/snappy-java/1.0.5/snappy-java-1.0.5.jar:/home/martin/.m2/repository/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar:/home/martin/.m2/repository/org/tukaani/xz/1.0/xz-1.0.jar:/home/martin/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/home/martin/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/home/martin/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/home/martin/.m2/repository/com/twitter/chill_2.10/0.5.2/chill_2.10-0.5.2.jar:/home/martin/.m2/repository/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar:/home/martin/.m2/repository/com/twitter/chill-java/0.5.2/chill-java-0.5.2.jar:/home/martin/.m2/repository/com/twitter/chill-avro_2.10/0.5.2/chill-avro_2.10-0.5.2.jar:/home/martin/.m2/repository/com/twitter/chill-bijection_2.10/0.5.2/chill-bijection_2.10-0.5.2.jar:/home/martin/.m2/repository/com/twitter/bijection-core_2.10/0.7.2/bijection-core_2.10-0.7.2.jar:/home/martin/.m2/repository/com/twitter/bijection-avro_2.10/0.7.2/bijection-avro_2.10-0.7.2.jar:/home/martin/.m2/repository/de/javakaffee/kryo-serializers/0.27/kryo-serializers-0.27.jar:/home/martin/.m2/repository/joda-time/joda-time/2.5/joda-time-2.5.jar:/home/martin/.m2/repository/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/home/martin/.m2/repository/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar:/home/martin/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar:/home/martin/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/home/martin/.m2/repository/org/apache/flink/flink-streaming-core/0.10-SNAPSHOT/flink-streaming-core-0.10-20150929.232749-239.jar:/home/martin/.m2/repository/org/apache/flink/flink-runtime/0.10-SNAPSHOT/flink-runtime-0.10-20150929.231914-243.jar:/home/martin/.m2/repository/com/amazonaws/aws-java-sdk/1.8.1/aws-java-sdk-1.8.1.jar:/home/ma
rtin/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:/home/martin/.m2/repository/org/apache/httpcomponents/httpclient/4.2/httpclient-4.2.jar:/home/martin/.m2/repository/org/apache/httpcomponents/httpcore/4.2/httpcore-4.2.jar:/home/martin/.m2/repository/io/netty/netty-all/4.0.31.Final/netty-all-4.0.31.Final.jar:/home/martin/.m2/repository/org/javassist/javassist/3.18.1-GA/javassist-3.18.1-GA.jar:/home/martin/.m2/repository/org/codehaus/jettison/jettison/1.1/jettison-1.1.jar:/home/martin/.m2/repository/stax/stax-api/1.0.1/stax-api-1.0.1.jar:/home/martin/.m2/repository/com/typesafe/akka/akka-actor_2.10/2.3.7/akka-actor_2.10-2.3.7.jar:/home/martin/.m2/repository/com/typesafe/config/1.2.1/config-1.2.1.jar:/home/martin/.m2/repository/com/typesafe/akka/akka-remote_2.10/2.3.7/akka-remote_2.10-2.3.7.jar:/home/martin/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar:/home/martin/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/home/martin/.m2/repository/org/uncommons/maths/uncommons-maths/1.2.2a/uncommons-maths-1.2.2a.jar:/home/martin/.m2/repository/com/typesafe/akka/akka-slf4j_2.10/2.3.7/akka-slf4j_2.10-2.3.7.jar:/home/martin/.m2/repository/org/clapper/grizzled-slf4j_2.10/1.0.2/grizzled-slf4j_2.10-1.0.2.jar:/home/martin/.m2/repository/com/github/scopt/scopt_2.10/3.2.0/scopt_2.10-3.2.0.jar:/home/martin/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar:/home/martin/.m2/repository/io/dropwizard/metrics/metrics-jvm/3.1.0/metrics-jvm-3.1.0.jar:/home/martin/.m2/repository/io/dropwizard/metrics/metrics-json/3.1.0/metrics-json-3.1.0.jar:/home/martin/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/home/martin/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/home/martin/.m2/repository/org/apache/flink/flink-shaded-curator/0.10-SNAPSHOT/flink-shaded-curator-0.10-20150929.231341-92.jar:/home/martin/.m2/repository/org/apache/commons/commons-math/2.2/commons
-math-2.2.jar:/home/martin/.m2/repository/org/apache/sling/org.apache.sling.commons.json/2.0.6/org.apache.sling.commons.json-2.0.6.jar:/home/martin/.m2/repository/org/apache/flink/flink-clients/0.10-SNAPSHOT/flink-clients-0.10-20150929.232028-242.jar:/home/martin/.m2/repository/org/apache/flink/flink-optimizer/0.10-SNAPSHOT/flink-optimizer-0.10-20150929.231952-243.jar:/home/martin/.m2/repository/org/eclipse/jetty/jetty-server/8.0.0.M1/jetty-server-8.0.0.M1.jar:/home/martin/.m2/repository/org/mortbay/jetty/servlet-api/3.0.20100224/servlet-api-3.0.20100224.jar:/home/martin/.m2/repository/org/eclipse/jetty/jetty-continuation/8.0.0.M1/jetty-continuation-8.0.0.M1.jar:/home/martin/.m2/repository/org/eclipse/jetty/jetty-http/8.0.0.M1/jetty-http-8.0.0.M1.jar:/home/martin/.m2/repository/org/eclipse/jetty/jetty-io/8.0.0.M1/jetty-io-8.0.0.M1.jar:/home/martin/.m2/repository/org/eclipse/jetty/jetty-util/8.0.0.M1/jetty-util-8.0.0.M1.jar:/home/martin/.m2/repository/org/eclipse/jetty/jetty-security/8.0.0.M1/jetty-security-8.0.0.M1.jar:/home/martin/.m2/repository/org/eclipse/jetty/jetty-servlet/8.0.0.M1/jetty-servlet-8.0.0.M1.jar:/home/martin/.m2/repository/commons-fileupload/commons-fileupload/1.3.1/commons-fileupload-1.3.1.jar:/home/martin/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/home/martin/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/home/martin/.m2/repository/org/apache/flink/flink-connector-kafka/0.10-SNAPSHOT/flink-connector-kafka-0.10-20150929.233714-237.jar:/home/martin/.m2/repository/org/apache/kafka/kafka_2.10/0.8.2.0/kafka_2.10-0.8.2.0.jar:/home/martin/.m2/repository/org/apache/kafka/kafka-clients/0.8.2.0/kafka-clients-0.8.2.0.jar:/home/martin/.m2/repository/net/jpountz/lz4/lz4/1.2.0/lz4-1.2.0.jar:/home/martin/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/home/martin/.m2/repository/com/101tec/zkclient/0.3/zkclient-0.3.jar:/home/martin/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.6.1/jackson
-core-2.6.1.jar:/home/martin/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.6.1/jackson-databind-2.6.1.jar:/home/martin/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.6.1/jackson-annotations-2.6.1.jar:/home/martin/.m2/repository/org/apache/commons/commons-lang3/3.4/commons-lang3-3.4.jar:/home/martin/.m2/repository/junit/junit/4.12/junit-4.12.jar:/home/martin/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar:/home/martin/Software/idea-IC-141.1532.4/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain com.ericsson.examples.TimeDriftKafkaExampleTst
21:57:00,307 INFO  [org.apache.flink.streaming.util.ClusterUtil]:63 - Running on mini cluster
21:57:00,625 INFO  [org.apache.flink.runtime.minicluster.FlinkMiniCluster]:229 - Starting FlinkMiniCluster.
21:57:01,133 INFO  [akka.event.slf4j.Slf4jLogger$$anonfun$receive$1]:80 - Slf4jLogger started
21:57:01,152 INFO  [org.apache.flink.runtime.blob.BlobServer]:83 - Created BLOB server storage directory /tmp/blobStore-31133bf1-5ff5-4270-99d6-e1950e62ffb5
21:57:01,152 INFO  [org.apache.flink.runtime.blob.BlobServer]:122 - Started BLOB server at 0.0.0.0:36740 - max concurrent requests: 50 - max backlog: 1000
21:57:01,166 INFO  [grizzled.slf4j.Logger]:128 - Starting JobManager at akka://flink/user/jobmanager_1.
21:57:01,166 INFO  [grizzled.slf4j.Logger]:128 - Started memory archivist akka://flink/user/archive_1
21:57:01,170 INFO  [grizzled.slf4j.Logger]:128 - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID None.
21:57:01,179 INFO  [grizzled.slf4j.Logger]:128 - Messages between TaskManager and JobManager have a max timeout of 100000 milliseconds
21:57:01,182 INFO  [grizzled.slf4j.Logger]:128 - Temporary file directory '/tmp': total 161 GB, usable 52 GB (32.30% usable)
21:57:01,292 INFO  [org.apache.flink.runtime.io.network.buffer.NetworkBufferPool]:121 - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
21:57:01,293 INFO  [grizzled.slf4j.Logger]:128 - Using 238 MB for Flink managed memory.
21:57:01,723 INFO  [org.apache.flink.runtime.io.disk.iomanager.IOManager]:94 - I/O manager uses directory /tmp/flink-io-88a552ab-5076-4d7e-8ab1-79d1eedde6fd for spill files.
21:57:01,730 INFO  [org.apache.flink.runtime.filecache.FileCache]:88 - User file cache uses directory /tmp/flink-dist-cache-e76d4927-1ea9-4942-bec8-edd4b5f417cd
21:57:01,754 INFO  [grizzled.slf4j.Logger]:128 - Starting TaskManager actor at akka://flink/user/taskmanager_1#-88623183.
21:57:01,754 INFO  [grizzled.slf4j.Logger]:128 - TaskManager data connection information: localhost (dataPort=56309)
21:57:01,755 INFO  [grizzled.slf4j.Logger]:128 - TaskManager has 4 task slot(s).
21:57:01,756 INFO  [grizzled.slf4j.Logger]:128 - Memory usage stats: [HEAP: 313/507/812 MB, NON HEAP: 26/27/-1 MB (used/committed/max)]
21:57:01,760 INFO  [grizzled.slf4j.Logger]:128 - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
21:57:01,768 INFO  [org.apache.flink.runtime.instance.InstanceManager]:179 - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 342d07625c52f727253518a314dbe4e9. Current number of registered hosts is 1. Current number of alive task slots is 4.
21:57:01,771 INFO  [grizzled.slf4j.Logger]:128 - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
21:57:01,775 INFO  [grizzled.slf4j.Logger]:128 - Determined BLOB server address to be localhost/127.0.0.1:36740. Starting BLOB cache.
21:57:01,776 INFO  [org.apache.flink.runtime.blob.BlobCache]:70 - Created BLOB cache storage directory /tmp/blobStore-08df4946-6f31-4ee6-9a58-996321bf24cd
21:57:01,788 INFO  [org.apache.flink.runtime.client.JobClientActor]:89 - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Flink Streaming Job (c652293e8336786f398e85124a13ed78) and wait for progress
21:57:01,791 INFO  [grizzled.slf4j.Logger]:128 - Received job c652293e8336786f398e85124a13ed78 (Flink Streaming Job).
21:57:01,842 INFO  [grizzled.slf4j.Logger]:128 - Scheduling job Flink Streaming Job.
21:57:01,842 INFO  [org.apache.flink.runtime.client.JobClientActor]:135 - Job was successfully submitted to the JobManager
21:57:01,843 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Job execution switched to status RUNNING.
10/05/2015 21:57:01	Job execution switched to status RUNNING.
21:57:01,844 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (1/4) (e0f71f47ea8f178e80ed42a118cca055) switched from CREATED to SCHEDULED
21:57:01,845 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(1/4) switched to SCHEDULED 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(1/4) switched to SCHEDULED 
21:57:01,849 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (1/4) (e0f71f47ea8f178e80ed42a118cca055) switched from SCHEDULED to DEPLOYING
21:57:01,849 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(1/4) switched to DEPLOYING 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(1/4) switched to DEPLOYING 
21:57:01,849 INFO  [org.apache.flink.runtime.executiongraph.Execution]:358 - Deploying Source: Read Text File Source -> Map (1/4) (attempt #0) to localhost
21:57:01,853 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (2/4) (4aa05f7bceeb6cd9396cdaa1f5df63f2) switched from CREATED to SCHEDULED
21:57:01,853 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(2/4) switched to SCHEDULED 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(2/4) switched to SCHEDULED 
21:57:01,854 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (2/4) (4aa05f7bceeb6cd9396cdaa1f5df63f2) switched from SCHEDULED to DEPLOYING
21:57:01,854 INFO  [org.apache.flink.runtime.executiongraph.Execution]:358 - Deploying Source: Read Text File Source -> Map (2/4) (attempt #0) to localhost
21:57:01,854 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(2/4) switched to DEPLOYING 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(2/4) switched to DEPLOYING 
21:57:01,855 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (3/4) (38f9a95d77a83cefe0ea17880337c393) switched from CREATED to SCHEDULED
21:57:01,856 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(3/4) switched to SCHEDULED 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(3/4) switched to SCHEDULED 
21:57:01,856 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (3/4) (38f9a95d77a83cefe0ea17880337c393) switched from SCHEDULED to DEPLOYING
21:57:01,856 INFO  [org.apache.flink.runtime.executiongraph.Execution]:358 - Deploying Source: Read Text File Source -> Map (3/4) (attempt #0) to localhost
21:57:01,857 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(3/4) switched to DEPLOYING 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(3/4) switched to DEPLOYING 
21:57:01,857 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (4/4) (c0422dc1d04fda150d744ec8d554500c) switched from CREATED to SCHEDULED
21:57:01,858 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(4/4) switched to SCHEDULED 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(4/4) switched to SCHEDULED 
21:57:01,858 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (4/4) (c0422dc1d04fda150d744ec8d554500c) switched from SCHEDULED to DEPLOYING
21:57:01,858 INFO  [org.apache.flink.runtime.executiongraph.Execution]:358 - Deploying Source: Read Text File Source -> Map (4/4) (attempt #0) to localhost
21:57:01,858 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(4/4) switched to DEPLOYING 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(4/4) switched to DEPLOYING 
21:57:01,859 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4) (f6efc003319bb2ed8151671b1efd783a) switched from CREATED to SCHEDULED
21:57:01,860 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(1/4) switched to SCHEDULED 
10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(1/4) switched to SCHEDULED 
21:57:01,860 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4) (f6efc003319bb2ed8151671b1efd783a) switched from SCHEDULED to DEPLOYING
21:57:01,861 INFO  [org.apache.flink.runtime.executiongraph.Execution]:358 - Deploying GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4) (attempt #0) to localhost
21:57:01,861 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(1/4) switched to DEPLOYING 
10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(1/4) switched to DEPLOYING 
21:57:01,867 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4) (da0e6770cf1e4b7a0579b2ef7a27b3b6) switched from CREATED to SCHEDULED
21:57:01,868 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(2/4) switched to SCHEDULED 
10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(2/4) switched to SCHEDULED 
21:57:01,868 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4) (da0e6770cf1e4b7a0579b2ef7a27b3b6) switched from SCHEDULED to DEPLOYING
21:57:01,868 INFO  [org.apache.flink.runtime.executiongraph.Execution]:358 - Deploying GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4) (attempt #0) to localhost
21:57:01,868 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(2/4) switched to DEPLOYING 
10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(2/4) switched to DEPLOYING 
21:57:01,871 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4) (a120c4adac8de6abd96ad60395cf4393) switched from CREATED to SCHEDULED
21:57:01,871 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(3/4) switched to SCHEDULED 
10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(3/4) switched to SCHEDULED 
21:57:01,871 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4) (a120c4adac8de6abd96ad60395cf4393) switched from SCHEDULED to DEPLOYING
21:57:01,872 INFO  [org.apache.flink.runtime.executiongraph.Execution]:358 - Deploying GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4) (attempt #0) to localhost
21:57:01,872 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(3/4) switched to DEPLOYING 
10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(3/4) switched to DEPLOYING 
21:57:01,874 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4) (4523e5a8eb8585ad16d662176f88deef) switched from CREATED to SCHEDULED
21:57:01,874 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(4/4) switched to SCHEDULED 
10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(4/4) switched to SCHEDULED 
21:57:01,875 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4) (4523e5a8eb8585ad16d662176f88deef) switched from SCHEDULED to DEPLOYING
21:57:01,875 INFO  [org.apache.flink.runtime.executiongraph.Execution]:358 - Deploying GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4) (attempt #0) to localhost
21:57:01,875 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(4/4) switched to DEPLOYING 
10/05/2015 21:57:01	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(4/4) switched to DEPLOYING 
21:57:01,882 INFO  [grizzled.slf4j.Logger]:137 - Status of job c652293e8336786f398e85124a13ed78 (Flink Streaming Job) changed to RUNNING.
21:57:01,887 INFO  [grizzled.slf4j.Logger]:128 - Received task Source: Read Text File Source -> Map (1/4)
21:57:01,887 INFO  [org.apache.flink.runtime.taskmanager.Task]:465 - Loading JAR files for task Source: Read Text File Source -> Map (1/4)
21:57:01,889 INFO  [org.apache.flink.runtime.taskmanager.Task]:482 - Registering task at network: Source: Read Text File Source -> Map (1/4) [DEPLOYING]
21:57:01,892 INFO  [grizzled.slf4j.Logger]:128 - Received task Source: Read Text File Source -> Map (2/4)
21:57:01,897 INFO  [grizzled.slf4j.Logger]:128 - Received task Source: Read Text File Source -> Map (3/4)
21:57:01,898 INFO  [org.apache.flink.runtime.taskmanager.Task]:465 - Loading JAR files for task Source: Read Text File Source -> Map (2/4)
21:57:01,898 INFO  [org.apache.flink.runtime.taskmanager.Task]:482 - Registering task at network: Source: Read Text File Source -> Map (2/4) [DEPLOYING]
21:57:01,900 INFO  [grizzled.slf4j.Logger]:128 - Received task Source: Read Text File Source -> Map (4/4)
21:57:01,901 INFO  [org.apache.flink.runtime.taskmanager.Task]:465 - Loading JAR files for task Source: Read Text File Source -> Map (3/4)
21:57:01,901 INFO  [org.apache.flink.runtime.taskmanager.Task]:482 - Registering task at network: Source: Read Text File Source -> Map (3/4) [DEPLOYING]
21:57:01,907 INFO  [grizzled.slf4j.Logger]:128 - Received task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4)
21:57:01,907 INFO  [org.apache.flink.runtime.taskmanager.Task]:465 - Loading JAR files for task Source: Read Text File Source -> Map (4/4)
21:57:01,907 INFO  [org.apache.flink.runtime.taskmanager.Task]:482 - Registering task at network: Source: Read Text File Source -> Map (4/4) [DEPLOYING]
21:57:01,918 INFO  [grizzled.slf4j.Logger]:128 - Received task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4)
21:57:01,924 INFO  [org.apache.flink.runtime.taskmanager.Task]:465 - Loading JAR files for task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4)
21:57:01,924 INFO  [org.apache.flink.runtime.taskmanager.Task]:465 - Loading JAR files for task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4)
21:57:01,925 INFO  [grizzled.slf4j.Logger]:128 - Received task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4)
21:57:01,926 INFO  [org.apache.flink.runtime.taskmanager.Task]:465 - Loading JAR files for task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4)
21:57:01,926 INFO  [grizzled.slf4j.Logger]:128 - Received task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4)
21:57:01,926 INFO  [org.apache.flink.runtime.taskmanager.Task]:465 - Loading JAR files for task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4)
21:57:01,927 INFO  [org.apache.flink.runtime.taskmanager.Task]:482 - Registering task at network: GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4) [DEPLOYING]
21:57:01,927 INFO  [org.apache.flink.runtime.taskmanager.Task]:482 - Registering task at network: GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4) [DEPLOYING]
21:57:01,927 INFO  [org.apache.flink.runtime.taskmanager.Task]:482 - Registering task at network: GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4) [DEPLOYING]
21:57:01,928 INFO  [org.apache.flink.runtime.taskmanager.Task]:482 - Registering task at network: GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4) [DEPLOYING]
21:57:01,967 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,969 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,969 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,969 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - Source: Read Text File Source -> Map (3/4) switched to RUNNING
21:57:01,970 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,970 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - Source: Read Text File Source -> Map (4/4) switched to RUNNING
21:57:01,973 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (3/4) (38f9a95d77a83cefe0ea17880337c393) switched from DEPLOYING to RUNNING
21:57:01,973 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (4/4) (c0422dc1d04fda150d744ec8d554500c) switched from DEPLOYING to RUNNING
21:57:01,974 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,976 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,977 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,979 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,981 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,981 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - Source: Read Text File Source -> Map (2/4) switched to RUNNING
21:57:01,984 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,985 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,986 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,986 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,986 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - Source: Read Text File Source -> Map (1/4) switched to RUNNING
21:57:01,988 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,989 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,990 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(3/4) switched to RUNNING 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(3/4) switched to RUNNING 
21:57:01,990 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:01	Source: Read Text File Source -> Map(4/4) switched to RUNNING 
10/05/2015 21:57:01	Source: Read Text File Source -> Map(4/4) switched to RUNNING 
21:57:01,991 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4) switched to RUNNING
21:57:01,993 INFO  [org.apache.flink.api.common.io.LocatableInputSplitAssigner]:184 - Assigning remote split to host localhost
21:57:01,991 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,996 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,997 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:01,997 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,003 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,004 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,005 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,005 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,006 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,006 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4) switched to RUNNING
21:57:02,006 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4) switched to RUNNING
21:57:02,007 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,007 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,034 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,034 INFO  [org.apache.flink.streaming.runtime.tasks.StreamTask]:439 - State backend for state checkpoints is set to jobmanager.
21:57:02,036 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4) switched to RUNNING
21:57:02,050 INFO  [org.apache.flink.api.common.io.LocatableInputSplitAssigner]:184 - Assigning remote split to host localhost
21:57:02,051 INFO  [org.apache.flink.api.common.io.LocatableInputSplitAssigner]:184 - Assigning remote split to host localhost
21:57:02,057 INFO  [org.apache.flink.api.common.io.LocatableInputSplitAssigner]:184 - Assigning remote split to host localhost
21:57:02,058 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (1/4) (e0f71f47ea8f178e80ed42a118cca055) switched from DEPLOYING to RUNNING
21:57:02,058 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4) (4523e5a8eb8585ad16d662176f88deef) switched from DEPLOYING to RUNNING
21:57:02,059 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	Source: Read Text File Source -> Map(1/4) switched to RUNNING 
10/05/2015 21:57:02	Source: Read Text File Source -> Map(1/4) switched to RUNNING 
21:57:02,059 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(4/4) switched to RUNNING 
10/05/2015 21:57:02	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(4/4) switched to RUNNING 
21:57:02,074 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (2/4) (4aa05f7bceeb6cd9396cdaa1f5df63f2) switched from DEPLOYING to RUNNING
21:57:02,075 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4) (da0e6770cf1e4b7a0579b2ef7a27b3b6) switched from DEPLOYING to RUNNING
21:57:02,075 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	Source: Read Text File Source -> Map(2/4) switched to RUNNING 
10/05/2015 21:57:02	Source: Read Text File Source -> Map(2/4) switched to RUNNING 
21:57:02,075 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4) (f6efc003319bb2ed8151671b1efd783a) switched from DEPLOYING to RUNNING
21:57:02,075 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(2/4) switched to RUNNING 
10/05/2015 21:57:02	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(2/4) switched to RUNNING 
21:57:02,075 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(1/4) switched to RUNNING 
10/05/2015 21:57:02	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(1/4) switched to RUNNING 
21:57:02,074 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4) (a120c4adac8de6abd96ad60395cf4393) switched from DEPLOYING to RUNNING
21:57:02,078 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(3/4) switched to RUNNING 
10/05/2015 21:57:02	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(3/4) switched to RUNNING 
21:57:02,882 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - Source: Read Text File Source -> Map (1/4) switched to FINISHED
21:57:02,882 INFO  [org.apache.flink.runtime.taskmanager.Task]:667 - Freeing task resources for Source: Read Text File Source -> Map (1/4)
21:57:02,884 INFO  [grizzled.slf4j.Logger]:128 - Unregistering task and sending final execution state FINISHED to JobManager for task Source: Read Text File Source -> Map (e0f71f47ea8f178e80ed42a118cca055)
21:57:02,887 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (1/4) (e0f71f47ea8f178e80ed42a118cca055) switched from RUNNING to FINISHED
21:57:02,888 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	Source: Read Text File Source -> Map(1/4) switched to FINISHED 
10/05/2015 21:57:02	Source: Read Text File Source -> Map(1/4) switched to FINISHED 
21:57:02,907 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - Source: Read Text File Source -> Map (4/4) switched to FINISHED
21:57:02,907 INFO  [org.apache.flink.runtime.taskmanager.Task]:667 - Freeing task resources for Source: Read Text File Source -> Map (4/4)
21:57:02,907 INFO  [grizzled.slf4j.Logger]:128 - Unregistering task and sending final execution state FINISHED to JobManager for task Source: Read Text File Source -> Map (c0422dc1d04fda150d744ec8d554500c)
21:57:02,908 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (4/4) (c0422dc1d04fda150d744ec8d554500c) switched from RUNNING to FINISHED
21:57:02,909 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	Source: Read Text File Source -> Map(4/4) switched to FINISHED 
10/05/2015 21:57:02	Source: Read Text File Source -> Map(4/4) switched to FINISHED 
21:57:02,934 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - Source: Read Text File Source -> Map (3/4) switched to FINISHED
21:57:02,934 INFO  [org.apache.flink.runtime.taskmanager.Task]:667 - Freeing task resources for Source: Read Text File Source -> Map (3/4)
21:57:02,936 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - Source: Read Text File Source -> Map (2/4) switched to FINISHED
21:57:02,936 INFO  [org.apache.flink.runtime.taskmanager.Task]:667 - Freeing task resources for Source: Read Text File Source -> Map (2/4)
21:57:02,936 INFO  [grizzled.slf4j.Logger]:128 - Unregistering task and sending final execution state FINISHED to JobManager for task Source: Read Text File Source -> Map (38f9a95d77a83cefe0ea17880337c393)
21:57:02,939 INFO  [grizzled.slf4j.Logger]:128 - Unregistering task and sending final execution state FINISHED to JobManager for task Source: Read Text File Source -> Map (4aa05f7bceeb6cd9396cdaa1f5df63f2)
21:57:02,940 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (3/4) (38f9a95d77a83cefe0ea17880337c393) switched from RUNNING to FINISHED
21:57:02,940 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	Source: Read Text File Source -> Map(3/4) switched to FINISHED 
10/05/2015 21:57:02	Source: Read Text File Source -> Map(3/4) switched to FINISHED 
21:57:02,940 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - Source: Read Text File Source -> Map (2/4) (4aa05f7bceeb6cd9396cdaa1f5df63f2) switched from RUNNING to FINISHED
21:57:02,941 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:02	Source: Read Text File Source -> Map(2/4) switched to FINISHED 
10/05/2015 21:57:02	Source: Read Text File Source -> Map(2/4) switched to FINISHED 
3> TimeDrift{type='flink', _type='flink', host='node-5.domain.tld', totalNbEvents=270, totalOoOrderEvents=262, accumulatedDelta=1141247, highest=1441022584798}
4> TimeDrift{type='flink', _type='flink', host='node-20.domain.tld', totalNbEvents=302, totalOoOrderEvents=296, accumulatedDelta=1252442, highest=1441022581323}
ERROR node-5.domain.tld != node-10.domain.tld
ERROR node-20.domain.tld != node-17.domain.tld
2> TimeDrift{type='flink', _type='flink', host='node-11.domain.tld', totalNbEvents=300, totalOoOrderEvents=31, accumulatedDelta=40077, highest=1441022582279}
ERROR node-11.domain.tld != node-26.domain.tld
2> TimeDrift{type='flink', _type='flink', host='node-26.domain.tld', totalNbEvents=516, totalOoOrderEvents=247, accumulatedDelta=1626958, highest=1441022582279}
4> TimeDrift{type='flink', _type='flink', host='node-17.domain.tld', totalNbEvents=588, totalOoOrderEvents=582, accumulatedDelta=2776163, highest=1441022581323}
1> TimeDrift{type='flink', _type='flink', host='node-27.domain.tld', totalNbEvents=457, totalOoOrderEvents=68, accumulatedDelta=209765, highest=1441022584045}
ERROR node-26.domain.tld != node-8.domain.tld
ERROR node-17.domain.tld != node-35.domain.tld
ERROR node-27.domain.tld != node-30.domain.tld
ERROR node-10.domain.tld != node-36.domain.tld
3> TimeDrift{type='flink', _type='flink', host='node-10.domain.tld', totalNbEvents=572, totalOoOrderEvents=564, accumulatedDelta=2214446, highest=1441022584798}
ERROR node-8.domain.tld != node-33.domain.tld
ERROR node-30.domain.tld != node-7.domain.tld
2> TimeDrift{type='flink', _type='flink', host='node-8.domain.tld', totalNbEvents=805, totalOoOrderEvents=536, accumulatedDelta=2032922, highest=1441022582279}
1> TimeDrift{type='flink', _type='flink', host='node-30.domain.tld', totalNbEvents=667, totalOoOrderEvents=278, accumulatedDelta=2098348, highest=1441022584045}
3> TimeDrift{type='flink', _type='flink', host='node-36.domain.tld', totalNbEvents=961, totalOoOrderEvents=953, accumulatedDelta=3483672, highest=1441022584798}
4> TimeDrift{type='flink', _type='flink', host='node-35.domain.tld', totalNbEvents=1079, totalOoOrderEvents=878, accumulatedDelta=4786563, highest=1441022585048}
2> TimeDrift{type='flink', _type='flink', host='node-33.domain.tld', totalNbEvents=1036, totalOoOrderEvents=583, accumulatedDelta=2254665, highest=1441022583186}
1> TimeDrift{type='flink', _type='flink', host='node-7.domain.tld', totalNbEvents=941, totalOoOrderEvents=552, accumulatedDelta=3070926, highest=1441022584045}
ERROR node-36.domain.tld != node-14.domain.tld
ERROR node-35.domain.tld != node-6.domain.tld
ERROR node-33.domain.tld != node-15.domain.tld
ERROR node-7.domain.tld != node-12.domain.tld
ERROR node-14.domain.tld != node-29.domain.tld
ERROR node-6.domain.tld != node-13.domain.tld
3> TimeDrift{type='flink', _type='flink', host='node-14.domain.tld', totalNbEvents=1257, totalOoOrderEvents=1249, accumulatedDelta=6069984, highest=1441022584798}
4> TimeDrift{type='flink', _type='flink', host='node-6.domain.tld', totalNbEvents=1372, totalOoOrderEvents=1171, accumulatedDelta=5922440, highest=1441022585048}
ERROR node-12.domain.tld != node-34.domain.tld
ERROR node-29.domain.tld != node-9.domain.tld
1> TimeDrift{type='flink', _type='flink', host='node-12.domain.tld', totalNbEvents=1238, totalOoOrderEvents=849, accumulatedDelta=3997167, highest=1441022584045}
3> TimeDrift{type='flink', _type='flink', host='node-29.domain.tld', totalNbEvents=1492, totalOoOrderEvents=1484, accumulatedDelta=7795364, highest=1441022584798}
2> TimeDrift{type='flink', _type='flink', host='node-15.domain.tld', totalNbEvents=1310, totalOoOrderEvents=857, accumulatedDelta=4091549, highest=1441022583186}
ERROR node-15.domain.tld != node-22.domain.tld
ERROR node-9.domain.tld != node-32.domain.tld
3> TimeDrift{type='flink', _type='flink', host='node-9.domain.tld', totalNbEvents=1781, totalOoOrderEvents=1773, accumulatedDelta=8990371, highest=1441022584798}
4> TimeDrift{type='flink', _type='flink', host='node-13.domain.tld', totalNbEvents=1653, totalOoOrderEvents=1452, accumulatedDelta=6814037, highest=1441022585048}
ERROR node-13.domain.tld != node-31.domain.tld
ERROR node-34.domain.tld != node-23.domain.tld
1> TimeDrift{type='flink', _type='flink', host='node-34.domain.tld', totalNbEvents=1622, totalOoOrderEvents=1102, accumulatedDelta=6352050, highest=1441022585157}
4> TimeDrift{type='flink', _type='flink', host='node-31.domain.tld', totalNbEvents=1871, totalOoOrderEvents=1670, accumulatedDelta=7383601, highest=1441022585048}
ERROR node-31.domain.tld != node-28.domain.tld
1> TimeDrift{type='flink', _type='flink', host='node-23.domain.tld', totalNbEvents=1916, totalOoOrderEvents=1396, accumulatedDelta=8819853, highest=1441022585157}
3> TimeDrift{type='flink', _type='flink', host='node-32.domain.tld', totalNbEvents=2223, totalOoOrderEvents=2215, accumulatedDelta=11894283, highest=1441022584798}
ERROR node-23.domain.tld != node-16.domain.tld
ERROR node-32.domain.tld != node-25.domain.tld
4> TimeDrift{type='flink', _type='flink', host='node-28.domain.tld', totalNbEvents=2296, totalOoOrderEvents=2095, accumulatedDelta=9833026, highest=1441022585048}
ERROR node-28.domain.tld != node-19.domain.tld
2> TimeDrift{type='flink', _type='flink', host='node-22.domain.tld', totalNbEvents=1735, totalOoOrderEvents=1079, accumulatedDelta=6042761, highest=1441022584499}
21:57:03,121 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4) switched to FINISHED
21:57:03,121 INFO  [org.apache.flink.runtime.taskmanager.Task]:667 - Freeing task resources for GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4)
21:57:03,122 INFO  [grizzled.slf4j.Logger]:128 - Unregistering task and sending final execution state FINISHED to JobManager for task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (da0e6770cf1e4b7a0579b2ef7a27b3b6)
21:57:03,123 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (2/4) (da0e6770cf1e4b7a0579b2ef7a27b3b6) switched from RUNNING to FINISHED
21:57:03,123 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:03	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(2/4) switched to FINISHED 
10/05/2015 21:57:03	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(2/4) switched to FINISHED 
4> TimeDrift{type='flink', _type='flink', host='node-19.domain.tld', totalNbEvents=2580, totalOoOrderEvents=2379, accumulatedDelta=12130409, highest=1441022585048}
ERROR node-19.domain.tld != node-24.domain.tld
ERROR node-25.domain.tld != node-21.domain.tld
1> TimeDrift{type='flink', _type='flink', host='node-16.domain.tld', totalNbEvents=2194, totalOoOrderEvents=1674, accumulatedDelta=11209936, highest=1441022585157}
3> TimeDrift{type='flink', _type='flink', host='node-25.domain.tld', totalNbEvents=2684, totalOoOrderEvents=2676, accumulatedDelta=14910488, highest=1441022584798}
21:57:03,136 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4) switched to FINISHED
21:57:03,136 INFO  [org.apache.flink.runtime.taskmanager.Task]:667 - Freeing task resources for GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4)
4> TimeDrift{type='flink', _type='flink', host='node-24.domain.tld', totalNbEvents=2799, totalOoOrderEvents=2598, accumulatedDelta=14150959, highest=1441022585048}
21:57:03,137 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4) switched to FINISHED
21:57:03,137 INFO  [org.apache.flink.runtime.taskmanager.Task]:667 - Freeing task resources for GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4)
21:57:03,138 INFO  [grizzled.slf4j.Logger]:128 - Unregistering task and sending final execution state FINISHED to JobManager for task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (f6efc003319bb2ed8151671b1efd783a)
21:57:03,139 INFO  [grizzled.slf4j.Logger]:128 - Unregistering task and sending final execution state FINISHED to JobManager for task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4523e5a8eb8585ad16d662176f88deef)
21:57:03,141 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (1/4) (f6efc003319bb2ed8151671b1efd783a) switched from RUNNING to FINISHED
21:57:03,141 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:03	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(1/4) switched to FINISHED 
10/05/2015 21:57:03	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(1/4) switched to FINISHED 
21:57:03,142 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (4/4) (4523e5a8eb8585ad16d662176f88deef) switched from RUNNING to FINISHED
21:57:03,142 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:03	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(4/4) switched to FINISHED 
10/05/2015 21:57:03	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(4/4) switched to FINISHED 
3> TimeDrift{type='flink', _type='flink', host='node-21.domain.tld', totalNbEvents=3079, totalOoOrderEvents=3071, accumulatedDelta=17169838, highest=1441022584798}
ERROR node-21.domain.tld != node-18.domain.tld
3> TimeDrift{type='flink', _type='flink', host='node-18.domain.tld', totalNbEvents=3272, totalOoOrderEvents=3264, accumulatedDelta=17750058, highest=1441022584798}
21:57:03,167 INFO  [org.apache.flink.runtime.taskmanager.Task]:850 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4) switched to FINISHED
21:57:03,168 INFO  [org.apache.flink.runtime.taskmanager.Task]:667 - Freeing task resources for GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4)
21:57:03,168 INFO  [grizzled.slf4j.Logger]:128 - Unregistering task and sending final execution state FINISHED to JobManager for task GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (a120c4adac8de6abd96ad60395cf4393)
21:57:03,169 INFO  [org.apache.flink.runtime.executiongraph.Execution]:934 - GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed (3/4) (a120c4adac8de6abd96ad60395cf4393) switched from RUNNING to FINISHED
21:57:03,170 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:03	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(3/4) switched to FINISHED 
10/05/2015 21:57:03	GroupedActiveDiscretizer -> BasicWindowBuffer -> Fold Window -> Window Flatten -> Sink: Unnamed(3/4) switched to FINISHED 
21:57:03,170 INFO  [grizzled.slf4j.Logger]:137 - Status of job c652293e8336786f398e85124a13ed78 (Flink Streaming Job) changed to FINISHED.
21:57:03,170 INFO  [org.apache.flink.runtime.client.JobClientActor]:164 - 10/05/2015 21:57:03	Job execution switched to status FINISHED.
10/05/2015 21:57:03	Job execution switched to status FINISHED.
21:57:03,174 INFO  [org.apache.flink.runtime.client.JobClient]:179 - Job execution complete
21:57:03,175 INFO  [org.apache.flink.runtime.minicluster.FlinkMiniCluster]:310 - Stopping FlinkMiniCluster.
21:57:03,183 INFO  [grizzled.slf4j.Logger]:128 - Stopping TaskManager akka://flink/user/taskmanager_1#-88623183.
21:57:03,183 INFO  [grizzled.slf4j.Logger]:128 - Disassociating from JobManager
21:57:03,185 INFO  [grizzled.slf4j.Logger]:128 - Stopping JobManager akka://flink/user/jobmanager_1.
21:57:03,197 INFO  [org.apache.flink.runtime.blob.BlobServer]:263 - Stopped BLOB server at 0.0.0.0:36740
21:57:03,206 INFO  [org.apache.flink.runtime.io.disk.iomanager.IOManager]:109 - I/O manager removed spill file directory /tmp/flink-io-88a552ab-5076-4d7e-8ab1-79d1eedde6fd
21:57:03,210 INFO  [grizzled.slf4j.Logger]:128 - Task manager akka://flink/user/taskmanager_1 is completely shut down.

Process finished with exit code 0
