Hi,

I have this architecture: Kafka topic -> Flink Kafka stream -> Flink custom
sink that saves data to a PostgreSQL database.
To test how the system behaves when an error occurs, I ran the following
test: I enabled checkpointing on my DataStream, put one item with a special
value in one of its fields on the Kafka topic, and threw an error when that
item was processed.
What I have discovered:
- When the error is thrown inside the DeserializationSchema implementation,
everything is fine: the job recovers as described in the documentation.
- BUT when the error is thrown inside the invoke implementation of my
RichSinkFunction, the job does not recover, and no further items are
processed even though the Kafka consumer is working fine. Is this normal?
Here are some logs:
2016-11-15 17:44:20,000 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Caught exception while processing timer.
java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
2016-11-15 17:44:20,036 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Timer service is shutting down.
2016-11-15 17:44:20,036 ERROR org.apache.flink.runtime.taskmanager.Task                     - Task execution failed.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:802)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
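
To make the second case concrete, here is a minimal plain-Java sketch of it.
It does not use any Flink classes. The names SinkFailureSketch, POISON,
invoke and forward are made up for illustration, and an in-memory list stands
in for the PostgreSQL table. It assumes that the "Could not forward element
to next operator" message in the trace above comes from the chained output
catching whatever the sink throws and rethrowing it wrapped, so treat it as a
model of the symptom rather than of Flink's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the test described in this message; no Flink classes are used.
class SinkFailureSketch {

    // Hypothetical marker value that makes the sink fail.
    static final String POISON = "FAIL_ME";

    // Stand-in for the custom sink's invoke(): "writes" to a list instead of PostgreSQL.
    static void invoke(String value, List<String> written) throws Exception {
        if (POISON.equals(value)) {
            throw new IllegalStateException("simulated sink failure for " + value);
        }
        written.add(value);
    }

    // Stand-in for a chained operator handing a record to the sink.
    // Assumption: any exception from the sink is wrapped here, so the
    // top-level message names the forwarding step, not the sink itself.
    static void forward(String value, List<String> written) {
        try {
            invoke(value, written);
        } catch (Exception e) {
            throw new RuntimeException("Could not forward element to next operator", e);
        }
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        try {
            for (String item : Arrays.asList("a", POISON, "b")) {
                forward(item, written);
            }
        } catch (RuntimeException e) {
            // The sink's own exception is only visible as the cause.
            System.out.println("task failed: " + e.getMessage());
            System.out.println("root cause: " + e.getCause().getMessage());
        }
        System.out.println("written before failure: " + written);
    }
}
```

In this model the item after the failing one is never written, and the sink's
own exception only shows up as the cause of the wrapper, so the innermost
"Caused by" entry is the one worth looking for in the full log.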



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-handling-tp3448p10141.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.