Hi,

I have the following architecture: Kafka topic -> Flink Kafka stream -> custom Flink sink that saves the data to a PostgreSQL database.

To test how the system behaves when an error occurs, I did the following: I enabled checkpointing for my DataStream job, put one item on the Kafka topic with a special value in one of its fields, and threw an error when processing that item.

What I discovered:
- When the error is thrown inside my DeserializationSchema implementation, everything is fine: the job is recovered as described in the documentation.
- BUT when the error is thrown inside my invoke implementation of RichSinkFunction, the job is not recovered, and no further items are processed, even though the Kafka consumer keeps working fine.

Is this normal?

Here are some logs:

2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
2016-11-15 17:44:20,036 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
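One possible reading of the "no further items are processed" symptom can be illustrated with a small plain-Java model. It deliberately does not use Flink's API; it only mimics the fact that, with checkpointing enabled, a failed job is restored from the last completed checkpoint and the Kafka source resumes from the offsets stored there. If the sink's invoke() throws on the same record every time, each restart replays that record and fails again. All names (PoisonRecordReplay, POISON, checkpointEvery, maxRestarts, swallowErrors) are hypothetical, and whether this is what actually happened in the job above is an assumption that the logs alone do not prove.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model (NOT the Flink API) of checkpoint-based recovery when one
 * record always makes the sink throw. All names here are hypothetical.
 */
class PoisonRecordReplay {
    // Hypothetical stand-in for the "special value" in the test record.
    static final String POISON = "FAIL";

    // Models a sink's invoke(): writes to an external "table" and throws on
    // POISON, unless swallowErrors is set, in which case the record is skipped.
    static void invoke(String value, List<String> table, boolean swallowErrors) {
        try {
            if (POISON.equals(value)) {
                throw new RuntimeException("simulated sink failure on " + value);
            }
            table.add(value);
        } catch (RuntimeException e) {
            if (!swallowErrors) {
                throw e;
            }
            // A real sink could log the record or write it to an error table here.
        }
    }

    // Replays the "topic" from the last checkpointed offset after each failure,
    // allowing at most maxRestarts restarts. Returns the number of restarts used.
    static int run(List<String> topic, int checkpointEvery, int maxRestarts,
                   List<String> table, boolean swallowErrors) {
        int checkpointedOffset = 0; // source offset saved by the last checkpoint
        int restarts = 0;
        while (true) {
            try {
                // Restore: only the source offset is rewound; the table is not.
                for (int offset = checkpointedOffset; offset < topic.size(); offset++) {
                    invoke(topic.get(offset), table, swallowErrors);
                    if ((offset + 1) % checkpointEvery == 0) {
                        checkpointedOffset = offset + 1; // a checkpoint completes
                    }
                }
                return restarts;
            } catch (RuntimeException e) {
                if (restarts == maxRestarts) {
                    throw new IllegalStateException(
                        "gave up after " + restarts + " restarts", e);
                }
                restarts++;
            }
        }
    }

    public static void main(String[] args) {
        List<String> topic = Arrays.asList("a", "b", "FAIL", "c");

        List<String> strict = new ArrayList<>();
        try {
            run(topic, 2, 3, strict, false);
        } catch (IllegalStateException e) {
            // Every restart replays "FAIL" and fails again; "c" is never written.
            System.out.println(e.getMessage() + ", table = " + strict);
        }

        List<String> tolerant = new ArrayList<>();
        int restarts = run(topic, 2, 3, tolerant, true);
        System.out.println("restarts = " + restarts + ", table = " + tolerant);
    }
}
```

In this model, the strict run exhausts its restarts with only "a" and "b" in the table, while the run that catches the error inside invoke() skips the bad record and goes on to write "c". Here maxRestarts plays the role of the attempt limit of a fixed-delay restart strategy. Because the model rewinds only the source offset, rows the sink wrote after the last checkpoint are written again on replay, which is the at-least-once behaviour to expect from a non-transactional database sink.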
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-handling-tp3448p10141.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.