Answers inline: > On 22 Jun 2017, at 03:26, Guozhang Wang <wangg...@gmail.com> wrote: > > Thanks for the updated KIP, some more comments: > > 1. The config name is "default.deserialization.exception.handler" while the > interface class name is "RecordExceptionHandler", which is more general > than the intended purpose. Could we rename the class name accordingly?
Sure. > > 2. Could you describe the full implementation of "DefaultExceptionHandler", > currently it is not clear to me how it is implemented with the configured > value. > > In addition, I think we do not need to include an additional > "DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG" as the configure() > function is mainly used for users to pass any customized parameters that is > out of the Streams library; plus adding such additional config sounds > over-complicated for a default exception handler. Instead I'd suggest we > just provide two handlers (or three if people feel strong about the > LogAndThresholdExceptionHandler), one for FailOnExceptionHandler and one > for LogAndContinueOnExceptionHandler. And we can set > LogAndContinueOnExceptionHandler > by default. > That's what I had originally. Jay mentioned he preferred one default class, with config options. So with that approach, you'd have two config options, one for failing, one for continuing, and the one exception handler would take those options during its configure() call. After checking the other exception handlers in the code, I might revert to what I originally had (two default handlers) as Guozhang also re-suggests, but still have the interface extend Configurable. Guozhang, are you ok with that? In that case there is no need for the response config option. (I've put a rough sketch of that further down, inline.) Thanks Eno > > Guozhang > > > > > > > > > On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com > <mailto:eno.there...@gmail.com>> > wrote: > >> Thanks Guozhang, >> >> I’ve updated the KIP and hopefully addressed all the comments so far. In >> the process also changed the name of the KIP to reflect its scope better: >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+ >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+> >> deserialization+exception+handlers <https://cwiki.apache.org/ >> <https://cwiki.apache.org/> >> confluence/display/KAFKA/KIP-161:+streams+deserialization+ >> exception+handlers> >> >> Any other feedback appreciated, otherwise I’ll start the vote soon. >> >> Thanks >> Eno >> >>> On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangg...@gmail.com> wrote: >>> >>> Eno, Thanks for bringing this proposal up and sorry for getting late on >>> this. Here are my two cents: >>> >>> 1. First some meta comments regarding "fail fast" v.s. "making >> progress". I >>> agree that in general we should better "enforce user to do the right >> thing" >>> in system design, but we also need to keep in mind that Kafka is a >>> multi-tenant system, i.e. from a Streams app's pov you probably would not >>> control the whole streaming processing pipeline end-to-end. E.g. Your >> input >>> data may not be controlled by yourself; it could be written by another >> app, >>> or another team in your company, or even a different organization, and if >>> an error happens maybe you cannot fix "to do the right thing" just by >>> yourself in time. In such an environment I think it is important to leave >>> the door open to let users be more resilient. So I find the current >>> proposal which does leave the door open for either fail-fast or make >>> progress quite reasonable. >>> >>> 2. On the other hand, if the question is whether we should provide a >>> built-in "send to bad queue" handler from the library, I think that might >>> be an overkill: with some tweaks (see my detailed comments below) on the >>> API we can allow users to implement such handlers pretty easily.
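To make the two-handler option I mention above concrete, here is a rough sketch of what I have in mind: the (renamed) interface extending Configurable, plus the two built-in handlers. This is only to anchor the discussion; none of the names below (interface, enum, handler classes) are final:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.Configurable;

    // Renamed from RecordExceptionHandler, per point 1; exact name TBD.
    public interface DeserializationExceptionHandler extends Configurable {

        enum DeserializationHandlerResponse {
            CONTINUE,   // drop the bad record and keep going
            FAIL        // stop processing
        }

        // Called when deserializing the raw key/value bytes of a record throws.
        DeserializationHandlerResponse handle(ConsumerRecord<byte[], byte[]> record,
                                              Exception exception);
    }

    // Built-in default #1: fail fast.
    class FailOnExceptionHandler implements DeserializationExceptionHandler {
        @Override
        public DeserializationHandlerResponse handle(ConsumerRecord<byte[], byte[]> record,
                                                     Exception exception) {
            System.err.printf("Deserialization failed for %s-%d@%d: %s%n",
                    record.topic(), record.partition(), record.offset(), exception);
            return DeserializationHandlerResponse.FAIL;
        }

        @Override
        public void configure(Map<String, ?> configs) { /* nothing to configure */ }
    }

    // Built-in default #2: note the bad record, skip it, keep processing.
    class LogAndContinueOnExceptionHandler implements DeserializationExceptionHandler {
        @Override
        public DeserializationHandlerResponse handle(ConsumerRecord<byte[], byte[]> record,
                                                     Exception exception) {
            System.err.printf("Skipping unreadable record %s-%d@%d: %s%n",
                    record.topic(), record.partition(), record.offset(), exception);
            return DeserializationHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(Map<String, ?> configs) { /* nothing to configure */ }
    }

Wiring it up would then be the usual config style, e.g. props.put("default.deserialization.exception.handler", LogAndContinueOnExceptionHandler.class); and we can accept either the class object or the class name string, as suggested further down for consistency with the default partitioner and timestamp extractor.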
In >> fact, I >>> feel even "LogAndThresholdExceptionHandler" is not necessary as a >> built-in >>> handler, as it would then require users to specify the threshold via >>> configs, etc. I think letting people provide such "eco-libraries" may be >>> better. >>> >>> 3. Regarding the CRC error: today we validate CRC on both the broker end >>> upon receiving produce requests and on consumer end upon receiving fetch >>> responses; and if the CRC validation fails in the former case it would >> not >>> be appended to the broker logs. So if we do see a CRC failure on the >>> consumer side it has to be that either we have a flipped bit on the >> broker >>> disks or over the wire. For the first case it is fatal while for the >> second >>> it is retriable. Unfortunately we cannot tell which case it is when >> seeing >>> CRC validation failures. But in either case, just skipping and making >>> progress seems not a good choice here, and hence I would personally >> exclude >>> these errors from the general serde errors to NOT leave the door open of >>> making progress. >>> >>> Currently such errors are thrown as KafkaException that wraps an >>> InvalidRecordException, which may be too general and we could consider >> just >>> throwing the InvalidRecordException directly. But that could be an >>> orthogonal discussion if we agrees that CRC failures should not be >>> considered in this KIP. >>> >>> ---------------- >>> >>> Now some detailed comments: >>> >>> 4. Could we consider adding the processor context in the handle() >> function >>> as well? This context will be wrapping as the source node that is about >> to >>> process the record. This could expose more info like which task / source >>> node sees this error, which timestamp of the message, etc, and also can >>> allow users to implement their handlers by exposing some metrics, by >>> calling context.forward() to implement the "send to bad queue" behavior >> etc. >>> >>> 5. Could you add the string name of >>> StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER as well in the KIP? >>> Personally I find "default" prefix a bit misleading since we do not allow >>> users to override it per-node yet. But I'm okay either way as I can see >> we >>> may extend it in the future and probably would like to not rename the >>> config again. Also from the experience of `default partitioner` and >>> `default timestamp extractor` we may also make sure that the passed in >>> object can be either a string "class name" or a class object? >>> >>> >>> Guozhang >>> >>> >>> On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <jan.filip...@trivago.com> >>> wrote: >>> >>>> Hi Eno, >>>> >>>> On 07.06.2017 22:49, Eno Thereska wrote: >>>> >>>>> Comments inline: >>>>> >>>>> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com> >> wrote: >>>>>> >>>>>> Hi >>>>>> >>>>>> just my few thoughts >>>>>> >>>>>> On 05.06.2017 11:44, Eno Thereska wrote: >>>>>> >>>>>>> Hi there, >>>>>>> >>>>>>> Sorry for the late reply, I was out this past week. Looks like good >>>>>>> progress was made with the discussions either way. Let me recap a >> couple of >>>>>>> points I saw into one big reply: >>>>>>> >>>>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these >>>>>>> happen in Kafka, before Kafka Streams gets a chance to inspect >> anything, >>>>>>> I'd like to hear the opinion of more Kafka folks like Ismael or >> Jason on >>>>>>> this one. Currently the documentation is not great with what to do >> once a >>>>>>> CRC check has failed. 
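Re point 4 from Guozhang's latest mail further up (passing the processor context into handle()): I like it, since the handler would then know which task and source node hit the error, and users could record their own metrics or forward bad records themselves. Purely as an illustration of the shape (nothing below is final), the earlier sketch would become:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Possible shape if we add the context parameter (names still provisional).
    public interface DeserializationExceptionHandler extends Configurable {

        enum DeserializationHandlerResponse { CONTINUE, FAIL }

        // The context identifies the task/source node that hit the error and exposes
        // metrics, so custom handlers can do more than just log.
        DeserializationHandlerResponse handle(ProcessorContext context,
                                              ConsumerRecord<byte[], byte[]> record,
                                              Exception exception);
    }

    // Example of what a user handler could then do with the context (illustration only).
    class LogPerTaskAndContinueHandler implements DeserializationExceptionHandler {
        @Override
        public DeserializationHandlerResponse handle(ProcessorContext context,
                                                     ConsumerRecord<byte[], byte[]> record,
                                                     Exception exception) {
            System.err.printf("task %s skipping %s-%d@%d: %s%n",
                    context.taskId(), record.topic(), record.partition(),
                    record.offset(), exception);
            return DeserializationHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(Map<String, ?> configs) { }
    }

That also keeps the built-in handlers trivial, since they can simply ignore the context.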
From looking at the code, it looks like the >> client >>>>>>> gets a KafkaException (bubbled up from the fetcher) and currently we >> in >>>>>>> streams catch this as part of poll() and fail. It might be >> advantageous to >>>>>>> treat CRC handling in a similar way to serialisation handling (e.g., >> have >>>>>>> the option to fail/skip). Let's see what the other folks say. >> Worst-case we >>>>>>> can do a separate KIP for that if it proved too hard to do in one go. >>>>>>> >>>>>> there is no reasonable way to "skip" a crc error. How can you know the >>>>>> length you read was anything reasonable? you might be completely lost >>>>>> inside your response. >>>>>> >>>>> On the client side, every record received is checked for validity. As >> it >>>>> happens, if the CRC check fails the exception is wrapped with a >>>>> KafkaException that is thrown all the way to poll(). Assuming we change >>>>> that and poll() throws a CRC exception, I was thinking we could treat >> it >>>>> similarly to a deserialize exception and pass it to the exception >> handler >>>>> to decide what to do. Default would be to fail. This might need a >> Kafka KIP >>>>> btw and can be done separately from this KIP, but Jan, would you find >> this >>>>> useful? >>>>> >>>> I don't think so. IMO you can not reasonably continue parsing when the >>>> checksum of a message is not correct. If you are not sure you got the >>>> correct length, how can you be sure to find the next record? I would >> always >>>> straight fail in all cases. Its to hard for me to understand why one >> would >>>> try to continue. I mentioned CRC's because thats the only bad pills I >> ever >>>> saw so far. But I am happy that it just stopped and I could check what >> was >>>> going on. This will also be invasive in the client code then. >>>> >>>> If you ask me, I am always going to vote for "grind to halt" let the >>>> developers see what happened and let them fix it. It helps building good >>>> kafka experiences and better software and architectures. For me this is: >>>> "force the user todo the right thing". https://youtu.be/aAb7hSCtvGw? >> t=374 >>>> eg. not letting unexpected input slip by. Letting unexpected input >> slip by >>>> is what bought us 15+years of war of all sorts of ingestion attacks. I >>>> don't even dare to estimate how many missingrecords-search-teams going >> be >>>> formed, maybe some hackerone for stream apps :D >>>> >>>> Best Jan >>>> >>>> >>>>> >>>>>>> At a minimum, handling this type of exception will need to involve >> the >>>>>>> exactly-once (EoS) logic. We'd still allow the option of failing or >>>>>>> skipping, but EoS would need to clean up by rolling back all the side >>>>>>> effects from the processing so far. Matthias, how does this sound? >>>>>>> >>>>>> Eos will not help the record might be 5,6 repartitions down into the >>>>>> topology. I haven't followed but I pray you made EoS optional! We >> don't >>>>>> need this and we don't want this and we will turn it off if it comes. >> So I >>>>>> wouldn't recommend relying on it. The option to turn it off is better >> than >>>>>> forcing it and still beeing unable to rollback badpills (as explained >>>>>> before) >>>>>> >>>>> Yeah as Matthias mentioned EoS is optional. >>>>> >>>>> Thanks, >>>>> Eno >>>>> >>>>> >>>>> 6. Will add an end-to-end example as Michael suggested. >>>>>>> >>>>>>> Thanks >>>>>>> Eno >>>>>>> >>>>>>> >>>>>>> >>>>>>> On 4 Jun 2017, at 02:35, Matthias J. 
Sax <matth...@confluent.io> >> wrote: >>>>>>>> >>>>>>>> What I don't understand is this: >>>>>>>> >>>>>>>> From there on its the easiest way forward: fix, redeploy, start => >>>>>>>>> done >>>>>>>>> >>>>>>>> If you have many producers that work fine and a new "bad" producer >>>>>>>> starts up and writes bad data into your input topic, your Streams >> app >>>>>>>> dies but all your producers, including the bad one, keep writing. >>>>>>>> >>>>>>>> Thus, how would you fix this, as you cannot "remove" the corrupted >> date >>>>>>>> from the topic? It might take some time to identify the root cause >> and >>>>>>>> stop the bad producer. Up to this point you get good and bad data >> into >>>>>>>> your Streams input topic. If Streams app in not able to skip over >> those >>>>>>>> bad records, how would you get all the good data from the topic? Not >>>>>>>> saying it's not possible, but it's extra work copying the data with >> a >>>>>>>> new non-Streams consumer-producer-app into a new topic and than feed >>>>>>>> your Streams app from this new topic -- you also need to update all >>>>>>>> your >>>>>>>> upstream producers to write to the new topic. >>>>>>>> >>>>>>>> Thus, if you want to fail fast, you can still do this. And after you >>>>>>>> detected and fixed the bad producer you might just reconfigure your >> app >>>>>>>> to skip bad records until it reaches the good part of the data. >>>>>>>> Afterwards, you could redeploy with fail-fast again. >>>>>>>> >>>>>>>> >>>>>>>> Thus, for this pattern, I actually don't see any reason why to stop >> the >>>>>>>> Streams app at all. If you have a callback, and use the callback to >>>>>>>> raise an alert (and maybe get the bad data into a bad record >> queue), it >>>>>>>> will not take longer to identify and stop the "bad" producer. But >> for >>>>>>>> this case, you have zero downtime for your Streams app. >>>>>>>> >>>>>>>> This seems to be much simpler. Or do I miss anything? >>>>>>>> >>>>>>>> >>>>>>>> Having said this, I agree that the "threshold based callback" might >> be >>>>>>>> questionable. But as you argue for strict "fail-fast", I want to >> argue >>>>>>>> that this must not always be the best pattern to apply and that the >>>>>>>> overall KIP idea is super useful from my point of view. >>>>>>>> >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> >>>>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote: >>>>>>>> >>>>>>>>> Could not agree more! >>>>>>>>> >>>>>>>>> But then I think the easiest is still: print exception and die. >>>>>>>>> From there on its the easiest way forward: fix, redeploy, start => >>>>>>>>> done >>>>>>>>> >>>>>>>>> All the other ways to recover a pipeline that was processing >> partially >>>>>>>>> all the time >>>>>>>>> and suddenly went over a "I cant take it anymore" threshold is not >>>>>>>>> straight forward IMO. >>>>>>>>> >>>>>>>>> How to find the offset, when it became to bad when it is not the >>>>>>>>> latest >>>>>>>>> commited one? >>>>>>>>> How to reset there? with some reasonable stuff in your rockses? >>>>>>>>> >>>>>>>>> If one would do the following. The continuing Handler would measure >>>>>>>>> for >>>>>>>>> a threshold and >>>>>>>>> would terminate after a certain threshold has passed (per task). >> Then >>>>>>>>> one can use offset commit/ flush intervals >>>>>>>>> to make reasonable assumption of how much is slipping by + you get >> an >>>>>>>>> easy recovery when it gets to bad >>>>>>>>> + you could also account for "in processing" records. 
>>>>>>>>> >>>>>>>>> Setting this threshold to zero would cover all cases with 1 >>>>>>>>> implementation. It is still beneficial to have it pluggable >>>>>>>>> >>>>>>>>> Again CRC-Errors are the only bad pills we saw in production for >> now. >>>>>>>>> >>>>>>>>> Best Jan >>>>>>>>> >>>>>>>>> >>>>>>>>> On 02.06.2017 17:37, Jay Kreps wrote: >>>>>>>>> >>>>>>>>>> Jan, I agree with you philosophically. I think one practical >>>>>>>>>> challenge >>>>>>>>>> has >>>>>>>>>> to do with data formats. Many people use untyped events, so there >> is >>>>>>>>>> simply >>>>>>>>>> no guarantee on the form of the input. E.g. many companies use >> JSON >>>>>>>>>> without >>>>>>>>>> any kind of schema so it becomes very hard to assert anything >> about >>>>>>>>>> the >>>>>>>>>> input which makes these programs very fragile to the "one >> accidental >>>>>>>>>> message publication that creates an unsolvable problem. >>>>>>>>>> >>>>>>>>>> For that reason I do wonder if limiting to just serialization >>>>>>>>>> actually >>>>>>>>>> gets >>>>>>>>>> you a useful solution. For JSON it will help with the problem of >>>>>>>>>> non-parseable JSON, but sounds like it won't help in the case >> where >>>>>>>>>> the >>>>>>>>>> JSON is well-formed but does not have any of the fields you expect >>>>>>>>>> and >>>>>>>>>> depend on for your processing. I expect the reason for limiting >> the >>>>>>>>>> scope >>>>>>>>>> is it is pretty hard to reason about correctness for anything that >>>>>>>>>> stops in >>>>>>>>>> the middle of processing an operator DAG? >>>>>>>>>> >>>>>>>>>> -Jay >>>>>>>>>> >>>>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak < >>>>>>>>>> jan.filip...@trivago.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> IMHO your doing it wrong then. + building to much support into the >>>>>>>>>>> kafka >>>>>>>>>>> eco system is very counterproductive in fostering a happy >> userbase >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote: >>>>>>>>>>> >>>>>>>>>>> Jan, you have a choice to Fail fast if you want. This is about >>>>>>>>>>>> giving >>>>>>>>>>>> people options and there are times when you don't want to fail >>>>>>>>>>>> fast. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak < >> jan.filip...@trivago.com >>>>>>>>>>>>> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Hi >>>>>>>>>>>> >>>>>>>>>>>>> 1. >>>>>>>>>>>>> That greatly complicates monitoring. Fail Fast gives you that >>>>>>>>>>>>> when >>>>>>>>>>>>> you >>>>>>>>>>>>> monitor only the lag of all your apps >>>>>>>>>>>>> you are completely covered. With that sort of new application >>>>>>>>>>>>> Monitoring >>>>>>>>>>>>> is very much more complicated as >>>>>>>>>>>>> you know need to monitor fail % of some special apps aswell. >> In my >>>>>>>>>>>>> opinion that is a huge downside already. >>>>>>>>>>>>> >>>>>>>>>>>>> 2. >>>>>>>>>>>>> using a schema regerstry like Avrostuff it might not even be >> the >>>>>>>>>>>>> record >>>>>>>>>>>>> that is broken, it might be just your app >>>>>>>>>>>>> unable to fetch a schema it needs now know. Maybe you got >>>>>>>>>>>>> partitioned >>>>>>>>>>>>> away from that registry. >>>>>>>>>>>>> >>>>>>>>>>>>> 3. When you get alerted because of to high fail percentage. >> what >>>>>>>>>>>>> are the >>>>>>>>>>>>> steps you gonna do? >>>>>>>>>>>>> shut it down to buy time. fix the problem. spend way to much >> time >>>>>>>>>>>>> to >>>>>>>>>>>>> find a good reprocess offset. >>>>>>>>>>>>> Your timewindows are in bad shape anyways, and you pretty much >>>>>>>>>>>>> lost. 
>>>>>>>>>>>>> This routine is nonsense. >>>>>>>>>>>>> >>>>>>>>>>>>> Dead letter queues would be the worst possible addition to the >>>>>>>>>>>>> kafka >>>>>>>>>>>>> toolkit that I can think of. It just doesn't fit the >> architecture >>>>>>>>>>>>> of having clients falling behind is a valid option. >>>>>>>>>>>>> >>>>>>>>>>>>> Further. I mentioned already the only bad pill ive seen so far >> is >>>>>>>>>>>>> crc >>>>>>>>>>>>> errors. any plans for those? >>>>>>>>>>>>> >>>>>>>>>>>>> Best Jan >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> I agree with what Matthias has said w.r.t failing fast. There >> are >>>>>>>>>>>>>> plenty >>>>>>>>>>>>>> >>>>>>>>>>>>>> of >>>>>>>>>>>>> >>>>>>>>>>>>> times when you don't want to fail-fast and must attempt to >> make >>>>>>>>>>>>>> >>>>>>>>>>>>>> progress. >>>>>>>>>>>>> >>>>>>>>>>>>> The dead-letter queue is exactly for these circumstances. Of >>>>>>>>>>>>>> course if >>>>>>>>>>>>>> every record is failing, then you probably do want to give up. >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax < >>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> First a meta comment. KIP discussion should take place on the >> dev >>>>>>>>>>>>>> list >>>>>>>>>>>>>> >>>>>>>>>>>>>>> -- if user list is cc'ed please make sure to reply to both >>>>>>>>>>>>>>> lists. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks. >>>>>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of >>>>>>>>>>>>>> sense to >>>>>>>>>>>>>> >>>>>>>>>>>>>>> focus on deserialization exceptions for now. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to >>>>>>>>>>>>>>> fail a >>>>>>>>>>>>>>> task and wipe out the store to repair it via recreation from >> the >>>>>>>>>>>>>>> changelog? That's of course a quite advance pattern, but I >> want >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> bring >>>>>>>>>>>>>>> it up to design the first step in a way such that we can get >>>>>>>>>>>>>>> there (if >>>>>>>>>>>>>>> we think it's a reasonable idea). >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I also want to comment about fail fast vs making progress. I >>>>>>>>>>>>>>> think that >>>>>>>>>>>>>>> fail-fast must not always be the best option. The scenario I >>>>>>>>>>>>>>> have in >>>>>>>>>>>>>>> mind is like this: you got a bunch of producers that feed the >>>>>>>>>>>>>>> Streams >>>>>>>>>>>>>>> input topic. Most producers work find, but maybe one producer >>>>>>>>>>>>>>> miss >>>>>>>>>>>>>>> behaves and the data it writes is corrupted. You might not >> even >>>>>>>>>>>>>>> be able >>>>>>>>>>>>>>> to recover this lost data at any point -- thus, there is no >>>>>>>>>>>>>>> reason to >>>>>>>>>>>>>>> stop processing but you just skip over those records. Of >>>>>>>>>>>>>>> course, you >>>>>>>>>>>>>>> need to fix the root cause, and thus you need to alert >> (either >>>>>>>>>>>>>>> via logs >>>>>>>>>>>>>>> of the exception handler directly) and you need to start to >>>>>>>>>>>>>>> investigate >>>>>>>>>>>>>>> to find the bad producer, shut it down and fix it. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Here the dead letter queue comes into place. From my >>>>>>>>>>>>>>> understanding, the >>>>>>>>>>>>>>> purpose of this feature is solely enable post debugging. 
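Agreed that post-mortem debugging is the main use case here. For what it's worth, if the handler stays pluggable the dead letter queue itself does not have to live in the library: a user-side handler along the following lines would do. This is purely a sketch, not part of the KIP -- the class, the "dead.letter.topic" config key and the producer wiring are all made up, and it assumes the two-argument handle() from the sketch further up:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    // Hypothetical user-provided handler: copy the raw bytes of a bad record to a
    // quarantine topic, keyed by its origin coordinates, then tell Streams to continue.
    class DeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {

        private KafkaProducer<byte[], byte[]> producer;
        private String deadLetterTopic;

        @Override
        public void configure(Map<String, ?> configs) {
            // "dead.letter.topic" is a made-up key in the user's own config namespace.
            deadLetterTopic = (String) configs.get("dead.letter.topic");
            // Reuse the app's config (assuming configure() is handed the Streams config
            // values); the producer warns about and ignores keys it does not know.
            producer = new KafkaProducer<>(new HashMap<String, Object>(configs),
                                           new ByteArraySerializer(),
                                           new ByteArraySerializer());
        }

        @Override
        public DeserializationHandlerResponse handle(ConsumerRecord<byte[], byte[]> record,
                                                     Exception exception) {
            // Keep topic/partition/offset so the record can be inspected (or replayed) later.
            byte[] origin = (record.topic() + "/" + record.partition() + "/" + record.offset())
                    .getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>(deadLetterTopic, origin, record.value()));
            return DeserializationHandlerResponse.CONTINUE;
        }
    }

Whether we should ever ship something like this as a built-in is exactly the open question in this thread; the point is only that the pluggable interface leaves that door open.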
I >> don't >>>>>>>>>>>>>>> think >>>>>>>>>>>>>>> those record would be fed back at any point in time (so I >> don't >>>>>>>>>>>>>>> see any >>>>>>>>>>>>>>> ordering issue -- a skipped record, with this regard, is just >>>>>>>>>>>>>>> "fully >>>>>>>>>>>>>>> processed"). Thus, the dead letter queue should actually >> encode >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> original records metadata (topic, partition offset etc) to >>>>>>>>>>>>>>> enable >>>>>>>>>>>>>>> such >>>>>>>>>>>>>>> debugging. I guess, this might also be possible if you just >> log >>>>>>>>>>>>>>> the bad >>>>>>>>>>>>>>> records, but it would be harder to access (you first must >> find >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> Streams instance that did write the log and extract the >>>>>>>>>>>>>>> information >>>>>>>>>>>>>>> from >>>>>>>>>>>>>>> there). Reading it from topic is much simpler. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I also want to mention the following. Assume you have such a >>>>>>>>>>>>>>> topic with >>>>>>>>>>>>>>> some bad records and some good records. If we always >> fail-fast, >>>>>>>>>>>>>>> it's >>>>>>>>>>>>>>> going to be super hard to process the good data. You would >> need >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> write >>>>>>>>>>>>>>> an extra app that copied the data into a new topic filtering >>>>>>>>>>>>>>> out the >>>>>>>>>>>>>>> bad >>>>>>>>>>>>>>> records (or apply the map() workaround withing stream). So I >>>>>>>>>>>>>>> don't >>>>>>>>>>>>>>> think >>>>>>>>>>>>>>> that failing fast is most likely the best option in >> production >>>>>>>>>>>>>>> is >>>>>>>>>>>>>>> necessarily, true. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Or do you think there are scenarios, for which you can >> recover >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> corrupted records successfully? And even if this is >> possible, it >>>>>>>>>>>>>>> might >>>>>>>>>>>>>>> be a case for reprocessing instead of failing the whole >>>>>>>>>>>>>>> application? >>>>>>>>>>>>>>> Also, if you think you can "repair" a corrupted record, >> should >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> handler allow to return a "fixed" record? This would solve >> the >>>>>>>>>>>>>>> ordering >>>>>>>>>>>>>>> problem. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an >>>>>>>>>>>>>>>> end-to-end >>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>> example that demonstrates, with the DSL and with the >> Processor >>>>>>>>>>>>>>>> API, >>>>>>>>>>>>>>>> how >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> user would write a simple application that would then be >>>>>>>>>>>>>>>> augmented >>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> proposed KIP changes to handle exceptions. It should also >>>>>>>>>>>>>>>> become much >>>>>>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code >>>>>>>>>>>>>>>> paths for >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> >>>>>>>>>>>>>> happy case and any failure scenarios. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> - Do we have sufficient information available to make >> informed >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> decisions >>>>>>>>>>>>>>> >>>>>>>>>>>>>> on >>>>>>>>>>>>>> >>>>>>>>>>>>>>> what to do next? 
For example, do we know in which part of >> the >>>>>>>>>>>>>>>> topology >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic, >>>>>>>>>>>>>>>> partition, >>>>>>>>>>>>>>>> offset, timestamp, etc., but what about topology-related >>>>>>>>>>>>>>>> information >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> (e.g. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> what is the associated state store, if any)? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this >> is >>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> bigger picture: This KIP would give users the option to send >>>>>>>>>>>>>>>> corrupted >>>>>>>>>>>>>>>> records to dead letter queue (quarantine topic). But, what >>>>>>>>>>>>>>>> pattern >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>> >>>>>>>>>>>>>> we advocate to process such a dead letter queue then, e.g. >> how to >>>>>>>>>>>>>> allow >>>>>>>>>>>>>> >>>>>>>>>>>>>>> for >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> retries with backoff ("If the first record in the dead letter >>>>>>>>>>>>>>>> queue >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> fails >>>>>>>>>>>>>>> >>>>>>>>>>>>>> again, then try the second record for the time being and go >> back >>>>>>>>>>>>>> to the >>>>>>>>>>>>>> >>>>>>>>>>>>>>> first record at a later time"). Jay and Jan already alluded >> to >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ordering >>>>>>>>>>>>>>> >>>>>>>>>>>>>> problems that will be caused by dead letter queues. As I said, >>>>>>>>>>>>>> retries >>>>>>>>>>>>>> >>>>>>>>>>>>>>> might be out of scope but perhaps the implications should be >>>>>>>>>>>>>>>> considered >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> possible? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in >> the >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> conversation >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> that this KIP's scope will be limited to exceptions in the >>>>>>>>>>>>>>>> category of >>>>>>>>>>>>>>>> poison pills / deserialization errors. But since Jay >> brought >>>>>>>>>>>>>>>> up >>>>>>>>>>>>>>>> user >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> errors again, I decided to include it again. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> ----------------------------snip-------------------------- >> -- >>>>>>>>>>>>>>>> A meta comment: I am not sure about this split between the >>>>>>>>>>>>>>>> code for >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure >>>>>>>>>>>>>>>> path >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> (using >>>>>>>>>>>>>>> >>>>>>>>>>>>>> exception handlers). 
In Scala, for example, we can do: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> scala> val computation = scala.util.Try(1 / 0) >>>>>>>>>>>>>>>> computation: scala.util.Try[Int] = >>>>>>>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> scala> computation.getOrElse(42) >>>>>>>>>>>>>>>> res2: Int = 42 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Another example with Scala's pattern matching, which is >>>>>>>>>>>>>>>> similar to >>>>>>>>>>>>>>>> `KStream#branch()`: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> computation match { >>>>>>>>>>>>>>>> case scala.util.Success(x) => x * 5 >>>>>>>>>>>>>>>> case scala.util.Failure(_) => 42 >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in >>>>>>>>>>>>>>>> Scala, >>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> that's >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> not the point I'm trying to make here.) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have >> an >>>>>>>>>>>>>>>> API >>>>>>>>>>>>>>>> where >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> you >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> code "the happy path", and then have a different code path >> for >>>>>>>>>>>>>>>> failures >>>>>>>>>>>>>>>> (using exceptions and handlers); or should we treat both >>>>>>>>>>>>>>>> Success and >>>>>>>>>>>>>>>> Failure in the same way? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I think the failure/exception handling approach (as >> proposed in >>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> KIP) >>>>>>>>>>>>>>> >>>>>>>>>>>>>> is well-suited for errors in the category of deserialization >>>>>>>>>>>>>> problems >>>>>>>>>>>>>> >>>>>>>>>>>>>>> aka >>>>>>>>>>>>>>> >>>>>>>>>>>>>> poison pills, partly because the (default) serdes are defined >>>>>>>>>>>>>> through >>>>>>>>>>>>>> >>>>>>>>>>>>>>> configuration (explicit serdes however are defined through >> API >>>>>>>>>>>>>>>> calls). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception >>>>>>>>>>>>>>>> handling >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> approach >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> is the best idea for user code exceptions, e.g. if you fail >> to >>>>>>>>>>>>>>>> guard >>>>>>>>>>>>>>>> against NPE in your lambdas or divide a number by zero. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> scala> val stream = Seq(1, 2, 3, 4, 5) >>>>>>>>>>>>>>>> stream: Seq[Int] = List(1, 2, 3, 4, 5) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> // Here: Fallback to a sane default when encountering >>>>>>>>>>>>>>>> failed >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> records >>>>>>>>>>>>>>> >>>>>>>>>>>>>> scala> stream.map(x => Try(1/(3 - x))).flatMap(t => >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Seq(t.getOrElse(42))) >>>>>>>>>>>>>>>> res19: Seq[Int] = List(0, 1, 42, -1, 0) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> // Here: Skip over failed records >>>>>>>>>>>>>>>> scala> stream.map(x => Try(1/(3 - x))).collect{ case >>>>>>>>>>>>>>>> Success(s) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> => s >>>>>>>>>>>>>>> >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>>> res20: Seq[Int] = List(0, 1, -1, 0) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The above is more natural to me than using error handlers to >>>>>>>>>>>>>>>> define >>>>>>>>>>>>>>>> how >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> deal with failed records (here, the value `3` causes an >>>>>>>>>>>>>>>> arithmetic >>>>>>>>>>>>>>>> exception). Again, it might help the KIP if we added an >>>>>>>>>>>>>>>> end-to-end >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> example >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> for such user code errors. 
>>>>>>>>>>>>>>>> ----------------------------snip-------------------------- >> -- >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak < >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> jan.filip...@trivago.com> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Jay, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> ConsumerRecord >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> deserialisation. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> I am working with Database Changelogs only. I would really >> not >>>>>>>>>>>>>>>>> like >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> see >>>>>>>>>>>>>>>> a dead letter queue or something >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> similliar. how am I expected to get these back in order. >> Just >>>>>>>>>>>>>>>>> grind >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> hold an call me on the weekend. I'll fix it >>>>>>>>>>>>>>>>> then in a few minutes rather spend 2 weeks ordering dead >>>>>>>>>>>>>>>>> letters. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> (where >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> reprocessing might be even the faster fix) >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best Jan >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - I think we should hold off on retries unless we >> have >>>>>>>>>>>>>>>>> worked >>>>>>>>>>>>>>>>> out >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>> >>>>>>>>>>>>>>> full usage pattern, people can always implement their >>>>>>>>>>>>>>>> own. I >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> the idea >>>>>>>>>>>>>> >>>>>>>>>>>>>>> is that you send the message to some kind of dead >>>>>>>>>>>>>>>>>> letter queue >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> then >>>>>>>>>>>>>> >>>>>>>>>>>>>>> replay these later. This obviously destroys all >> semantic >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> guarantees >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> we are >>>>>>>>>>>>>> >>>>>>>>>>>>>>> working hard to provide right now, which may be okay. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>> >>> >>> >>> -- >>> -- Guozhang >> >> > > > -- > -- Guozhang