I also think that one config is better, with two default implementations: failing and log-and-continue.
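To make that concrete, below is a rough sketch of what the interface and the two built-in handlers could look like. All names (interface, classes, enum) are placeholders based on the discussion in this thread, not the final API:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.Configurable;

    // Sketch only: names are placeholders and may change in the final KIP.
    public interface DeserializationExceptionHandler extends Configurable {

        enum DeserializationHandlerResponse {
            CONTINUE,  // drop the corrupted record and keep processing
            FAIL       // stop processing the task
        }

        DeserializationHandlerResponse handle(ConsumerRecord<byte[], byte[]> record,
                                              Exception exception);
    }

    // Fail-fast: log the error and stop, so corrupted input never goes unnoticed.
    class LogAndFailExceptionHandler implements DeserializationExceptionHandler {
        public DeserializationHandlerResponse handle(ConsumerRecord<byte[], byte[]> record,
                                                     Exception exception) {
            // log topic/partition/offset and the exception at ERROR level here
            return DeserializationHandlerResponse.FAIL;
        }
        public void configure(Map<String, ?> configs) { /* no options needed */ }
    }

    // Log-and-continue: record the bad record's metadata and skip it.
    class LogAndContinueExceptionHandler implements DeserializationExceptionHandler {
        public DeserializationHandlerResponse handle(ConsumerRecord<byte[], byte[]> record,
                                                     Exception exception) {
            // log topic/partition/offset and the exception at WARN level here
            return DeserializationHandlerResponse.CONTINUE;
        }
        public void configure(Map<String, ?> configs) { /* no options needed */ }
    }

With an interface of that shape, a "send to bad queue" handler could stay out of the library and be provided as an eco-library instead, as Guozhang suggests below.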
However, I think we should fail by default. Similar to timestamp extractor as "silent" data loss is no good default behavior IMHO. -Matthias On 6/22/17 11:00 AM, Eno Thereska wrote: > Answers inline: > >> On 22 Jun 2017, at 03:26, Guozhang Wang <wangg...@gmail.com> wrote: >> >> Thanks for the updated KIP, some more comments: >> >> 1.The config name is "default.deserialization.exception.handler" while the >> interface class name is "RecordExceptionHandler", which is more general >> than the intended purpose. Could we rename the class name accordingly? > > Sure. > > >> >> 2. Could you describe the full implementation of "DefaultExceptionHandler", >> currently it is not clear to me how it is implemented with the configured >> value. >> >> In addition, I think we do not need to include an additional >> "DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG" as the configure() >> function is mainly used for users to pass any customized parameters that is >> out of the Streams library; plus adding such additional config sounds >> over-complicated for a default exception handler. Instead I'd suggest we >> just provide two handlers (or three if people feel strong about the >> LogAndThresholdExceptionHandler), one for FailOnExceptionHandler and one >> for LogAndContinueOnExceptionHandler. And we can set >> LogAndContinueOnExceptionHandler >> by default. >> > > That's what I had originally. Jay mentioned he preferred one default class, > with config options. > So with that approach, you'd have 2 config options, one for failing, one for > continuing, and the one > exception handler would take those options during it's configure() call. > > After checking the other exception handlers in the code, I might revert back > to what I originally had (2 default handlers) > as Guozhang also re-suggests, but still have the interface extend > Configurable. Guozhang, you ok with that? In that case > there is no need for the response config option. > > Thanks > Eno > > >> >> Guozhang >> >> >> >> >> >> >> >> >> On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com >> <mailto:eno.there...@gmail.com>> >> wrote: >> >>> Thanks Guozhang, >>> >>> I’ve updated the KIP and hopefully addressed all the comments so far. In >>> the process also changed the name of the KIP to reflect its scope better: >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+ >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+> >>> deserialization+exception+handlers <https://cwiki.apache.org/ >>> <https://cwiki.apache.org/> >>> confluence/display/KAFKA/KIP-161:+streams+deserialization+ >>> exception+handlers> >>> >>> Any other feedback appreciated, otherwise I’ll start the vote soon. >>> >>> Thanks >>> Eno >>> >>>> On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangg...@gmail.com> wrote: >>>> >>>> Eno, Thanks for bringing this proposal up and sorry for getting late on >>>> this. Here are my two cents: >>>> >>>> 1. First some meta comments regarding "fail fast" v.s. "making >>> progress". I >>>> agree that in general we should better "enforce user to do the right >>> thing" >>>> in system design, but we also need to keep in mind that Kafka is a >>>> multi-tenant system, i.e. from a Streams app's pov you probably would not >>>> control the whole streaming processing pipeline end-to-end. E.g. 
Your >>> input >>>> data may not be controlled by yourself; it could be written by another >>> app, >>>> or another team in your company, or even a different organization, and if >>>> an error happens maybe you cannot fix "to do the right thing" just by >>>> yourself in time. In such an environment I think it is important to leave >>>> the door open to let users be more resilient. So I find the current >>>> proposal which does leave the door open for either fail-fast or make >>>> progress quite reasonable. >>>> >>>> 2. On the other hand, if the question is whether we should provide a >>>> built-in "send to bad queue" handler from the library, I think that might >>>> be an overkill: with some tweaks (see my detailed comments below) on the >>>> API we can allow users to implement such handlers pretty easily. In >>> fact, I >>>> feel even "LogAndThresholdExceptionHandler" is not necessary as a >>> built-in >>>> handler, as it would then require users to specify the threshold via >>>> configs, etc. I think letting people provide such "eco-libraries" may be >>>> better. >>>> >>>> 3. Regarding the CRC error: today we validate CRC on both the broker end >>>> upon receiving produce requests and on consumer end upon receiving fetch >>>> responses; and if the CRC validation fails in the former case it would >>> not >>>> be appended to the broker logs. So if we do see a CRC failure on the >>>> consumer side it has to be that either we have a flipped bit on the >>> broker >>>> disks or over the wire. For the first case it is fatal while for the >>> second >>>> it is retriable. Unfortunately we cannot tell which case it is when >>> seeing >>>> CRC validation failures. But in either case, just skipping and making >>>> progress seems not a good choice here, and hence I would personally >>> exclude >>>> these errors from the general serde errors to NOT leave the door open of >>>> making progress. >>>> >>>> Currently such errors are thrown as KafkaException that wraps an >>>> InvalidRecordException, which may be too general and we could consider >>> just >>>> throwing the InvalidRecordException directly. But that could be an >>>> orthogonal discussion if we agrees that CRC failures should not be >>>> considered in this KIP. >>>> >>>> ---------------- >>>> >>>> Now some detailed comments: >>>> >>>> 4. Could we consider adding the processor context in the handle() >>> function >>>> as well? This context will be wrapping as the source node that is about >>> to >>>> process the record. This could expose more info like which task / source >>>> node sees this error, which timestamp of the message, etc, and also can >>>> allow users to implement their handlers by exposing some metrics, by >>>> calling context.forward() to implement the "send to bad queue" behavior >>> etc. >>>> >>>> 5. Could you add the string name of >>>> StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER as well in the KIP? >>>> Personally I find "default" prefix a bit misleading since we do not allow >>>> users to override it per-node yet. But I'm okay either way as I can see >>> we >>>> may extend it in the future and probably would like to not rename the >>>> config again. Also from the experience of `default partitioner` and >>>> `default timestamp extractor` we may also make sure that the passed in >>>> object can be either a string "class name" or a class object? 
>>>> >>>> >>>> Guozhang >>>> >>>> >>>> On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <jan.filip...@trivago.com> >>>> wrote: >>>> >>>>> Hi Eno, >>>>> >>>>> On 07.06.2017 22:49, Eno Thereska wrote: >>>>> >>>>>> Comments inline: >>>>>> >>>>>> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com> >>> wrote: >>>>>>> >>>>>>> Hi >>>>>>> >>>>>>> just my few thoughts >>>>>>> >>>>>>> On 05.06.2017 11:44, Eno Thereska wrote: >>>>>>> >>>>>>>> Hi there, >>>>>>>> >>>>>>>> Sorry for the late reply, I was out this past week. Looks like good >>>>>>>> progress was made with the discussions either way. Let me recap a >>> couple of >>>>>>>> points I saw into one big reply: >>>>>>>> >>>>>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these >>>>>>>> happen in Kafka, before Kafka Streams gets a chance to inspect >>> anything, >>>>>>>> I'd like to hear the opinion of more Kafka folks like Ismael or >>> Jason on >>>>>>>> this one. Currently the documentation is not great with what to do >>> once a >>>>>>>> CRC check has failed. From looking at the code, it looks like the >>> client >>>>>>>> gets a KafkaException (bubbled up from the fetcher) and currently we >>> in >>>>>>>> streams catch this as part of poll() and fail. It might be >>> advantageous to >>>>>>>> treat CRC handling in a similar way to serialisation handling (e.g., >>> have >>>>>>>> the option to fail/skip). Let's see what the other folks say. >>> Worst-case we >>>>>>>> can do a separate KIP for that if it proved too hard to do in one go. >>>>>>>> >>>>>>> there is no reasonable way to "skip" a crc error. How can you know the >>>>>>> length you read was anything reasonable? you might be completely lost >>>>>>> inside your response. >>>>>>> >>>>>> On the client side, every record received is checked for validity. As >>> it >>>>>> happens, if the CRC check fails the exception is wrapped with a >>>>>> KafkaException that is thrown all the way to poll(). Assuming we change >>>>>> that and poll() throws a CRC exception, I was thinking we could treat >>> it >>>>>> similarly to a deserialize exception and pass it to the exception >>> handler >>>>>> to decide what to do. Default would be to fail. This might need a >>> Kafka KIP >>>>>> btw and can be done separately from this KIP, but Jan, would you find >>> this >>>>>> useful? >>>>>> >>>>> I don't think so. IMO you can not reasonably continue parsing when the >>>>> checksum of a message is not correct. If you are not sure you got the >>>>> correct length, how can you be sure to find the next record? I would >>> always >>>>> straight fail in all cases. Its to hard for me to understand why one >>> would >>>>> try to continue. I mentioned CRC's because thats the only bad pills I >>> ever >>>>> saw so far. But I am happy that it just stopped and I could check what >>> was >>>>> going on. This will also be invasive in the client code then. >>>>> >>>>> If you ask me, I am always going to vote for "grind to halt" let the >>>>> developers see what happened and let them fix it. It helps building good >>>>> kafka experiences and better software and architectures. For me this is: >>>>> "force the user todo the right thing". https://youtu.be/aAb7hSCtvGw? >>> t=374 >>>>> eg. not letting unexpected input slip by. Letting unexpected input >>> slip by >>>>> is what bought us 15+years of war of all sorts of ingestion attacks. 
I >>>>> don't even dare to estimate how many missingrecords-search-teams going >>> be >>>>> formed, maybe some hackerone for stream apps :D >>>>> >>>>> Best Jan >>>>> >>>>> >>>>>> >>>>>>>> At a minimum, handling this type of exception will need to involve >>> the >>>>>>>> exactly-once (EoS) logic. We'd still allow the option of failing or >>>>>>>> skipping, but EoS would need to clean up by rolling back all the side >>>>>>>> effects from the processing so far. Matthias, how does this sound? >>>>>>>> >>>>>>> Eos will not help the record might be 5,6 repartitions down into the >>>>>>> topology. I haven't followed but I pray you made EoS optional! We >>> don't >>>>>>> need this and we don't want this and we will turn it off if it comes. >>> So I >>>>>>> wouldn't recommend relying on it. The option to turn it off is better >>> than >>>>>>> forcing it and still beeing unable to rollback badpills (as explained >>>>>>> before) >>>>>>> >>>>>> Yeah as Matthias mentioned EoS is optional. >>>>>> >>>>>> Thanks, >>>>>> Eno >>>>>> >>>>>> >>>>>> 6. Will add an end-to-end example as Michael suggested. >>>>>>>> >>>>>>>> Thanks >>>>>>>> Eno >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> >>> wrote: >>>>>>>>> >>>>>>>>> What I don't understand is this: >>>>>>>>> >>>>>>>>> From there on its the easiest way forward: fix, redeploy, start => >>>>>>>>>> done >>>>>>>>>> >>>>>>>>> If you have many producers that work fine and a new "bad" producer >>>>>>>>> starts up and writes bad data into your input topic, your Streams >>> app >>>>>>>>> dies but all your producers, including the bad one, keep writing. >>>>>>>>> >>>>>>>>> Thus, how would you fix this, as you cannot "remove" the corrupted >>> date >>>>>>>>> from the topic? It might take some time to identify the root cause >>> and >>>>>>>>> stop the bad producer. Up to this point you get good and bad data >>> into >>>>>>>>> your Streams input topic. If Streams app in not able to skip over >>> those >>>>>>>>> bad records, how would you get all the good data from the topic? Not >>>>>>>>> saying it's not possible, but it's extra work copying the data with >>> a >>>>>>>>> new non-Streams consumer-producer-app into a new topic and than feed >>>>>>>>> your Streams app from this new topic -- you also need to update all >>>>>>>>> your >>>>>>>>> upstream producers to write to the new topic. >>>>>>>>> >>>>>>>>> Thus, if you want to fail fast, you can still do this. And after you >>>>>>>>> detected and fixed the bad producer you might just reconfigure your >>> app >>>>>>>>> to skip bad records until it reaches the good part of the data. >>>>>>>>> Afterwards, you could redeploy with fail-fast again. >>>>>>>>> >>>>>>>>> >>>>>>>>> Thus, for this pattern, I actually don't see any reason why to stop >>> the >>>>>>>>> Streams app at all. If you have a callback, and use the callback to >>>>>>>>> raise an alert (and maybe get the bad data into a bad record >>> queue), it >>>>>>>>> will not take longer to identify and stop the "bad" producer. But >>> for >>>>>>>>> this case, you have zero downtime for your Streams app. >>>>>>>>> >>>>>>>>> This seems to be much simpler. Or do I miss anything? >>>>>>>>> >>>>>>>>> >>>>>>>>> Having said this, I agree that the "threshold based callback" might >>> be >>>>>>>>> questionable. But as you argue for strict "fail-fast", I want to >>> argue >>>>>>>>> that this must not always be the best pattern to apply and that the >>>>>>>>> overall KIP idea is super useful from my point of view. 
>>>>>>>>> >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> >>>>>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote: >>>>>>>>> >>>>>>>>>> Could not agree more! >>>>>>>>>> >>>>>>>>>> But then I think the easiest is still: print exception and die. >>>>>>>>>> From there on its the easiest way forward: fix, redeploy, start => >>>>>>>>>> done >>>>>>>>>> >>>>>>>>>> All the other ways to recover a pipeline that was processing >>> partially >>>>>>>>>> all the time >>>>>>>>>> and suddenly went over a "I cant take it anymore" threshold is not >>>>>>>>>> straight forward IMO. >>>>>>>>>> >>>>>>>>>> How to find the offset, when it became to bad when it is not the >>>>>>>>>> latest >>>>>>>>>> commited one? >>>>>>>>>> How to reset there? with some reasonable stuff in your rockses? >>>>>>>>>> >>>>>>>>>> If one would do the following. The continuing Handler would measure >>>>>>>>>> for >>>>>>>>>> a threshold and >>>>>>>>>> would terminate after a certain threshold has passed (per task). >>> Then >>>>>>>>>> one can use offset commit/ flush intervals >>>>>>>>>> to make reasonable assumption of how much is slipping by + you get >>> an >>>>>>>>>> easy recovery when it gets to bad >>>>>>>>>> + you could also account for "in processing" records. >>>>>>>>>> >>>>>>>>>> Setting this threshold to zero would cover all cases with 1 >>>>>>>>>> implementation. It is still beneficial to have it pluggable >>>>>>>>>> >>>>>>>>>> Again CRC-Errors are the only bad pills we saw in production for >>> now. >>>>>>>>>> >>>>>>>>>> Best Jan >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 02.06.2017 17:37, Jay Kreps wrote: >>>>>>>>>> >>>>>>>>>>> Jan, I agree with you philosophically. I think one practical >>>>>>>>>>> challenge >>>>>>>>>>> has >>>>>>>>>>> to do with data formats. Many people use untyped events, so there >>> is >>>>>>>>>>> simply >>>>>>>>>>> no guarantee on the form of the input. E.g. many companies use >>> JSON >>>>>>>>>>> without >>>>>>>>>>> any kind of schema so it becomes very hard to assert anything >>> about >>>>>>>>>>> the >>>>>>>>>>> input which makes these programs very fragile to the "one >>> accidental >>>>>>>>>>> message publication that creates an unsolvable problem. >>>>>>>>>>> >>>>>>>>>>> For that reason I do wonder if limiting to just serialization >>>>>>>>>>> actually >>>>>>>>>>> gets >>>>>>>>>>> you a useful solution. For JSON it will help with the problem of >>>>>>>>>>> non-parseable JSON, but sounds like it won't help in the case >>> where >>>>>>>>>>> the >>>>>>>>>>> JSON is well-formed but does not have any of the fields you expect >>>>>>>>>>> and >>>>>>>>>>> depend on for your processing. I expect the reason for limiting >>> the >>>>>>>>>>> scope >>>>>>>>>>> is it is pretty hard to reason about correctness for anything that >>>>>>>>>>> stops in >>>>>>>>>>> the middle of processing an operator DAG? >>>>>>>>>>> >>>>>>>>>>> -Jay >>>>>>>>>>> >>>>>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak < >>>>>>>>>>> jan.filip...@trivago.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> IMHO your doing it wrong then. + building to much support into the >>>>>>>>>>>> kafka >>>>>>>>>>>> eco system is very counterproductive in fostering a happy >>> userbase >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote: >>>>>>>>>>>> >>>>>>>>>>>> Jan, you have a choice to Fail fast if you want. This is about >>>>>>>>>>>>> giving >>>>>>>>>>>>> people options and there are times when you don't want to fail >>>>>>>>>>>>> fast. 
>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak < >>> jan.filip...@trivago.com >>>>>>>>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Hi >>>>>>>>>>>>> >>>>>>>>>>>>>> 1. >>>>>>>>>>>>>> That greatly complicates monitoring. Fail Fast gives you that >>>>>>>>>>>>>> when >>>>>>>>>>>>>> you >>>>>>>>>>>>>> monitor only the lag of all your apps >>>>>>>>>>>>>> you are completely covered. With that sort of new application >>>>>>>>>>>>>> Monitoring >>>>>>>>>>>>>> is very much more complicated as >>>>>>>>>>>>>> you know need to monitor fail % of some special apps aswell. >>> In my >>>>>>>>>>>>>> opinion that is a huge downside already. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2. >>>>>>>>>>>>>> using a schema regerstry like Avrostuff it might not even be >>> the >>>>>>>>>>>>>> record >>>>>>>>>>>>>> that is broken, it might be just your app >>>>>>>>>>>>>> unable to fetch a schema it needs now know. Maybe you got >>>>>>>>>>>>>> partitioned >>>>>>>>>>>>>> away from that registry. >>>>>>>>>>>>>> >>>>>>>>>>>>>> 3. When you get alerted because of to high fail percentage. >>> what >>>>>>>>>>>>>> are the >>>>>>>>>>>>>> steps you gonna do? >>>>>>>>>>>>>> shut it down to buy time. fix the problem. spend way to much >>> time >>>>>>>>>>>>>> to >>>>>>>>>>>>>> find a good reprocess offset. >>>>>>>>>>>>>> Your timewindows are in bad shape anyways, and you pretty much >>>>>>>>>>>>>> lost. >>>>>>>>>>>>>> This routine is nonsense. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Dead letter queues would be the worst possible addition to the >>>>>>>>>>>>>> kafka >>>>>>>>>>>>>> toolkit that I can think of. It just doesn't fit the >>> architecture >>>>>>>>>>>>>> of having clients falling behind is a valid option. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Further. I mentioned already the only bad pill ive seen so far >>> is >>>>>>>>>>>>>> crc >>>>>>>>>>>>>> errors. any plans for those? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best Jan >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> I agree with what Matthias has said w.r.t failing fast. There >>> are >>>>>>>>>>>>>>> plenty >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> of >>>>>>>>>>>>>> >>>>>>>>>>>>>> times when you don't want to fail-fast and must attempt to >>> make >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> progress. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The dead-letter queue is exactly for these circumstances. Of >>>>>>>>>>>>>>> course if >>>>>>>>>>>>>>> every record is failing, then you probably do want to give up. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax < >>>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> First a meta comment. KIP discussion should take place on the >>> dev >>>>>>>>>>>>>>> list >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -- if user list is cc'ed please make sure to reply to both >>>>>>>>>>>>>>>> lists. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks. >>>>>>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of >>>>>>>>>>>>>>> sense to >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> focus on deserialization exceptions for now. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to >>>>>>>>>>>>>>>> fail a >>>>>>>>>>>>>>>> task and wipe out the store to repair it via recreation from >>> the >>>>>>>>>>>>>>>> changelog? 
That's of course a quite advance pattern, but I >>> want >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> bring >>>>>>>>>>>>>>>> it up to design the first step in a way such that we can get >>>>>>>>>>>>>>>> there (if >>>>>>>>>>>>>>>> we think it's a reasonable idea). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I also want to comment about fail fast vs making progress. I >>>>>>>>>>>>>>>> think that >>>>>>>>>>>>>>>> fail-fast must not always be the best option. The scenario I >>>>>>>>>>>>>>>> have in >>>>>>>>>>>>>>>> mind is like this: you got a bunch of producers that feed the >>>>>>>>>>>>>>>> Streams >>>>>>>>>>>>>>>> input topic. Most producers work find, but maybe one producer >>>>>>>>>>>>>>>> miss >>>>>>>>>>>>>>>> behaves and the data it writes is corrupted. You might not >>> even >>>>>>>>>>>>>>>> be able >>>>>>>>>>>>>>>> to recover this lost data at any point -- thus, there is no >>>>>>>>>>>>>>>> reason to >>>>>>>>>>>>>>>> stop processing but you just skip over those records. Of >>>>>>>>>>>>>>>> course, you >>>>>>>>>>>>>>>> need to fix the root cause, and thus you need to alert >>> (either >>>>>>>>>>>>>>>> via logs >>>>>>>>>>>>>>>> of the exception handler directly) and you need to start to >>>>>>>>>>>>>>>> investigate >>>>>>>>>>>>>>>> to find the bad producer, shut it down and fix it. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Here the dead letter queue comes into place. From my >>>>>>>>>>>>>>>> understanding, the >>>>>>>>>>>>>>>> purpose of this feature is solely enable post debugging. I >>> don't >>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>> those record would be fed back at any point in time (so I >>> don't >>>>>>>>>>>>>>>> see any >>>>>>>>>>>>>>>> ordering issue -- a skipped record, with this regard, is just >>>>>>>>>>>>>>>> "fully >>>>>>>>>>>>>>>> processed"). Thus, the dead letter queue should actually >>> encode >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> original records metadata (topic, partition offset etc) to >>>>>>>>>>>>>>>> enable >>>>>>>>>>>>>>>> such >>>>>>>>>>>>>>>> debugging. I guess, this might also be possible if you just >>> log >>>>>>>>>>>>>>>> the bad >>>>>>>>>>>>>>>> records, but it would be harder to access (you first must >>> find >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> Streams instance that did write the log and extract the >>>>>>>>>>>>>>>> information >>>>>>>>>>>>>>>> from >>>>>>>>>>>>>>>> there). Reading it from topic is much simpler. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I also want to mention the following. Assume you have such a >>>>>>>>>>>>>>>> topic with >>>>>>>>>>>>>>>> some bad records and some good records. If we always >>> fail-fast, >>>>>>>>>>>>>>>> it's >>>>>>>>>>>>>>>> going to be super hard to process the good data. You would >>> need >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> write >>>>>>>>>>>>>>>> an extra app that copied the data into a new topic filtering >>>>>>>>>>>>>>>> out the >>>>>>>>>>>>>>>> bad >>>>>>>>>>>>>>>> records (or apply the map() workaround withing stream). So I >>>>>>>>>>>>>>>> don't >>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>> that failing fast is most likely the best option in >>> production >>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>> necessarily, true. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Or do you think there are scenarios, for which you can >>> recover >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> corrupted records successfully? And even if this is >>> possible, it >>>>>>>>>>>>>>>> might >>>>>>>>>>>>>>>> be a case for reprocessing instead of failing the whole >>>>>>>>>>>>>>>> application? 
>>>>>>>>>>>>>>>> Also, if you think you can "repair" a corrupted record, >>> should >>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> handler allow to return a "fixed" record? This would solve >>> the >>>>>>>>>>>>>>>> ordering >>>>>>>>>>>>>>>> problem. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated! >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an >>>>>>>>>>>>>>>>> end-to-end >>>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>>> example that demonstrates, with the DSL and with the >>> Processor >>>>>>>>>>>>>>>>> API, >>>>>>>>>>>>>>>>> how >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> user would write a simple application that would then be >>>>>>>>>>>>>>>>> augmented >>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> proposed KIP changes to handle exceptions. It should also >>>>>>>>>>>>>>>>> become much >>>>>>>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code >>>>>>>>>>>>>>>>> paths for >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> happy case and any failure scenarios. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> - Do we have sufficient information available to make >>> informed >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> decisions >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> on >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> what to do next? For example, do we know in which part of >>> the >>>>>>>>>>>>>>>>> topology >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic, >>>>>>>>>>>>>>>>> partition, >>>>>>>>>>>>>>>>> offset, timestamp, etc., but what about topology-related >>>>>>>>>>>>>>>>> information >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> (e.g. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> what is the associated state store, if any)? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this >>> is >>>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> bigger picture: This KIP would give users the option to send >>>>>>>>>>>>>>>>> corrupted >>>>>>>>>>>>>>>>> records to dead letter queue (quarantine topic). But, what >>>>>>>>>>>>>>>>> pattern >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> would >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> we advocate to process such a dead letter queue then, e.g. >>> how to >>>>>>>>>>>>>>> allow >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> retries with backoff ("If the first record in the dead letter >>>>>>>>>>>>>>>>> queue >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> fails >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> again, then try the second record for the time being and go >>> back >>>>>>>>>>>>>>> to the >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> first record at a later time"). Jay and Jan already alluded >>> to >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> ordering >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> problems that will be caused by dead letter queues. As I said, >>>>>>>>>>>>>>> retries >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> might be out of scope but perhaps the implications should be >>>>>>>>>>>>>>>>> considered >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> possible? 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in >>> the >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> conversation >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> that this KIP's scope will be limited to exceptions in the >>>>>>>>>>>>>>>>> category of >>>>>>>>>>>>>>>>> poison pills / deserialization errors. But since Jay >>> brought >>>>>>>>>>>>>>>>> up >>>>>>>>>>>>>>>>> user >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> errors again, I decided to include it again. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> ----------------------------snip-------------------------- >>> -- >>>>>>>>>>>>>>>>> A meta comment: I am not sure about this split between the >>>>>>>>>>>>>>>>> code for >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure >>>>>>>>>>>>>>>>> path >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> (using >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> exception handlers). In Scala, for example, we can do: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> scala> val computation = scala.util.Try(1 / 0) >>>>>>>>>>>>>>>>> computation: scala.util.Try[Int] = >>>>>>>>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> scala> computation.getOrElse(42) >>>>>>>>>>>>>>>>> res2: Int = 42 >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Another example with Scala's pattern matching, which is >>>>>>>>>>>>>>>>> similar to >>>>>>>>>>>>>>>>> `KStream#branch()`: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> computation match { >>>>>>>>>>>>>>>>> case scala.util.Success(x) => x * 5 >>>>>>>>>>>>>>>>> case scala.util.Failure(_) => 42 >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in >>>>>>>>>>>>>>>>> Scala, >>>>>>>>>>>>>>>>> but >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> that's >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> not the point I'm trying to make here.) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have >>> an >>>>>>>>>>>>>>>>> API >>>>>>>>>>>>>>>>> where >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> you >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> code "the happy path", and then have a different code path >>> for >>>>>>>>>>>>>>>>> failures >>>>>>>>>>>>>>>>> (using exceptions and handlers); or should we treat both >>>>>>>>>>>>>>>>> Success and >>>>>>>>>>>>>>>>> Failure in the same way? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I think the failure/exception handling approach (as >>> proposed in >>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> KIP) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> is well-suited for errors in the category of deserialization >>>>>>>>>>>>>>> problems >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> aka >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> poison pills, partly because the (default) serdes are defined >>>>>>>>>>>>>>> through >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> configuration (explicit serdes however are defined through >>> API >>>>>>>>>>>>>>>>> calls). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception >>>>>>>>>>>>>>>>> handling >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> approach >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> is the best idea for user code exceptions, e.g. if you fail >>> to >>>>>>>>>>>>>>>>> guard >>>>>>>>>>>>>>>>> against NPE in your lambdas or divide a number by zero. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> scala> val stream = Seq(1, 2, 3, 4, 5) >>>>>>>>>>>>>>>>> stream: Seq[Int] = List(1, 2, 3, 4, 5) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> // Here: Fallback to a sane default when encountering >>>>>>>>>>>>>>>>> failed >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> records >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> scala> stream.map(x => Try(1/(3 - x))).flatMap(t => >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Seq(t.getOrElse(42))) >>>>>>>>>>>>>>>>> res19: Seq[Int] = List(0, 1, 42, -1, 0) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> // Here: Skip over failed records >>>>>>>>>>>>>>>>> scala> stream.map(x => Try(1/(3 - x))).collect{ case >>>>>>>>>>>>>>>>> Success(s) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> => s >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> } >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> res20: Seq[Int] = List(0, 1, -1, 0) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The above is more natural to me than using error handlers to >>>>>>>>>>>>>>>>> define >>>>>>>>>>>>>>>>> how >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> deal with failed records (here, the value `3` causes an >>>>>>>>>>>>>>>>> arithmetic >>>>>>>>>>>>>>>>> exception). Again, it might help the KIP if we added an >>>>>>>>>>>>>>>>> end-to-end >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> example >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> for such user code errors. >>>>>>>>>>>>>>>>> ----------------------------snip-------------------------- >>> -- >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak < >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> jan.filip...@trivago.com> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Jay, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> ConsumerRecord >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> deserialisation. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I am working with Database Changelogs only. I would really >>> not >>>>>>>>>>>>>>>>>> like >>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> see >>>>>>>>>>>>>>>>> a dead letter queue or something >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> similliar. how am I expected to get these back in order. >>> Just >>>>>>>>>>>>>>>>>> grind >>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> hold an call me on the weekend. I'll fix it >>>>>>>>>>>>>>>>>> then in a few minutes rather spend 2 weeks ordering dead >>>>>>>>>>>>>>>>>> letters. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> (where >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> reprocessing might be even the faster fix) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best Jan >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> - I think we should hold off on retries unless we >>> have >>>>>>>>>>>>>>>>>> worked >>>>>>>>>>>>>>>>>> out >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> full usage pattern, people can always implement their >>>>>>>>>>>>>>>>> own. I >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> think >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the idea >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> is that you send the message to some kind of dead >>>>>>>>>>>>>>>>>>> letter queue >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> then >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> replay these later. 
This obviously destroys all >>> semantic >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> guarantees >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> we are >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> working hard to provide right now, which may be okay. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>> >>>> >>>> >>>> -- >>>> -- Guozhang >>> >>> >> >> >> -- >> -- Guozhang > >
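PS: for completeness, a sketch of how an application would opt into the log-and-continue behavior via the "default.deserialization.exception.handler" config named earlier in this thread, keeping fail-fast as the default. The handler class is the placeholder from the sketch near the top of this message, not a finalized API:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class HandlerConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Config key as spelled in the KIP draft; the handler class is the
            // placeholder from the sketch above, not a finalized API.
            props.put("default.deserialization.exception.handler",
                      LogAndContinueExceptionHandler.class.getName());
            // ... build the topology and create KafkaStreams with these props ...
        }
    }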