Hi Nick, regarding the Kafka example: what happens is that the FlinkKafkaConsumer will throw an exception. The JobManager then cancels the entire job and restarts it, trying to continue from the last valid checkpoint or from the consumer offset in ZooKeeper. Since the data in the topic is still corrupt, the job will fail again ...
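As a side note, one way out of that restart loop is to make the deserialization itself defensive, so that a corrupt record becomes data you can route aside instead of an exception that cancels the job. Below is a rough, untested sketch of that idea; the Event / TolerantEventSchema names and the "id,value" record format are invented for illustration, and the DeserializationSchema package has moved between Flink releases, so adjust the imports to your version:

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Hypothetical event type: either a parsed (id, value) pair, or the raw
// payload plus an error message, so corrupt records survive as data
// instead of as exceptions.
public class Event implements Serializable {
    public long id;
    public String value;
    public String raw;      // original payload, kept for the error path
    public String error;    // null when parsing succeeded

    public boolean isValid() { return error == null; }
}

// Schema that never throws out of deserialize(): a corrupt record becomes
// an Event with error != null rather than an exception that cancels and
// restarts the whole job.
class TolerantEventSchema implements DeserializationSchema<Event> {

    @Override
    public Event deserialize(byte[] message) {
        Event event = new Event();
        event.raw = new String(message, StandardCharsets.UTF_8);
        try {
            String[] parts = event.raw.split(",", 2);   // expected format: "id,value"
            event.id = Long.parseLong(parts[0].trim());
            event.value = parts[1];
        } catch (Exception e) {
            event.error = e.toString();                  // keep the failure as data
        }
        return event;
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;   // the Kafka stream is unbounded
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeExtractor.getForClass(Event.class);
    }
}

You would hand an instance of this schema to the FlinkKafkaConsumer constructor and then filter (or split) on isValid() downstream, sending the invalid records to whatever error sink you like.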
On Mon, Nov 16, 2015 at 8:28 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:

>> The errors outside your UDF (such as network problems) will be handled
>> by Flink and cause the job to go into recovery. They should be
>> transparently handled.
>
> Is that so? I've been able to feed bad data onto my Kafka topic and cause
> the stream job to abort. You're saying this should not be the case? I
> have not started exploring high availability or checkpointing, so let me
> spend some time on those features before filing tickets.
>
>> Sometimes you might want the system to continue even if stuff outside
>> the UDF fails. For example, if a serializer does not work because of a
>> null value somewhere. You would, however, like to get a message about
>> this somewhere, I assume.
>
> Exactly right.
>
>> Makes sense. The class of operations that work "per-tuple" before the
>> data is forwarded to the network stack could be extended to have error
>> traps.
>>
>> @Nick: Is that what you had in mind?
>
> Yes, that sounds like what I'd be looking for.
>
>> On Mon, Nov 16, 2015 at 7:27 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> I don't think that alleviates the problem. Sometimes you might want the
>>> system to continue even if stuff outside the UDF fails. For example, if
>>> a serializer does not work because of a null value somewhere. You
>>> would, however, like to get a message about this somewhere, I assume.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>> On 16 Nov 2015, at 19:22, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>> Hi Nick!
>>>>
>>>> The errors outside your UDF (such as network problems) will be handled
>>>> by Flink and cause the job to go into recovery. They should be
>>>> transparently handled.
>>>>
>>>> Just make sure you activate checkpointing for your job!
>>>>
>>>> Stephan
>>>>
>>>> On Mon, Nov 16, 2015 at 6:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>
>>>>> I have been thinking about this, maybe we can add a special output
>>>>> stream (for example Kafka, but can be generic) that would get
>>>>> errors/exceptions that were thrown during processing. The actual
>>>>> processing would not stop and the messages in this special stream
>>>>> would contain some information about the current state of processing,
>>>>> the input element(s) and the machine/VM where computation is
>>>>> happening.
>>>>>
>>>>> Yes, this is precisely what I have in mind. The goal is (1) to not
>>>>> lose input data, and (2) to make errors available for operator
>>>>> visibility.
>>>>>
>>>>> It's not very portable, but I was able to implement my Maybe<IN, OUT,
>>>>> Throwable> type. I can now use it as the output of all my source
>>>>> streams, and split those streams on the presence of the Throwable.
>>>>> With this, I'm able to trap certain forms of invalid input and send
>>>>> it to an errors sink. However, there are still some error cases that
>>>>> cause exceptions, apparently outside of my UDF try block, that cause
>>>>> the whole streaming job to terminate.
>>>>>
>>>>>> On 11 Nov 2015, at 21:49, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>>>
>>>>>> Heya,
>>>>>>
>>>>>> I don't see a section in the online manual dedicated to this topic,
>>>>>> so I want to raise the question here: How should errors be handled?
>>>>>> Specifically I'm thinking about streaming jobs, which are expected
>>>>>> to "never go down". For example, errors can be raised at the point
>>>>>> where objects are serialized to/from sources/sinks, and in UDFs.
>>>>>> Cascading provides failure traps [0] where erroneous tuples are
>>>>>> saved off for post-processing. Is there any such functionality in
>>>>>> Flink?
>>>>>>
>>>>>> I started down the road of implementing a Maybe/Optional type, a
>>>>>> generic POJO triple of <IN, OUT, Throwable> for capturing errors at
>>>>>> each stage of a pipeline. However, Java type erasure means that even
>>>>>> though it compiles, the job is rejected at submission time.
>>>>>>
>>>>>> How are other people handling errors in their stream processing?
>>>>>>
>>>>>> Thanks,
>>>>>> Nick
>>>>>>
>>>>>> [0]: http://docs.cascading.org/cascading/1.2/userguide/html/ch06s03.html
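For anyone landing on this thread later: the trap pattern Nick describes can be expressed with nothing more than a wrapper type plus plain DataStream operators. The sketch below is not Nick's actual code; the job, the Maybe wrapper, and the Long.parseLong "parsing" are invented for illustration, and the routing is done with two filter() calls (split()/OutputSelector, or side outputs in later Flink versions, do the same job). Keeping the wrapper non-generic sidesteps the type-erasure rejection mentioned above; a generic Maybe<IN, OUT> needs its TypeInformation supplied explicitly (for example via returns(...)).

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ErrorTrapJob {

    // Hypothetical non-generic "Maybe": original input, parsed output (if
    // any), and the failure (if any).
    public static class Maybe {
        public String input;
        public Long output;        // null when parsing failed
        public String throwable;   // null when parsing succeeded
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);   // let Flink-side recovery handle infrastructure failures

        // Stand-in source; in the real job this would be the Kafka consumer.
        DataStream<String> raw = env.socketTextStream("localhost", 9999);

        // Trap failures inside the UDF so they become data, not job-killing exceptions.
        DataStream<Maybe> parsed = raw.map(new MapFunction<String, Maybe>() {
            @Override
            public Maybe map(String value) {
                Maybe m = new Maybe();
                m.input = value;
                try {
                    m.output = Long.parseLong(value.trim());   // stand-in for real parsing
                } catch (Exception e) {
                    m.throwable = e.toString();
                }
                return m;
            }
        });

        // Route successes and failures to different sinks. print() is a
        // placeholder; the error branch could just as well go to a Kafka
        // "errors" topic.
        parsed.filter(new FilterFunction<Maybe>() {
            @Override
            public boolean filter(Maybe m) { return m.throwable == null; }
        }).print();

        parsed.filter(new FilterFunction<Maybe>() {
            @Override
            public boolean filter(Maybe m) { return m.throwable != null; }
        }).print();

        env.execute("error trap sketch");
    }
}

Note that this only traps failures raised inside your own UDFs; errors thrown before or after them (for example in the Kafka deserializer or a sink's serializer) still have to be caught where they occur, which is what the schema sketch earlier in the thread is about.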