Hi, no, this is unfortunately not fixed in the current master. Cheers, Aljoscha
On Tue, 28 Jul 2015 at 15:29 Ufuk Celebi <u...@apache.org> wrote: > Hey Phillip, > > thanks for reporting the problem. I think your assessment is correct. If > the program is already finished, the threads throwing the Exceptions should > have been cleaned up as well. > > I am not sure, but I think parts of the system touching this mechanism > could have been reworked by Aljoscha in the current master branch. Is it > possible for you to try it out? If yes, it would be great to know if it is > fixed there. As far as I know, there were no API breaking changes in the > meantime. > > @Aljoscha: do you think this is fixed with your latest changes in > 0.10-SNAPSHOT? > > – Ufuk > > On 28 Jul 2015, at 14:02, Philipp Goetze <philipp.goe...@tu-ilmenau.de> > wrote: > > > Hey community, > > > > I am not sure whether it is a bug or I am doing something wrong. I have > a little snippet produced by our project (see below). When I execute it in > Eclipse everything works fine. However, when deploying the Jar to the local > flink installation I get NullPointer Exceptions after the program had > already finished. I found out that it happens exactly after the time of the > window trigger elapsed (10 seconds in this example). So it seems that there > is still a thread running, although the program has already finished. I > guess the thread does not get anymore input since the file was completely > read already and thus produces NullPointer Exceptions when trying to write > these null elements. But I think you know more about this. > > > > FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11 > > > > So here the code: > > > > import org.apache.flink.streaming.api.scala._ > > import dbis.flink._ > > import java.util.concurrent.TimeUnit > > import org.apache.flink.streaming.api.windowing.helper._ > > import org.apache.flink.util.Collector > > > > > > object windowCount { > > > > def customgrpdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) > = { > > out.collect(ts.groupBy(t => t(0)).flatMap(x => > List(x._1,x._2)).toList) > > } > > > > def customcntdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) > = { > > ts.foreach { t => > out.collect(List(t(0),PigFuncs.count(t(1).asInstanceOf[Seq[Any]])))} > > } > > > > def tuplecntdToString(t: List[Any]): String = { > > implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]] > > > > val sb = new StringBuilder > > sb.append(t(0)) > > .append(",") > > .append(t(1)) > > sb.toString > > } > > > > def main(args: Array[String]) { > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > val input = PigStorage().load(env, "src/it/resources/mary.txt") > > val words = input.flatMap(t => > PigFuncs.tokenize(t(0).toString)).map(t => List(t)) > > > > val win = words.window(Time.of(10, > TimeUnit.SECONDS)).every(Time.of(10, TimeUnit.SECONDS)) > > val grpd = win.groupBy(t => t(0)).mapWindow(customgrpdMap _) > > val cntd = grpd.mapWindow(customcntdMap _).flatten() > > > > cntd.map(t => tuplecntdToString(t)).writeAsText("marycounts.out") > > env.execute("Starting Query") > > } > > } > > > > And here the log output: > > > > Exception in thread "Thread-32" java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194) > > at > org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117) > > at > org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > ... 7 more > > Caused by: java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65) > > at > org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > ... 14 more > > Caused by: java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > ... 18 more > > Caused by: java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58) > > at > org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > ... 21 more > > Caused by: java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > ... 25 more > > Caused by: java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.RuntimeException: java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41) > > at > org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > ... 28 more > > Caused by: java.lang.RuntimeException: java.lang.RuntimeException: > java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272) > > ... 32 more > > Caused by: java.lang.RuntimeException: java.lang.NullPointerException > > at > org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108) > > at > org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65) > > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272) > > ... 35 more > > Caused by: java.lang.NullPointerException > > at org.apache.flink.api.java.io > .TextOutputFormat.writeRecord(TextOutputFormat.java:93) > > at > org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94) > > ... 38 more > > > > And a part of the .out - File: > > > > 13:28:48,696 INFO org.apache.flink.runtime.jobmanager.JobManager > - Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query). > > 13:28:48,698 INFO org.apache.flink.runtime.jobmanager.JobManager > - Scheduling job Starting Query. > > 13:28:48,698 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Read Text > File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> > (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window > partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten > -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched > from CREATED to SCHEDULED > > 13:28:48,698 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Read Text > File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> > (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window > partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten > -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched > from SCHEDULED to DEPLOYING > > 13:28:48,698 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying > Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> > (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window > partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten > -> Map -> Stream Sink) (1/1) (attempt #0) to localhost > > 13:28:48,699 INFO org.apache.flink.runtime.taskmanager.TaskManager > - Received task Read Text File Source -> Map -> Flat Map -> Map -> > StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, > BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> > Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) > > 13:28:48,699 INFO org.apache.flink.runtime.jobmanager.JobManager > - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) > changed to RUNNING. > > 13:28:48,705 INFO org.apache.flink.runtime.taskmanager.Task > - Loading JAR files for task Read Text File Source -> Map -> Flat > Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, > BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> > Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) > > 13:28:48,707 INFO org.apache.flink.runtime.blob.BlobCache > - Downloading f2612fe1d4aadc5206820be652dfa1019a66007c from > localhost/127.0.0.1:47210 > > 13:28:48,709 INFO org.apache.flink.runtime.taskmanager.Task > - Registering task at network: Read Text File Source -> Map -> > Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, > BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map -> > Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) > [DEPLOYING] > > 13:28:48,709 INFO org.apache.flink.streaming.runtime.tasks.StreamTask > - State backend for state checkpoints is set to jobmanager. > > 13:28:48,759 INFO org.apache.flink.runtime.taskmanager.Task > - Read Text File Source -> Map -> Flat Map -> Map -> > StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, > BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> > Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to > RUNNING > > 13:28:48,759 INFO > org.apache.flink.api.common.io.LocatableInputSplitAssigner - Assigning > remote split to host localhost > > 13:28:48,759 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Read Text > File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> > (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window > partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten > -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched > from DEPLOYING to RUNNING > > 13:28:48,786 INFO org.apache.flink.runtime.taskmanager.Task > - Read Text File Source -> Map -> Flat Map -> Map -> > StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, > BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> > Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to > FINISHED > > 13:28:48,786 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for Read Text File Source -> Map -> Flat > Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, > BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> > Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) > > 13:28:48,786 INFO org.apache.flink.runtime.taskmanager.TaskManager > - Unregistering task and sending final execution state FINISHED to > JobManager for task Read Text File Source -> Map -> Flat Map -> Map -> > StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, > BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> > Window Map -> Window Flatten -> Map -> Stream Sink) > (ff2bb914c620859de94262af78ac9269) > > 13:28:48,787 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Read Text > File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> > (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window > partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten > -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched > from RUNNING to FINISHED > > 13:28:48,787 INFO org.apache.flink.runtime.jobmanager.JobManager > - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) > changed to FINISHED. > > 13:28:58,793 ERROR > org.apache.flink.streaming.api.functions.sink.FileSinkFunction - Error > while writing element. > > java.lang.NullPointerException > > at org.apache.flink.api.java.io > .TextOutputFormat.writeRecord(TextOutputFormat.java:93) > > at > org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94) > > at > org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65) > > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41) > > at > org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58) > > at > org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65) > > at > org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194) > > at > org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117) > > at > org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144) > > at java.lang.Thread.run(Thread.java:745) > > 13:28:58,794 ERROR > org.apache.flink.streaming.runtime.tasks.OutputHandler - Could not > forward element to operator. > > java.lang.RuntimeException: java.lang.NullPointerException > > at > org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108) > > at > org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65) > > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41) > > at > org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58) > > at > org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65) > > at > org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45) > > at > org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29) > > at > org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239) > > at > org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121) > > at > org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194) > > at > org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117) > > at > org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.lang.NullPointerException > > at org.apache.flink.api.java.io > .TextOutputFormat.writeRecord(TextOutputFormat.java:93) > > at > org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94) > > ... 38 more > > [...] > > > > > > > > Best Regards, > > Philipp > > > >