Hi,

Here is the code that triggers the error (part of the sink):

    @Override
    public void invoke(KafkaLog value) throws Exception {
        ......................
        if (arg instanceof String && "error".equals((String) arg)) {
            throw new IOException("search for error");
        }
        ...........................
    }
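To make the failing check easier to reproduce outside the job, here is a minimal sketch of it with no Flink types. The class name `SinkErrorSketch` and the helpers `check` and `fails` are hypothetical, and `arg` is assumed to be a single field taken from the incoming record; the elided parts of `invoke` are not reproduced.

```java
import java.io.IOException;

public class SinkErrorSketch {

    // Hypothetical stand-in for the check inside invoke().
    // "error".equals(arg) is already false for non-String values,
    // so the instanceof test and the cast are not strictly needed.
    static void check(Object arg) throws IOException {
        if ("error".equals(arg)) {
            throw new IOException("search for error");
        }
    }

    // Returns true if check() rejects the argument with an IOException.
    static boolean fails(Object arg) {
        try {
            check(arg);
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(fails("error")); // prints "true"
        System.out.println(fails("ok"));    // prints "false"
        System.out.println(fails(42));      // prints "false"
    }
}
```

In the real sink the exception is not caught like this; it propagates out of `invoke`, which is what produces the trace in the log.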
And here is the entire stack trace from the log file:

2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: search for error
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 18 more
2016-11-15 17:44:20,036 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
    ... 7 more
Caused by: java.io.IOException: search for error
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 18 more
2016-11-15 17:44:20,037 INFO org.apache.flink.runtime.taskmanager.Task - TriggerWindow(TumblingProcessingTimeWindows(10000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@72b9f6df}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: Unnamed (1/2) switched to FAILED with exception.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:56)
    at mypackage.flink.app.KafkaFlinkTimeTriggerWindowApp$1.apply(KafkaFlinkTimeTriggerWindowApp.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.apply(InternalIterableWindowFunction.java:36)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.fire(WindowOperator.java:543)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:508)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:796)
    ... 7 more
Caused by: java.io.IOException: search for error
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:48)
    at mypackage.flink.pgsql.batchv2.BatchPostgreSqlSinkV4.invoke(BatchPostgreSqlSinkV4.java:23)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
    ... 18 more

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-handling-tp3448p10168.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.