Prithvi Raj created FLINK-10155: ----------------------------------- Summary: JobExecutionException when using evictor followed with map Key: FLINK-10155 URL: https://issues.apache.org/jira/browse/FLINK-10155 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.6.0 Reporter: Prithvi Raj
The DataStream API encounters `JobExecutionException` when using a `map` after an event time `timeWindow` with an `evictor`. The exception is thrown at `out.collect` on the `WindowFunction` and is not thrown when an `evictor` isn't used, or when not using event time semantics. The exception is: {code:java} org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511) at com.uber.jaeger.dependencies.DependenciesProcessorTest.testMapAfterWindowing(DependenciesProcessorTest.java:101) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:89) at com.uber.jaeger.dependencies.DependenciesProcessorTest$2.apply(DependenciesProcessorTest.java:86) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32) at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359) at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.onEventTime(EvictingWindowOperator.java:271) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:746) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) ... 7 more Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) ... 22 more Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace: strategies (org.apache.flink.api.common.state.StateTtlConfig$CleanupStrategies) cleanupStrategies (org.apache.flink.api.common.state.StateTtlConfig) ttlConfig (org.apache.flink.api.common.state.ListStateDescriptor) evictingWindowStateDescriptor (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator) this$0 (org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$2) val$function (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8) val$fromIterable (org.apache.flink.shaded.guava18.com.google.common.collect.Iterables$8) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577) ... 28 more Caused by: java.lang.NullPointerException at java.util.EnumMap$EnumMapIterator.hasNext(EnumMap.java:527) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ... 50 more{code} And the code is: {code:java} @Test public void testMapAfterWindowing() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStreamSource<String> stringStream = env.fromElements("this", "is", "some", "data", "tomfoolery"); SingleOutputStreamOperator<String> source = stringStream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); source.keyBy(new KeySelector<String, String>() { @Override public String getKey(String value) throws Exception { return value.substring(0, 1); } }) .timeWindow(Time.milliseconds(100)) .evictor(CountEvictor.of(1)) .apply(new WindowFunction<String, Iterable<String>, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<String> input, Collector<Iterable<String>> out) throws Exception { out.collect(input); } }) .map(new RichMapFunction<Iterable<String>, Iterable<String>>() { // Identity map function @Override public Iterable<String> map(Iterable<String> value) throws Exception { return value; } }) .printToErr(); env.execute(); }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)