I also think that one config is better, with two default
implementations: failing and log-and-continue.

However, I think we should fail by default, similar to the timestamp
extractor, as "silent" data loss is not a good default behavior IMHO.
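
To make the "one config, two default implementations, fail by default" idea
concrete, here is a minimal sketch. All type and method names below are
hypothetical stand-ins invented for illustration, not the interface from the
KIP; the only assumption taken from this thread is that the handler has a
configure() method and that an unset config should mean "fail".

```java
import java.util.Collections;
import java.util.Map;

// Demo entry point; every type in this block is a hypothetical stand-in.
class HandlerDefaultsSketch {
    // Falls back to the fail-fast handler when nothing has been configured.
    static RecordErrorHandler handlerOrDefault(RecordErrorHandler configured) {
        RecordErrorHandler handler =
                configured != null ? configured : new FailOnErrorHandler();
        handler.configure(Collections.<String, Object>emptyMap());
        return handler;
    }

    public static void main(String[] args) {
        Exception error = new IllegalArgumentException("cannot deserialize value");
        System.out.println(handlerOrDefault(null).handle("orders", 0, 42L, error));
        System.out.println(handlerOrDefault(new LogAndContinueHandler())
                .handle("orders", 0, 42L, error));
    }
}

enum HandlerResponse { CONTINUE, FAIL }

// Assumed shape: configure() for custom settings, handle() to decide per record.
interface RecordErrorHandler {
    void configure(Map<String, ?> configs);

    HandlerResponse handle(String topic, int partition, long offset, Exception error);
}

// Default choice: stop processing so that no data is lost silently.
class FailOnErrorHandler implements RecordErrorHandler {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public HandlerResponse handle(String topic, int partition, long offset, Exception error) {
        return HandlerResponse.FAIL;
    }
}

// Opt-in choice: report where the bad record came from and keep going.
class LogAndContinueHandler implements RecordErrorHandler {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public HandlerResponse handle(String topic, int partition, long offset, Exception error) {
        System.err.println("Skipping record " + topic + "-" + partition + "@" + offset + ": " + error);
        return HandlerResponse.CONTINUE;
    }
}
```

With this shape, skipping records is always an explicit opt-in by whoever
sets the config, which is the point of failing by default.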


-Matthias

On 6/22/17 11:00 AM, Eno Thereska wrote:
> Answers inline: 
> 
>> On 22 Jun 2017, at 03:26, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Thanks for the updated KIP, some more comments:
>>
>> 1. The config name is "default.deserialization.exception.handler" while the
>> interface class name is "RecordExceptionHandler", which is more general
>> than the intended purpose. Could we rename the class accordingly?
> 
> Sure.
> 
> 
>>
>> 2. Could you describe the full implementation of "DefaultExceptionHandler"?
>> Currently it is not clear to me how it is implemented with the configured
>> value.
>>
>> In addition, I think we do not need to include an additional
>> "DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG", as the configure()
>> function is mainly used for users to pass in any customized parameters that
>> are outside of the Streams library; plus, adding such an additional config
>> sounds over-complicated for a default exception handler. Instead I'd suggest
>> we just provide two handlers (or three if people feel strongly about the
>> LogAndThresholdExceptionHandler): one FailOnExceptionHandler and one
>> LogAndContinueOnExceptionHandler. And we can set
>> LogAndContinueOnExceptionHandler by default.
>>
> 
> That's what I had originally. Jay mentioned he preferred one default class
> with config options. So with that approach, you'd have 2 config options, one
> for failing and one for continuing, and the one exception handler would take
> those options during its configure() call.
>
> After checking the other exception handlers in the code, I might revert to
> what I originally had (2 default handlers), as Guozhang also re-suggests, but
> still have the interface extend Configurable. Guozhang, are you ok with that?
> In that case there is no need for the response config option.
> 
> Thanks
> Eno
> 
> 
>>
>> Guozhang
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>
>>> Thanks Guozhang,
>>>
>>> I’ve updated the KIP and hopefully addressed all the comments so far. In
>>> the process I also changed the name of the KIP to reflect its scope better:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>>>
>>> Any other feedback appreciated, otherwise I’ll start the vote soon.
>>>
>>> Thanks
>>> Eno
>>>
>>>> On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>> Eno, thanks for bringing this proposal up, and sorry for being late on
>>>> this. Here are my two cents:
>>>>
>>>> 1. First some meta comments regarding "fail fast" vs. "making progress". I
>>>> agree that in general we should "enforce users to do the right thing" in
>>>> system design, but we also need to keep in mind that Kafka is a
>>>> multi-tenant system, i.e. from a Streams app's pov you probably would not
>>>> control the whole stream processing pipeline end-to-end. E.g. your input
>>>> data may not be controlled by yourself; it could be written by another app,
>>>> or another team in your company, or even a different organization, and if
>>>> an error happens maybe you cannot fix it "to do the right thing" just by
>>>> yourself in time. In such an environment I think it is important to leave
>>>> the door open to let users be more resilient. So I find the current
>>>> proposal, which does leave the door open for either failing fast or making
>>>> progress, quite reasonable.
>>>>
>>>> 2. On the other hand, if the question is whether we should provide a
>>>> built-in "send to bad queue" handler from the library, I think that might
>>>> be overkill: with some tweaks to the API (see my detailed comments below)
>>>> we can allow users to implement such handlers pretty easily. In fact, I
>>>> feel even "LogAndThresholdExceptionHandler" is not necessary as a built-in
>>>> handler, as it would then require users to specify the threshold via
>>>> configs, etc. I think letting people provide such "eco-libraries" may be
>>>> better.
>>>>
>>>> 3. Regarding the CRC error: today we validate the CRC both on the broker
>>>> end upon receiving produce requests and on the consumer end upon receiving
>>>> fetch responses; if the CRC validation fails in the former case, the data
>>>> is not appended to the broker logs. So if we do see a CRC failure on the
>>>> consumer side, it has to be that we have a flipped bit either on the broker
>>>> disks or over the wire. The first case is fatal while the second is
>>>> retriable. Unfortunately we cannot tell which case it is when seeing CRC
>>>> validation failures. But in either case, just skipping and making progress
>>>> does not seem a good choice here, and hence I would personally exclude
>>>> these errors from the general serde errors, to NOT leave the door open to
>>>> making progress.
>>>>
>>>> Currently such errors are thrown as a KafkaException that wraps an
>>>> InvalidRecordException, which may be too general, and we could consider
>>>> just throwing the InvalidRecordException directly. But that could be an
>>>> orthogonal discussion if we agree that CRC failures should not be
>>>> considered in this KIP.
>>>>
>>>> ----------------
>>>>
>>>> Now some detailed comments:
>>>>
>>>> 4. Could we consider adding the processor context to the handle() function
>>>> as well? This context would wrap the source node that is about to process
>>>> the record. This could expose more info, like which task / source node
>>>> sees this error, the timestamp of the message, etc., and would also allow
>>>> users to implement their handlers by exposing some metrics, by calling
>>>> context.forward() to implement the "send to bad queue" behavior, etc.
>>>>
>>>> 5. Could you add the string name of
>>>> StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER as well in the KIP?
>>>> Personally I find the "default" prefix a bit misleading since we do not
>>>> allow users to override it per-node yet. But I'm okay either way, as I can
>>>> see we may extend it in the future and probably would like to not rename
>>>> the config again. Also, from the experience of `default partitioner` and
>>>> `default timestamp extractor`, we may also want to make sure that the
>>>> passed-in object can be either a string "class name" or a class object?
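
As an illustration of point 4 above (passing a context into handle() so that
a user-written handler can forward bad records to a "bad queue"), a rough
sketch could look like the following. The context interface, the record
class and the handler here are all hypothetical and much smaller than
anything in Kafka Streams; the sketch only assumes that the context offers
some forward() call and that the failed record's topic, partition and offset
are available.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point; every type in this block is a hypothetical stand-in.
class DeadLetterSketch {
    public static void main(String[] args) {
        RecordingContext context = new RecordingContext();
        FailedRecord record = new FailedRecord("orders", 3, 42L, new byte[] {0x01});
        boolean keepGoing = new SendToBadQueueHandler()
                .handle(context, record, new IllegalArgumentException("bad bytes"));
        System.out.println(keepGoing + " " + context.forwardedKeys);
    }
}

// Assumed minimal slice of a processor context: it can only forward.
interface ForwardingContext {
    void forward(String key, byte[] value);
}

// Raw bytes plus the metadata needed to find the record again later.
final class FailedRecord {
    final String topic;
    final int partition;
    final long offset;
    final byte[] rawValue;

    FailedRecord(String topic, int partition, long offset, byte[] rawValue) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.rawValue = rawValue;
    }
}

class SendToBadQueueHandler {
    // Returns true to signal "skip this record and keep processing".
    // The error itself is ignored here; a real handler might log or count it.
    boolean handle(ForwardingContext context, FailedRecord record, Exception error) {
        // Encode the origin in the key so the bad record can be traced back.
        String key = record.topic + "-" + record.partition + "@" + record.offset;
        context.forward(key, record.rawValue);
        return true;
    }
}

// Test double that remembers which keys were forwarded.
class RecordingContext implements ForwardingContext {
    final List<String> forwardedKeys = new ArrayList<>();

    @Override
    public void forward(String key, byte[] value) {
        forwardedKeys.add(key);
    }
}
```

Keeping the original topic, partition and offset in the forwarded key is one
way to keep the quarantined bytes debuggable without reading application logs.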
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <jan.filip...@trivago.com>
>>>> wrote:
>>>>
>>>>> Hi Eno,
>>>>>
>>>>> On 07.06.2017 22:49, Eno Thereska wrote:
>>>>>
>>>>>> Comments inline:
>>>>>>
>>>>>> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> just my few thoughts
>>>>>>>
>>>>>>> On 05.06.2017 11:44, Eno Thereska wrote:
>>>>>>>
>>>>>>>> Hi there,
>>>>>>>>
>>>>>>>> Sorry for the late reply, I was out this past week. Looks like good
>>>>>>>> progress was made with the discussions either way. Let me recap a
>>>>>>>> couple of points I saw into one big reply:
>>>>>>>>
>>>>>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these
>>>>>>>> happen in Kafka, before Kafka Streams gets a chance to inspect
>>>>>>>> anything, I'd like to hear the opinion of more Kafka folks like Ismael
>>>>>>>> or Jason on this one. Currently the documentation is not great on what
>>>>>>>> to do once a CRC check has failed. From looking at the code, it looks
>>>>>>>> like the client gets a KafkaException (bubbled up from the fetcher)
>>>>>>>> and currently we in Streams catch this as part of poll() and fail. It
>>>>>>>> might be advantageous to treat CRC handling in a similar way to
>>>>>>>> serialisation handling (e.g., have the option to fail/skip). Let's see
>>>>>>>> what the other folks say. Worst case, we can do a separate KIP for
>>>>>>>> that if it proves too hard to do in one go.
>>>>>>>>
>>>>>>> There is no reasonable way to "skip" a CRC error. How can you know the
>>>>>>> length you read was anything reasonable? You might be completely lost
>>>>>>> inside your response.
>>>>>>>
>>>>>> On the client side, every record received is checked for validity. As
>>>>>> it happens, if the CRC check fails the exception is wrapped with a
>>>>>> KafkaException that is thrown all the way to poll(). Assuming we change
>>>>>> that and poll() throws a CRC exception, I was thinking we could treat
>>>>>> it similarly to a deserialization exception and pass it to the
>>>>>> exception handler to decide what to do. The default would be to fail.
>>>>>> This might need a Kafka KIP btw and can be done separately from this
>>>>>> KIP, but Jan, would you find this useful?
>>>>>>
>>>>> I don't think so. IMO you cannot reasonably continue parsing when the
>>>>> checksum of a message is not correct. If you are not sure you got the
>>>>> correct length, how can you be sure to find the next record? I would
>>>>> always fail straight away in all cases. It's too hard for me to
>>>>> understand why one would try to continue. I mentioned CRCs because
>>>>> those are the only bad pills I have seen so far. But I am happy that it
>>>>> just stopped and I could check what was going on. This will also be
>>>>> invasive in the client code then.
>>>>>
>>>>> If you ask me, I am always going to vote for "grind to a halt": let the
>>>>> developers see what happened and let them fix it. It helps build good
>>>>> Kafka experiences and better software and architectures. For me this is
>>>>> "force the user to do the right thing"
>>>>> (https://youtu.be/aAb7hSCtvGw?t=374), e.g. not letting unexpected input
>>>>> slip by. Letting unexpected input slip by is what bought us 15+ years of
>>>>> war with all sorts of ingestion attacks. I don't even dare to estimate
>>>>> how many missing-records search teams are going to be formed, maybe some
>>>>> HackerOne for stream apps :D
>>>>>
>>>>> Best Jan
>>>>>
>>>>>
>>>>>>
>>>>>>>> At a minimum, handling this type of exception will need to involve
>>>>>>>> the exactly-once (EoS) logic. We'd still allow the option of failing
>>>>>>>> or skipping, but EoS would need to clean up by rolling back all the
>>>>>>>> side effects from the processing so far. Matthias, how does this sound?
>>>>>>>>
>>>>>>> EoS will not help; the record might be 5 or 6 repartitions down into
>>>>>>> the topology. I haven't followed, but I pray you made EoS optional! We
>>>>>>> don't need this and we don't want this, and we will turn it off if it
>>>>>>> comes. So I wouldn't recommend relying on it. The option to turn it off
>>>>>>> is better than forcing it and still being unable to roll back bad
>>>>>>> pills (as explained before).
>>>>>>>
>>>>>> Yeah as Matthias mentioned EoS is optional.
>>>>>>
>>>>>> Thanks,
>>>>>> Eno
>>>>>>
>>>>>>
>>>>>> 6. Will add an end-to-end example as Michael suggested.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>>>>>>>
>>>>>>>>> What I don't understand is this:
>>>>>>>>>
>>>>>>>>> From there on its the easiest way forward: fix, redeploy, start =>
>>>>>>>>>> done
>>>>>>>>>>
>>>>>>>>> If you have many producers that work fine and a new "bad" producer
>>>>>>>>> starts up and writes bad data into your input topic, your Streams
>>>>>>>>> app dies but all your producers, including the bad one, keep writing.
>>>>>>>>>
>>>>>>>>> Thus, how would you fix this, as you cannot "remove" the corrupted
>>>>>>>>> data from the topic? It might take some time to identify the root
>>>>>>>>> cause and stop the bad producer. Up to this point you get good and
>>>>>>>>> bad data into your Streams input topic. If the Streams app is not
>>>>>>>>> able to skip over those bad records, how would you get all the good
>>>>>>>>> data from the topic? Not saying it's not possible, but it's extra
>>>>>>>>> work copying the data with a new non-Streams consumer-producer app
>>>>>>>>> into a new topic and then feeding your Streams app from this new
>>>>>>>>> topic -- you also need to update all your upstream producers to
>>>>>>>>> write to the new topic.
>>>>>>>>>
>>>>>>>>> Thus, if you want to fail fast, you can still do this. And after you
>>>>>>>>> have detected and fixed the bad producer, you might just reconfigure
>>>>>>>>> your app to skip bad records until it reaches the good part of the
>>>>>>>>> data. Afterwards, you could redeploy with fail-fast again.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thus, for this pattern, I actually don't see any reason to stop the
>>>>>>>>> Streams app at all. If you have a callback, and use the callback to
>>>>>>>>> raise an alert (and maybe get the bad data into a bad record queue),
>>>>>>>>> it will not take longer to identify and stop the "bad" producer. But
>>>>>>>>> for this case, you have zero downtime for your Streams app.
>>>>>>>>>
>>>>>>>>> This seems to be much simpler. Or am I missing anything?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Having said this, I agree that the "threshold based callback" might
>>>>>>>>> be questionable. But as you argue for strict "fail-fast", I want to
>>>>>>>>> argue that this need not always be the best pattern to apply and
>>>>>>>>> that the overall KIP idea is super useful from my point of view.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>>>>>>>
>>>>>>>>>> Could not agree more!
>>>>>>>>>>
>>>>>>>>>> But then I think the easiest is still: print the exception and die.
>>>>>>>>>> From there on it's the easiest way forward: fix, redeploy, start =>
>>>>>>>>>> done.
>>>>>>>>>>
>>>>>>>>>> All the other ways to recover a pipeline that was processing
>>>>>>>>>> partially all the time and suddenly went over an "I can't take it
>>>>>>>>>> anymore" threshold are not straightforward IMO.
>>>>>>>>>>
>>>>>>>>>> How do you find the offset where it became too bad, when it is not
>>>>>>>>>> the latest committed one?
>>>>>>>>>> How do you reset there, with some reasonable stuff in your rockses?
>>>>>>>>>>
>>>>>>>>>> One could do the following: the continuing handler would measure
>>>>>>>>>> against a threshold and would terminate after a certain threshold
>>>>>>>>>> has been passed (per task). Then one can use offset commit / flush
>>>>>>>>>> intervals to make a reasonable assumption of how much is slipping by
>>>>>>>>>> + you get an easy recovery when it gets too bad
>>>>>>>>>> + you could also account for "in processing" records.
>>>>>>>>>>
>>>>>>>>>> Setting this threshold to zero would cover all cases with 1
>>>>>>>>>> implementation. It is still beneficial to have it pluggable
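
The threshold idea above (continue until a per-task budget of bad records is
used up, with a threshold of zero meaning fail-fast) can be sketched roughly
as follows. The class and enum names are made up for this example, and it
assumes one handler instance per task so that the counter is per task.

```java
// Demo entry point; every type in this block is a hypothetical stand-in.
class ThresholdSketch {
    public static void main(String[] args) {
        SkipBudget failFast = new SkipBudget(0);
        SkipBudget lenient = new SkipBudget(2);
        // A zero budget fails on the very first bad record.
        System.out.println(failFast.onBadRecord());
        // A budget of two tolerates two bad records, then fails.
        for (int i = 0; i < 3; i++) {
            System.out.println(lenient.onBadRecord());
        }
    }
}

enum Decision { CONTINUE, FAIL }

// Assumed to be instantiated once per task, so the count is per task.
class SkipBudget {
    private final long maxSkipped;
    private long skipped;

    SkipBudget(long maxSkipped) {
        if (maxSkipped < 0) {
            throw new IllegalArgumentException("maxSkipped must be >= 0");
        }
        this.maxSkipped = maxSkipped;
    }

    // Called once per record that could not be deserialized.
    Decision onBadRecord() {
        if (skipped >= maxSkipped) {
            return Decision.FAIL;
        }
        skipped++;
        return Decision.CONTINUE;
    }
}
```

This is why a single threshold-based implementation could cover both
behaviours: the fail-fast handler is just the special case with a budget of
zero.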
>>>>>>>>>>
>>>>>>>>>> Again CRC-Errors are the only bad pills we saw in production for
>>> now.
>>>>>>>>>>
>>>>>>>>>> Best Jan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>>>>>>>
>>>>>>>>>>> Jan, I agree with you philosophically. I think one practical
>>>>>>>>>>> challenge has to do with data formats. Many people use untyped
>>>>>>>>>>> events, so there is simply no guarantee on the form of the input.
>>>>>>>>>>> E.g. many companies use JSON without any kind of schema, so it
>>>>>>>>>>> becomes very hard to assert anything about the input, which makes
>>>>>>>>>>> these programs very fragile to the "one accidental message
>>>>>>>>>>> publication that creates an unsolvable problem".
>>>>>>>>>>>
>>>>>>>>>>> For that reason I do wonder if limiting to just serialization
>>>>>>>>>>> actually gets you a useful solution. For JSON it will help with
>>>>>>>>>>> the problem of non-parseable JSON, but it sounds like it won't
>>>>>>>>>>> help in the case where the JSON is well-formed but does not have
>>>>>>>>>>> any of the fields you expect and depend on for your processing. I
>>>>>>>>>>> expect the reason for limiting the scope is that it is pretty hard
>>>>>>>>>>> to reason about correctness for anything that stops in the middle
>>>>>>>>>>> of processing an operator DAG?
>>>>>>>>>>>
>>>>>>>>>>> -Jay
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <
>>>>>>>>>>> jan.filip...@trivago.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> IMHO you're doing it wrong then. Also, building too much support
>>>>>>>>>>>> into the Kafka ecosystem is very counterproductive in fostering a
>>>>>>>>>>>> happy user base.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Jan, you have a choice to fail fast if you want. This is about
>>>>>>>>>>>>> giving people options and there are times when you don't want to
>>>>>>>>>>>>> fail fast.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <
>>> jan.filip...@trivago.com
>>>>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>> That greatly complicates monitoring. Fail fast gives you that
>>>>>>>>>>>>>> when you monitor only the lag of all your apps, you are
>>>>>>>>>>>>>> completely covered. With that sort of new application,
>>>>>>>>>>>>>> monitoring is very much more complicated, as you now need to
>>>>>>>>>>>>>> monitor the fail % of some special apps as well. In my opinion
>>>>>>>>>>>>>> that is a huge downside already.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>> Using a schema registry, like with Avro, it might not even be
>>>>>>>>>>>>>> the record that is broken; it might just be your app being
>>>>>>>>>>>>>> unable to fetch a schema it now needs to know. Maybe you got
>>>>>>>>>>>>>> partitioned away from that registry.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. When you get alerted because of a too high fail percentage,
>>>>>>>>>>>>>> what steps are you going to take?
>>>>>>>>>>>>>> Shut it down to buy time. Fix the problem. Spend way too much
>>>>>>>>>>>>>> time finding a good reprocess offset.
>>>>>>>>>>>>>> Your time windows are in bad shape anyway, and you have pretty
>>>>>>>>>>>>>> much lost.
>>>>>>>>>>>>>> This routine is nonsense.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dead letter queues would be the worst possible addition to the
>>>>>>>>>>>>>> Kafka toolkit that I can think of. It just doesn't fit an
>>>>>>>>>>>>>> architecture in which having clients fall behind is a valid
>>>>>>>>>>>>>> option.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Further, I mentioned already that the only bad pill I've seen
>>>>>>>>>>>>>> so far is CRC errors. Any plans for those?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I agree with what Matthias has said w.r.t. failing fast. There
>>>>>>>>>>>>>>> are plenty of times when you don't want to fail fast and must
>>>>>>>>>>>>>>> attempt to make progress. The dead-letter queue is exactly for
>>>>>>>>>>>>>>> these circumstances. Of course if every record is failing,
>>>>>>>>>>>>>>> then you probably do want to give up.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <
>>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> First a meta comment. KIP discussion should take place on the
>>>>>>>>>>>>>>>> dev list -- if the user list is cc'ed please make sure to reply
>>>>>>>>>>>>>>>> to both lists. Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of
>>>>>>>>>>>>>>>> sense to focus on deserialization exceptions for now.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to
>>>>>>>>>>>>>>>> fail a task and wipe out the store to repair it via recreation
>>>>>>>>>>>>>>>> from the changelog? That's of course quite an advanced pattern,
>>>>>>>>>>>>>>>> but I want to bring it up to design the first step in a way
>>>>>>>>>>>>>>>> such that we can get there (if we think it's a reasonable idea).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also want to comment about fail fast vs making progress. I
>>>>>>>>>>>>>>>> think that fail-fast need not always be the best option. The
>>>>>>>>>>>>>>>> scenario I have in mind is like this: you have a bunch of
>>>>>>>>>>>>>>>> producers that feed the Streams input topic. Most producers
>>>>>>>>>>>>>>>> work fine, but maybe one producer misbehaves and the data it
>>>>>>>>>>>>>>>> writes is corrupted. You might not even be able to recover this
>>>>>>>>>>>>>>>> lost data at any point -- thus, there is no reason to stop
>>>>>>>>>>>>>>>> processing; you just skip over those records. Of course, you
>>>>>>>>>>>>>>>> need to fix the root cause, and thus you need to alert (either
>>>>>>>>>>>>>>>> via logs or the exception handler directly) and you need to
>>>>>>>>>>>>>>>> start to investigate to find the bad producer, shut it down
>>>>>>>>>>>>>>>> and fix it.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here the dead letter queue comes into play. From my
>>>>>>>>>>>>>>>> understanding, the purpose of this feature is solely to enable
>>>>>>>>>>>>>>>> debugging after the fact. I don't think those records would be
>>>>>>>>>>>>>>>> fed back at any point in time (so I don't see any ordering
>>>>>>>>>>>>>>>> issue -- a skipped record, in this regard, is just "fully
>>>>>>>>>>>>>>>> processed"). Thus, the dead letter queue should actually encode
>>>>>>>>>>>>>>>> the original record's metadata (topic, partition, offset, etc.)
>>>>>>>>>>>>>>>> to enable such debugging. I guess this might also be possible
>>>>>>>>>>>>>>>> if you just log the bad records, but it would be harder to
>>>>>>>>>>>>>>>> access (you first must find the Streams instance that wrote the
>>>>>>>>>>>>>>>> log and extract the information from there). Reading it from a
>>>>>>>>>>>>>>>> topic is much simpler.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also want to mention the following. Assume you have such a
>>>>>>>>>>>>>>>> topic with some bad records and some good records. If we always
>>>>>>>>>>>>>>>> fail fast, it's going to be super hard to process the good
>>>>>>>>>>>>>>>> data. You would need to write an extra app that copies the data
>>>>>>>>>>>>>>>> into a new topic, filtering out the bad records (or apply the
>>>>>>>>>>>>>>>> map() workaround within Streams). So I don't think that
>>>>>>>>>>>>>>>> "failing fast is most likely the best option in production" is
>>>>>>>>>>>>>>>> necessarily true.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Or do you think there are scenarios for which you can recover
>>>>>>>>>>>>>>>> the corrupted records successfully? And even if this is
>>>>>>>>>>>>>>>> possible, it might be a case for reprocessing instead of
>>>>>>>>>>>>>>>> failing the whole application? Also, if you think you can
>>>>>>>>>>>>>>>> "repair" a corrupted record, should the handler be allowed to
>>>>>>>>>>>>>>>> return a "fixed" record? This would solve the ordering problem.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an
>>>>>>>>>>>>>>>>> end-to-end code example that demonstrates, with the DSL and
>>>>>>>>>>>>>>>>> with the Processor API, how the user would write a simple
>>>>>>>>>>>>>>>>> application that would then be augmented with the proposed KIP
>>>>>>>>>>>>>>>>> changes to handle exceptions. It should also become much
>>>>>>>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code
>>>>>>>>>>>>>>>>> paths for the happy case and any failure scenarios.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - Do we have sufficient information available to make informed
>>>>>>>>>>>>>>>>> decisions on what to do next? For example, do we know in which
>>>>>>>>>>>>>>>>> part of the topology the record failed? `ConsumerRecord` gives
>>>>>>>>>>>>>>>>> us access to topic, partition, offset, timestamp, etc., but
>>>>>>>>>>>>>>>>> what about topology-related information (e.g. what is the
>>>>>>>>>>>>>>>>> associated state store, if any)?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is
>>>>>>>>>>>>>>>>> about the bigger picture: This KIP would give users the option
>>>>>>>>>>>>>>>>> to send corrupted records to a dead letter queue (quarantine
>>>>>>>>>>>>>>>>> topic). But, what pattern would we advocate to process such a
>>>>>>>>>>>>>>>>> dead letter queue then, e.g. how to allow for retries with
>>>>>>>>>>>>>>>>> backoff ("If the first record in the dead letter queue fails
>>>>>>>>>>>>>>>>> again, then try the second record for the time being and go
>>>>>>>>>>>>>>>>> back to the first record at a later time"). Jay and Jan
>>>>>>>>>>>>>>>>> already alluded to ordering problems that will be caused by
>>>>>>>>>>>>>>>>> dead letter queues. As I said, retries might be out of scope
>>>>>>>>>>>>>>>>> but perhaps the implications should be considered if possible?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in the
>>>>>>>>>>>>>>>>> conversation that this KIP's scope will be limited to
>>>>>>>>>>>>>>>>> exceptions in the category of poison pills / deserialization
>>>>>>>>>>>>>>>>> errors. But since Jay brought up user code errors again, I
>>>>>>>>>>>>>>>>> decided to include it again.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>>>>>>>> A meta comment: I am not sure about this split between the
>>>>>>>>>>>>>>>>> code for the happy path (e.g. map/filter/... in the DSL) and
>>>>>>>>>>>>>>>>> the failure path (using exception handlers). In Scala, for
>>>>>>>>>>>>>>>>> example, we can do:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>      scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>>>>>>>>      computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>      scala> computation.getOrElse(42)
>>>>>>>>>>>>>>>>>      res2: Int = 42
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Another example with Scala's pattern matching, which is
>>>>>>>>>>>>>>>>> similar to `KStream#branch()`:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>      computation match {
>>>>>>>>>>>>>>>>>        case scala.util.Success(x) => x * 5
>>>>>>>>>>>>>>>>>        case scala.util.Failure(_) => 42
>>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in
>>>>>>>>>>>>>>>>> Scala, but that's not the point I'm trying to make here.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have an
>>>>>>>>>>>>>>>>> API where you code "the happy path", and then have a different
>>>>>>>>>>>>>>>>> code path for failures (using exceptions and handlers); or
>>>>>>>>>>>>>>>>> should we treat both Success and Failure in the same way?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think the failure/exception handling approach (as proposed
>>>>>>>>>>>>>>>>> in this KIP) is well-suited for errors in the category of
>>>>>>>>>>>>>>>>> deserialization problems aka poison pills, partly because the
>>>>>>>>>>>>>>>>> (default) serdes are defined through configuration (explicit
>>>>>>>>>>>>>>>>> serdes however are defined through API calls).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception
>>>>>>>>>>>>>>>>> handling approach is the best idea for user code exceptions,
>>>>>>>>>>>>>>>>> e.g. if you fail to guard against NPE in your lambdas or
>>>>>>>>>>>>>>>>> divide a number by zero.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>      scala> import scala.util.{Success, Try}
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>      scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>>>>      stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>      // Here: Fallback to a sane default when encountering failed records
>>>>>>>>>>>>>>>>>      scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>>>>>>>>      res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>      // Here: Skip over failed records
>>>>>>>>>>>>>>>>>      scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>>>>>>>>      res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The above is more natural to me than using error handlers to
>>>>>>>>>>>>>>>>> define how to deal with failed records (here, the value `3`
>>>>>>>>>>>>>>>>> causes an arithmetic exception). Again, it might help the KIP
>>>>>>>>>>>>>>>>> if we added an end-to-end example for such user code errors.
>>>>>>>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak
>>>>>>>>>>>>>>>>> <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>>>>>>>>>>>> ConsumerRecord deserialisation.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am working with database changelogs only. I would really
>>>>>>>>>>>>>>>>>> not like to see a dead letter queue or something similar.
>>>>>>>>>>>>>>>>>> How am I expected to get these back in order? Just grind to
>>>>>>>>>>>>>>>>>> a halt and call me on the weekend. I'll fix it then in a few
>>>>>>>>>>>>>>>>>> minutes rather than spend 2 weeks ordering dead letters
>>>>>>>>>>>>>>>>>> (where reprocessing might even be the faster fix).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    - I think we should hold off on retries unless we have
>>>>>>>>>>>>>>>>>>>      worked out the full usage pattern; people can always
>>>>>>>>>>>>>>>>>>>      implement their own. I think the idea is that you send
>>>>>>>>>>>>>>>>>>>      the message to some kind of dead letter queue and then
>>>>>>>>>>>>>>>>>>>      replay these later. This obviously destroys all semantic
>>>>>>>>>>>>>>>>>>>      guarantees we are working hard to provide right now,
>>>>>>>>>>>>>>>>>>>      which may be okay.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>
>>>
>>
>>
>> -- 
>> -- Guozhang
> 
> 
