Should go to dev list, too.
-------- Forwarded Message --------
Subject: Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Date: Mon, 5 Jun 2017 19:19:42 +0200
From: Jan Filipiak <jan.filip...@trivago.com>
Reply-To: users@kafka.apache.org
To: users@kafka.apache.org

Hi, just my few thoughts.

On 05.06.2017 11:44, Eno Thereska wrote:

> Hi there,

> Sorry for the late reply, I was out this past week. Looks like good progress was made with the discussions either way. Let me recap a couple of points I saw into one big reply:

> 1. Jan mentioned CRC errors. I think this is a good point. As these happen in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear the opinion of more Kafka folks like Ismael or Jason on this one. Currently the documentation is not great about what to do once a CRC check has failed. From looking at the code, it looks like the client gets a KafkaException (bubbled up from the fetcher) and currently we in Streams catch this as part of poll() and fail. It might be advantageous to treat CRC handling in a similar way to serialisation handling (e.g., have the option to fail/skip). Let's see what the other folks say. Worst case, we can do a separate KIP for that if it proves too hard to do in one go.

There is no reasonable way to "skip" a CRC error. How can you know that the length you read was anything reasonable? You might be completely lost inside your response.

> 2. Damian has convinced me that the KIP should just be for deserialisation from the network, not from local state store DBs. For the latter we'll follow the current way of failing, since the DB is likely corrupt.

> 3. Dead letter queue option. There was never any intention here to do anything super clever like attempt to re-inject the failed records from the dead letter queue back into the system. Reasoning about when that'd be useful, in light of all sorts of semantic breakage, would be hard (arguably impossible). The idea was to just have a place to keep all these dead records to help with subsequent debugging. We could also just log a whole bunch of info for a poison-pill record and not have a dead letter queue at all. Perhaps that's a better, simpler starting point.

+1

> 4. Agree with Jay on style: a DefaultHandler with some config options. Will add options to the KIP. Also, as part of this, let's remove the threshold logger since it gets complex and arguably the ROI is low.

> 5. Jay's JSON example, where deserialisation passes but the JSON message doesn't have the expected fields, is an interesting one. It's a bit complicated to handle this in the middle of processing. For example, some operators in the DAG might actually find the needed JSON fields and make progress, but other operators, for the same record, might not find their fields and will throw an exception.

> At a minimum, handling this type of exception will need to involve the exactly-once (EoS) logic. We'd still allow the option of failing or skipping, but EoS would need to clean up by rolling back all the side effects from the processing so far. Matthias, how does this sound?

EoS will not help; the record might be 5 or 6 repartitions down into the topology. I haven't followed closely, but I pray you made EoS optional! We don't need this, we don't want this, and we will turn it off if it comes, so I wouldn't recommend relying on it. The option to turn it off is better than forcing it and still being unable to roll back bad pills (as explained before).

> 6. Will add an end-to-end example as Michael suggested.

> Thanks
> Eno
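To make points 1 and 4 concrete, a pluggable handler with a fail/skip choice plus a couple of default implementations could look roughly like the sketch below. Every name here (interface, classes, config key) is an illustrative placeholder, not the KIP's final API.

    // Sketch only: a pluggable handler invoked when a record cannot be deserialized.
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public interface RecordExceptionHandler {
        enum Response { CONTINUE, FAIL }

        // The raw bytes and the cause are passed in so the handler can log, count,
        // or forward the record before deciding whether processing continues.
        Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);
    }

    // Default handler #1: fail fast (today's behaviour).
    class FailOnErrorHandler implements RecordExceptionHandler {
        @Override
        public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
            return Response.FAIL;
        }
    }

    // Default handler #2: log the bad record's coordinates and skip it.
    class LogAndContinueHandler implements RecordExceptionHandler {
        @Override
        public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
            System.err.printf("Skipping bad record %s-%d@%d: %s%n",
                    record.topic(), record.partition(), record.offset(), exception);
            return Response.CONTINUE;
        }
    }

The handler would presumably be selected via configuration, e.g. something along the lines of props.put("deserialization.exception.handler", LogAndContinueHandler.class) -- again, the config key is a placeholder until the KIP pins it down.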
> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> wrote:

>> What I don't understand is this:

>>> From there on its the easiest way forward: fix, redeploy, start => done

>> If you have many producers that work fine and a new "bad" producer starts up and writes bad data into your input topic, your Streams app dies, but all your producers, including the bad one, keep writing.

>> Thus, how would you fix this, as you cannot "remove" the corrupted data from the topic? It might take some time to identify the root cause and stop the bad producer. Up to this point you get good and bad data into your Streams input topic. If the Streams app is not able to skip over those bad records, how would you get all the good data from the topic? Not saying it's not possible, but it's extra work to copy the data with a new non-Streams consumer-producer app into a new topic and then feed your Streams app from this new topic -- and you also need to update all your upstream producers to write to the new topic.

>> Thus, if you want to fail fast, you can still do this. And after you have detected and fixed the bad producer, you might just reconfigure your app to skip bad records until it reaches the good part of the data. Afterwards, you could redeploy with fail-fast again.

>> Thus, for this pattern, I actually don't see any reason to stop the Streams app at all. If you have a callback, and use the callback to raise an alert (and maybe get the bad data into a bad-record queue), it will not take longer to identify and stop the "bad" producer. But in this case, you have zero downtime for your Streams app.

>> This seems to be much simpler. Or do I miss anything?

>> Having said this, I agree that the "threshold-based callback" might be questionable. But as you argue for strict fail-fast, I want to argue that this must not always be the best pattern to apply, and that the overall KIP idea is super useful from my point of view.

>> -Matthias

>> On 6/3/17 11:57 AM, Jan Filipiak wrote:

>>> Could not agree more!

>>> But then I think the easiest is still: print the exception and die. From there on it's the easiest way forward: fix, redeploy, start => done.

>>> All the other ways to recover a pipeline that was processing partially all the time and suddenly went over an "I can't take it anymore" threshold are not straightforward IMO.

>>> How to find the offset, when it became too bad, if it is not the latest committed one? How to reset there, with some reasonable state in your RocksDB stores?

>>> If one would do the following: the continuing handler would measure against a threshold and would terminate after a certain threshold has been passed (per task). Then one can use offset commit / flush intervals to make reasonable assumptions about how much is slipping by, plus you get an easy recovery when it gets too bad, plus you could also account for "in processing" records.

>>> Setting this threshold to zero would cover all cases with one implementation. It is still beneficial to have it pluggable.

>>> Again, CRC errors are the only bad pills we saw in production for now.

>>> Best Jan
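Jan's threshold idea maps onto the handler sketch above almost directly: a continuing handler that gives up once too many records have slipped past, where a threshold of zero degenerates to fail-fast. This is purely illustrative; a real version would presumably track the count per task and reset it at commit/flush intervals, as Jan describes.

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Continue until more than maxSkipped bad records have been seen, then fail.
    // maxSkipped == 0 is plain fail-fast: the first bad record stops the task.
    class ThresholdExceptionHandler implements RecordExceptionHandler {
        private final long maxSkipped;
        private final AtomicLong skipped = new AtomicLong();

        ThresholdExceptionHandler(long maxSkipped) {
            this.maxSkipped = maxSkipped;
        }

        @Override
        public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
            return skipped.incrementAndGet() > maxSkipped ? Response.FAIL : Response.CONTINUE;
        }
    }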
>>> On 02.06.2017 17:37, Jay Kreps wrote:

>>>> Jan, I agree with you philosophically. I think one practical challenge has to do with data formats. Many people use untyped events, so there is simply no guarantee on the form of the input. E.g. many companies use JSON without any kind of schema, so it becomes very hard to assert anything about the input, which makes these programs very fragile to the one accidental message publication that creates an unsolvable problem.

>>>> For that reason I do wonder if limiting to just serialization actually gets you a useful solution. For JSON it will help with the problem of non-parseable JSON, but it sounds like it won't help in the case where the JSON is well-formed but does not have any of the fields you expect and depend on for your processing. I expect the reason for limiting the scope is that it is pretty hard to reason about correctness for anything that stops in the middle of processing an operator DAG?

>>>> -Jay
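Jay's distinction is worth spelling out: a record can deserialize cleanly and still be a poison pill for the application code. In that case a deserialization handler never fires, and the failure surfaces later, somewhere inside the topology. A tiny illustration (Jackson is used here only for the example; the field names are made up):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class WellFormedButWrong {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();

            // Parsing succeeds, so no deserialization exception is raised.
            JsonNode event = mapper.readTree("{\"user\":\"alice\"}");

            // A downstream processor that assumes an "amount" field exists:
            JsonNode amount = event.get("amount");   // missing field -> null
            long cents = amount.asLong();            // NullPointerException here, mid-topology
            System.out.println(cents);
        }
    }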
>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

>>>>> IMHO you're doing it wrong then. Plus, building too much support into the Kafka ecosystem is very counterproductive in fostering a happy userbase.

>>>>> On 02.06.2017 13:15, Damian Guy wrote:

>>>>>> Jan, you have a choice to fail fast if you want. This is about giving people options, and there are times when you don't want to fail fast.

>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com> wrote:

>>>>>>> Hi

>>>>>>> 1. That greatly complicates monitoring. Fail-fast gives you this: when you monitor only the lag of all your apps, you are completely covered. With this sort of new application, monitoring is much more complicated, as you now need to monitor the fail percentage of some special apps as well. In my opinion that is a huge downside already.

>>>>>>> 2. Using a schema registry like the Avro stuff, it might not even be the record that is broken; it might just be your app being unable to fetch a schema it now needs to know. Maybe you got partitioned away from that registry.

>>>>>>> 3. When you get alerted because of a too-high fail percentage, what are the steps you are going to take? Shut it down to buy time. Fix the problem. Spend way too much time finding a good reprocess offset. Your time windows are in bad shape anyway, and you have pretty much lost. This routine is nonsense.

>>>>>>> Dead letter queues would be the worst possible addition to the Kafka toolkit that I can think of. It just doesn't fit an architecture where having clients fall behind is a valid option.

>>>>>>> Further, I mentioned already that the only bad pill I've seen so far is CRC errors. Any plans for those?

>>>>>>> Best Jan

>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:

>>>>>>>> I agree with what Matthias has said w.r.t. failing fast. There are plenty of times when you don't want to fail fast and must attempt to make progress. The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up.

>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io> wrote:

>>>>>>>>> First a meta comment. KIP discussion should take place on the dev list -- if the user list is cc'ed, please make sure to reply to both lists. Thanks.

>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of sense to focus on deserialization exceptions for now.

>>>>>>>>> With regard to corrupted state stores, would it make sense to fail a task and wipe out the store to repair it via recreation from the changelog? That's of course a quite advanced pattern, but I want to bring it up to design the first step in a way such that we can get there (if we think it's a reasonable idea).

>>>>>>>>> I also want to comment about fail fast vs. making progress. I think that fail-fast must not always be the best option. The scenario I have in mind is like this: you've got a bunch of producers that feed the Streams input topic. Most producers work fine, but maybe one producer misbehaves and the data it writes is corrupted. You might not even be able to recover this lost data at any point -- thus, there is no reason to stop processing; you just skip over those records. Of course, you need to fix the root cause, and thus you need to alert (either via logs or the exception handler directly) and you need to start investigating to find the bad producer, shut it down and fix it.

>>>>>>>>> Here the dead letter queue comes into place. From my understanding, the purpose of this feature is solely to enable post-hoc debugging. I don't think those records would be fed back at any point in time (so I don't see any ordering issue -- a skipped record, in this regard, is just "fully processed"). Thus, the dead letter queue should actually encode the original record's metadata (topic, partition, offset etc.) to enable such debugging. I guess this might also be possible if you just log the bad records, but it would be harder to access (you first must find the Streams instance that wrote the log and extract the information from there). Reading it from a topic is much simpler.

>>>>>>>>> I also want to mention the following. Assume you have such a topic with some bad records and some good records. If we always fail fast, it's going to be super hard to process the good data. You would need to write an extra app that copies the data into a new topic, filtering out the bad records (or apply the map() workaround within Streams). So I don't think that "failing fast is most likely the best option in production" is necessarily true.

>>>>>>>>> Or do you think there are scenarios for which you can recover the corrupted records successfully? And even if this is possible, it might be a case for reprocessing instead of failing the whole application? Also, if you think you can "repair" a corrupted record, should the handler allow returning a "fixed" record? This would solve the ordering problem.

>>>>>>>>> -Matthias
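Matthias's point about encoding the original record's metadata is easy to make concrete. Reusing the RecordExceptionHandler sketch from above, a continuing handler could copy the raw bytes to a quarantine topic and attach the origin coordinates as record headers (a newer-client feature). The topic name, the handler name, and the choice of headers are all just illustrative assumptions.

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Skip the bad record, but park its raw bytes plus origin metadata for later debugging.
    class QuarantineTopicHandler implements RecordExceptionHandler {
        private final KafkaProducer<byte[], byte[]> producer;
        private final String quarantineTopic;

        QuarantineTopicHandler(KafkaProducer<byte[], byte[]> producer, String quarantineTopic) {
            this.producer = producer;
            this.quarantineTopic = quarantineTopic;
        }

        @Override
        public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
            ProducerRecord<byte[], byte[]> dead =
                    new ProducerRecord<>(quarantineTopic, record.key(), record.value());
            // Preserve where the record came from so it can be inspected later.
            dead.headers().add("origin.topic", record.topic().getBytes(StandardCharsets.UTF_8));
            dead.headers().add("origin.partition",
                    Integer.toString(record.partition()).getBytes(StandardCharsets.UTF_8));
            dead.headers().add("origin.offset",
                    Long.toString(record.offset()).getBytes(StandardCharsets.UTF_8));
            producer.send(dead);
            return Response.CONTINUE;
        }
    }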
>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:

>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!

>>>>>>>>>> - I think it would help to improve the KIP by adding an end-to-end code example that demonstrates, with the DSL and with the Processor API, how the user would write a simple application that would then be augmented with the proposed KIP changes to handle exceptions. It should also become much clearer then that e.g. the KIP would lead to different code paths for the happy case and any failure scenarios.

>>>>>>>>>> - Do we have sufficient information available to make informed decisions on what to do next? For example, do we know in which part of the topology the record failed? `ConsumerRecord` gives us access to topic, partition, offset, timestamp, etc., but what about topology-related information (e.g. what is the associated state store, if any)?

>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is about the bigger picture: this KIP would give users the option to send corrupted records to a dead letter queue (quarantine topic). But what pattern would we then advocate to process such a dead letter queue, e.g. how to allow for retries with backoff ("if the first record in the dead letter queue fails again, then try the second record for the time being and go back to the first record at a later time")? Jay and Jan already alluded to ordering problems that will be caused by dead letter queues. As I said, retries might be out of scope, but perhaps the implications should be considered if possible?

>>>>>>>>>> Also, I wrote the text below before reaching the point in the conversation where this KIP's scope was limited to exceptions in the category of poison pills / deserialization errors. But since Jay brought up user code errors again, I decided to include it again.

>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>> A meta comment: I am not sure about this split between the code for the happy path (e.g. map/filter/... in the DSL) and the failure path (using exception handlers). In Scala, for example, we can do:

>>>>>>>>>>     scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>     computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>
>>>>>>>>>>     scala> computation.getOrElse(42)
>>>>>>>>>>     res2: Int = 42

>>>>>>>>>> Another example with Scala's pattern matching, which is similar to `KStream#branch()`:

>>>>>>>>>>     computation match {
>>>>>>>>>>       case scala.util.Success(x) => x * 5
>>>>>>>>>>       case scala.util.Failure(_) => 42
>>>>>>>>>>     }

>>>>>>>>>> (The above isn't the most idiomatic way to handle this in Scala, but that's not the point I'm trying to make here.)

>>>>>>>>>> Hence the question I'm raising here is: do we want to have an API where you code "the happy path", and then have a different code path for failures (using exceptions and handlers); or should we treat both Success and Failure in the same way?

>>>>>>>>>> I think the failure/exception handling approach (as proposed in this KIP) is well-suited for errors in the category of deserialization problems aka poison pills, partly because the (default) serdes are defined through configuration (explicit serdes, however, are defined through API calls).

>>>>>>>>>> However, I'm not yet convinced that the failure/exception handling approach is the best idea for user code exceptions, e.g. if you fail to guard against NPE in your lambdas or divide a number by zero.

>>>>>>>>>>     scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>     stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>
>>>>>>>>>>     // Here: fall back to a sane default when encountering failed records
>>>>>>>>>>     scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>     res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>
>>>>>>>>>>     // Here: skip over failed records
>>>>>>>>>>     scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>     res20: Seq[Int] = List(0, 1, -1, 0)

>>>>>>>>>> The above is more natural to me than using error handlers to define how to deal with failed records (here, the value `3` causes an arithmetic exception). Again, it might help the KIP if we added an end-to-end example for such user code errors.
>>>>>>>>>> ----------------------------snip----------------------------
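Michael's "treat Success and Failure the same way" question can also be sketched in the Java DSL, using Optional as a poor man's Try: failures become ordinary data that flows through the topology and can be filtered or branched like anything else. The topic names, the parseOrEmpty helper, and the omitted serde configuration are all assumptions made up for this example.

    import java.util.Optional;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class HappyAndFailurePathTogether {

        // Parse failures are represented as Optional.empty() instead of thrown exceptions.
        static Optional<Long> parseOrEmpty(String value) {
            try {
                return Optional.of(Long.parseLong(value));
            } catch (NumberFormatException e) {
                return Optional.empty();
            }
        }

        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> input = builder.stream("numbers-as-text");

            // Failures are simply dropped here; they could just as well be branched
            // into a side stream instead of being routed through an exception callback.
            KStream<String, Long> parsed = input
                    .mapValues(HappyAndFailurePathTogether::parseOrEmpty)
                    .filter((key, value) -> value.isPresent())
                    .mapValues(Optional::get);

            parsed.to("numbers-as-longs");   // a Long serde would need to be configured
        }
    }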
>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

>>>>>>>>>>> Hi Jay,

>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only ConsumerRecord deserialisation.

>>>>>>>>>>> I am working with database changelogs only. I would really not like to see a dead letter queue or something similar. How am I expected to get these back in order? Just grind to a halt and call me on the weekend; I'll fix it then in a few minutes, rather than spending two weeks ordering dead letters (where reprocessing might even be the faster fix).

>>>>>>>>>>> Best Jan
>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:

>>>>>>>>>>>> - I think we should hold off on retries unless we have worked out the full usage pattern; people can always implement their own. I think the idea is that you send the message to some kind of dead letter queue and then replay these later. This obviously destroys all the semantic guarantees we are working hard to provide right now, which may be okay.