Hi,
no, this is unfortunately not fixed in the current master.

Cheers,
Aljoscha

On Tue, 28 Jul 2015 at 15:29 Ufuk Celebi <u...@apache.org> wrote:

> Hey Phillip,
>
> thanks for reporting the problem. I think your assessment is correct. If
> the program is already finished, the threads throwing the Exceptions should
> have been cleaned up as well.
>
> I am not sure, but I think parts of the system touching this mechanism
> could have been reworked by Aljoscha in the current master branch. Is it
> possible for you to try it out? If yes, it would be great to know if it is
> fixed there. As far as I know, there were no API breaking changes in the
> meantime.
>
> @Aljoscha: do you think this is fixed with your latest changes in
> 0.10-SNAPSHOT?
>
> – Ufuk
>
> On 28 Jul 2015, at 14:02, Philipp Goetze <philipp.goe...@tu-ilmenau.de>
> wrote:
>
> > Hey community,
> >
> > I am not sure whether it is a bug or I am doing something wrong. I have
> a little snippet produced by our project (see below). When I execute it in
> Eclipse everything works fine. However, when deploying the Jar to the local
> flink installation I get NullPointer Exceptions after the program had
> already finished. I found out that it happens exactly after the time of the
> window trigger elapsed (10 seconds in this example). So it seems that there
> is still a thread running, although the program has already finished. I
> guess the thread does not get anymore input since the file was completely
> read already and thus produces NullPointer Exceptions when trying to write
> these null elements. But I think you know more about this.
> >
> > FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11
> >
> > So here the code:
> >
> > import org.apache.flink.streaming.api.scala._
> > import dbis.flink._
> > import java.util.concurrent.TimeUnit
> > import org.apache.flink.streaming.api.windowing.helper._
> > import org.apache.flink.util.Collector
> >
> >
> > object windowCount {
> >
> >   def customgrpdMap(ts: Iterable[List[Any]], out: Collector[List[Any]])
> = {
> >     out.collect(ts.groupBy(t => t(0)).flatMap(x =>
> List(x._1,x._2)).toList)
> >   }
> >
> >   def customcntdMap(ts: Iterable[List[Any]], out: Collector[List[Any]])
> = {
> >     ts.foreach { t =>
> out.collect(List(t(0),PigFuncs.count(t(1).asInstanceOf[Seq[Any]])))}
> >   }
> >
> >   def tuplecntdToString(t: List[Any]): String = {
> >     implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
> >
> >     val sb = new StringBuilder
> >     sb.append(t(0))
> >     .append(",")
> >     .append(t(1))
> >     sb.toString
> >   }
> >
> >   def main(args: Array[String]) {
> >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> >     val input = PigStorage().load(env, "src/it/resources/mary.txt")
> >     val words = input.flatMap(t =>
> PigFuncs.tokenize(t(0).toString)).map(t => List(t))
> >
> >     val win = words.window(Time.of(10,
> TimeUnit.SECONDS)).every(Time.of(10, TimeUnit.SECONDS))
> >     val grpd = win.groupBy(t => t(0)).mapWindow(customgrpdMap _)
> >     val cntd = grpd.mapWindow(customcntdMap _).flatten()
> >
> >     cntd.map(t => tuplecntdToString(t)).writeAsText("marycounts.out")
> >     env.execute("Starting Query")
> >   }
> > }
> >
> > And here the log output:
> >
> > Exception in thread "Thread-32" java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
> >     at
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
> >     at
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     ... 7 more
> > Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     ... 14 more
> > Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     ... 18 more
> > Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     ... 21 more
> > Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     ... 25 more
> > Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.RuntimeException: java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     ... 28 more
> > Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
> java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
> >     ... 32 more
> > Caused by: java.lang.RuntimeException: java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
> >     at
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
> >     at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
> >     ... 35 more
> > Caused by: java.lang.NullPointerException
> >     at org.apache.flink.api.java.io
> .TextOutputFormat.writeRecord(TextOutputFormat.java:93)
> >     at
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
> >     ... 38 more
> >
> > And a part of the .out - File:
> >
> > 13:28:48,696 INFO  org.apache.flink.runtime.jobmanager.JobManager
>         - Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query).
> > 13:28:48,698 INFO  org.apache.flink.runtime.jobmanager.JobManager
>         - Scheduling job Starting Query.
> > 13:28:48,698 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text
> File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten
> -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched
> from CREATED to SCHEDULED
> > 13:28:48,698 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text
> File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten
> -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched
> from SCHEDULED to DEPLOYING
> > 13:28:48,698 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying
> Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten
> -> Map -> Stream Sink) (1/1) (attempt #0) to localhost
> > 13:28:48,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>         - Received task Read Text File Source -> Map -> Flat Map -> Map ->
> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger ->
> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> > 13:28:48,699 INFO  org.apache.flink.runtime.jobmanager.JobManager
>         - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query)
> changed to RUNNING.
> > 13:28:48,705 INFO  org.apache.flink.runtime.taskmanager.Task
>          - Loading JAR files for task Read Text File Source -> Map -> Flat
> Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger ->
> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> > 13:28:48,707 INFO  org.apache.flink.runtime.blob.BlobCache
>          - Downloading f2612fe1d4aadc5206820be652dfa1019a66007c from
> localhost/127.0.0.1:47210
> > 13:28:48,709 INFO  org.apache.flink.runtime.taskmanager.Task
>          - Registering task at network: Read Text File Source -> Map ->
> Flat Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer,
> BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window Map ->
> Window Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> [DEPLOYING]
> > 13:28:48,709 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask
>          - State backend for state checkpoints is set to jobmanager.
> > 13:28:48,759 INFO  org.apache.flink.runtime.taskmanager.Task
>          - Read Text File Source -> Map -> Flat Map -> Map ->
> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger ->
> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to
> RUNNING
> > 13:28:48,759 INFO
> org.apache.flink.api.common.io.LocatableInputSplitAssigner    - Assigning
> remote split to host localhost
> > 13:28:48,759 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text
> File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten
> -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched
> from DEPLOYING to RUNNING
> > 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task
>          - Read Text File Source -> Map -> Flat Map -> Map ->
> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger ->
> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) switched to
> FINISHED
> > 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task
>          - Freeing task resources for Read Text File Source -> Map -> Flat
> Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger ->
> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> > 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>         - Unregistering task and sending final execution state FINISHED to
> JobManager for task Read Text File Source -> Map -> Flat Map -> Map ->
> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger ->
> Window Map -> Window Flatten -> Map -> Stream Sink)
> (ff2bb914c620859de94262af78ac9269)
> > 13:28:48,787 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Read Text
> File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten
> -> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched
> from RUNNING to FINISHED
> > 13:28:48,787 INFO  org.apache.flink.runtime.jobmanager.JobManager
>         - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query)
> changed to FINISHED.
> > 13:28:58,793 ERROR
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction  - Error
> while writing element.
> > java.lang.NullPointerException
> >     at org.apache.flink.api.java.io
> .TextOutputFormat.writeRecord(TextOutputFormat.java:93)
> >     at
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
> >     at
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
> >     at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
> >     at
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
> >     at
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
> >     at java.lang.Thread.run(Thread.java:745)
> > 13:28:58,794 ERROR
> org.apache.flink.streaming.runtime.tasks.OutputHandler        - Could not
> forward element to operator.
> > java.lang.RuntimeException: java.lang.NullPointerException
> >     at
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
> >     at
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
> >     at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
> >     at
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
> >     at
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
> >     at
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
> >     at
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
> >     at
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
> >     at
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.NullPointerException
> >     at org.apache.flink.api.java.io
> .TextOutputFormat.writeRecord(TextOutputFormat.java:93)
> >     at
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
> >     ... 38 more
> > [...]
> >
> >
> >
> > Best Regards,
> > Philipp
> >
>
>

Reply via email to