Thanks for picking up this KIP.

My personal take on repartition topics is that it seems ok to apply the handler to them, too (in addition to output topics). This is more flexible and also simplifies the code. In the end, the topic name is passed into the handler via `ProducerRecord`, so users can decide on a per-topic basis what to do.
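To make the per-topic idea concrete, here is a minimal sketch. Note that `HandlerResponse`, `OutgoingRecord`, and `PerTopicSerializationHandler` are hypothetical stand-ins so the snippet compiles on its own; they are not the Kafka Streams classes. Only the method name `handleSerializationException` and the CONTINUE/FAIL choice come from this thread. The policy shown (fail on changelog topics, skip on repartition topics, skip on selected output topics) is just one option, not a recommendation. It relies on the `-changelog` / `-repartition` suffixes of Streams internal topic names.

```java
import java.util.Set;

// Hypothetical stand-in for the handler's response type (not the Kafka class).
enum HandlerResponse { CONTINUE, FAIL }

// Hypothetical stand-in for the untyped ProducerRecord that reaches the handler;
// only the topic name matters for this sketch.
final class OutgoingRecord {
    final String topic;
    OutgoingRecord(String topic) { this.topic = topic; }
}

// Hypothetical handler that decides per topic what to do on a serialization error.
final class PerTopicSerializationHandler {
    private final Set<String> skippableOutputTopics;

    PerTopicSerializationHandler(Set<String> skippableOutputTopics) {
        this.skippableOutputTopics = skippableOutputTopics;
    }

    HandlerResponse handleSerializationException(OutgoingRecord record, Exception exception) {
        String topic = record.topic;
        // Changelog topics: never skip, so store and changelog cannot diverge.
        if (topic.endsWith("-changelog")) {
            return HandlerResponse.FAIL;
        }
        // Repartition topics: this sketch chooses to skip the bad record.
        if (topic.endsWith("-repartition")) {
            return HandlerResponse.CONTINUE;
        }
        // Output topics: skip only where the user explicitly opted in.
        return skippableOutputTopics.contains(topic)
                ? HandlerResponse.CONTINUE
                : HandlerResponse.FAIL;
    }
}
```

With `new PerTopicSerializationHandler(Set.of("orders-out"))`, an error on `orders-out` is skipped, while an error on any other output topic or on a changelog topic stops processing.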
About stores and changelogs: yes, serialization happens first. Hence, when we put() into RocksDB and send() to the changelog topic (in that case we use `ByteArraySerializer`), no serialization error should happen (if there had been a problem, it would have happened earlier).

However, in KIP-210 we did not consider the case that a send() might fail for a changelog topic while the put() into the store was already successfully applied. Hence, it's possible atm to skip a failed write into the changelog topic even if the put() into the store was successful. This seems to be a bug to me and we might want to create a separate Jira for it -- it's related to this KIP but should not be mixed into the KIP IMHO.

For the KIP itself, what we _could_ do is apply the handler to the serialization that happens before the data is put() into the store. However, I am not sure if we should allow this -- atm, I tend to think we should not allow it and should exclude store serialization from the handler.


-Matthias

On 9/10/19 1:46 AM, Alaa Zbair wrote:
> Hi,
>
> I have checked the KIP-399 and the discussion and also KIP-210.
>
> So the question we need to answer is whether it's okay to also skip
> writing the record in the internal topics, the current implementation of
> 'ProductionExceptionHandler' is applied for all topics and if we decided
> to keep it that way, how to ensure that there will be no divergence in
> local store and changelog topic ?
>
> I would like to get input from others on what they think about this.
>
> There is a point that I don't understand which is: why in case of a
> serialization error do we have the choice of either skipping it or
> putting in the store ? shouldn't the record be correctly serialized
> before putting it into the store ?
>
> On 13/12/2018 14:13, Matthias J. Sax wrote:
>> For store updates, records are first serialized and afterwards put into
>> the store and written into the changelog topic.
>>
>> In the current implementation, if the send() into the changelog topic
>> produces an error and the handler skips over it, the local store content
>> and the changelog topic diverge. This seems to be a correctness issue.
>>
>> For serialization error, it would not happen that store and changelog
>> diverge, because serialization happens before and put/send. Thus, with
>> this KIP we could skip both put() and send(). However, I am still
>> wondering, if it would be ok to skip a store update for this case? (Btw:
>> the current PR does not address this atm, and a serialization error for
>> a store write would not be covered but kill the instance).
>>
>> IIRC, the original idea of the KIP was to allow skipping over record for
>> output topics only. That's why I am wondering if it's ok to allow
>> skipper over record in repartitions topics, too.
>>
>> In the end, it's some data loss for all 3 cases, so maybe it's ok to
>> allow skipping for all 3 cases. However, we should not allow that local
>> store and changelog topic diverge IMHO (what might been an orthogonal
>> bug thought).
>>
>> I also don't have an answer or preference. Just think, it's important to
>> touch on those cases and get input how people think about it.
>>
>>
>> -Matthias
>>
>> On 12/11/18 11:43 AM, Kamal Chandraprakash wrote:
>>> Matthias,
>>>
>>> For changelog topics, I think it does not make sense to allow skipping
>>> records if serialization fails? For internal repartitions topics, I am
>>> not sure if we should allow it or not. Would you agree with this? We
>>> should discuss the implication to derive a sound design.
>>>
>>> Can you explain the issue that happens when records are skipped to
>>> changelog / internal-repartition topics ? So, that I can look into it.
>>>
>>> On Fri, Dec 7, 2018 at 12:07 AM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>>>> To accept different types of records from multiple topologies, I
>>>>>> have to define the ProducerRecord without generics.
>>>> Yes. It does make sense. My point was, that the KIP should
>>>> mention/explain this explicitly to allow other not familiar with the
>>>> code base to understand it more easily :)
>>>>
>>>> About `ClassCastException`: seems to be an implementation detail. No
>>>> need to make it part of the KIP discussion.
>>>>
>>>> One more thing that came to my mind. We use the `RecordCollector` to
>>>> write into all topics, ie, user output topics and internal repartition
>>>> and changelog topics.
>>>>
>>>> For changelog topics, I think it does not make sense to allow skipping
>>>> records if serialization fails? For internal repartitions topics, I am
>>>> not sure if we should allow it or not. Would you agree with this? We
>>>> should discuss the implication to derive a sound design.
>>>>
>>>> I was also just double checking the code, and it seems that the current
>>>> `ProductionExceptionHandler` is applied for all topics. This seems to
>>>> be incorrect to me. Seems we missed this case when doing KIP-210? (Or
>>>> did we discuss this and I cannot remember? Might be worth to double
>>>> check.)
>>>>
>>>> Last thought: of course, the handler will know which topic is affected
>>>> and can provide a corresponding implementation. Was just wondering if
>>>> we should be more strict?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 12/6/18 10:01 AM, Kamal Chandraprakash wrote:
>>>>> Matt,
>>>>> I agree with Matthias on not to altering the serializer as it's
>>>>> used by multiple components.
>>>>>
>>>>> Matthias,
>>>>>
>>>>> - the proposed method accepts a `ProducerRecord` -- it might be good
>>>>> to explain why this cannot be done in a type safe way (ie, missing
>>>>> generics)
>>>>>
>>>>> To accept different types of records from multiple topologies, I
>>>>> have to define the ProducerRecord without generics.
>>>>>
>>>>> - `AlwaysProductionExceptionHandler` ->
>>>>> `AlwaysContinueProductionExceptionHandler`
>>>>>
>>>>> Updated the typo error in KIP.
>>>>>
>>>>> - `DefaultProductionExceptionHandler` is not mentioned
>>>>>
>>>>> The `handleSerializationException` method in the
>>>>> `ProductionExceptionHandler` interface will have default
>>>>> implementation that is set to FAIL by default.
>>>>> This is done to avoid any changes in the user implementation. So, I
>>>>> didn't mentioned the `DefaultProductionExceptionHandler` class.
>>>>> Updated the KIP.
>>>>>
>>>>> - Why do you distinguish between `ClassCastException` and "any other
>>>>> unchecked exception? Both second case seems to include the first one?
>>>>>
>>>>> In SinkNode.java#93
>>>>> <https://github.com/apache/kafka/blob/87cc31c4e7ea36e7e832a1d02d71480a91a75293/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L93>
>>>>> on hitting `ClassCastException`, we are halting the streams as it's a
>>>>> fatal error.
>>>>> To keep the original behavior, I've to distinguish the exceptions.
>>>>>
>>>>>
>>>>> On Thu, Dec 6, 2018 at 10:44 PM Matthias J. Sax
>>>>> <matth...@confluent.io> wrote:
>>>>>
>>>>>> Well, that's exactly the point. The serializer should not be altered
>>>>>> IMHO because this would have impact on other components. Also, for
>>>>>> applications that use KafkaProducer directly, they can catch any
>>>>>> serialization exception and react to it. Hence, I don't don't see a
>>>>>> reason to change the serializer interface.
>>>>>>
>>>>>> Instead, it seems better to solve this issue in Streams by allowing
>>>>>> to skip over a record for this case.
>>>>>>
>>>>>> Some more comments on the KIP:
>>>>>>
>>>>>> - the proposed method accepts a `ProducerRecord` -- it might be good
>>>>>> to explain why this cannot be done in a type safe way (ie, missing
>>>>>> generics)
>>>>>>
>>>>>> - `AlwaysProductionExceptionHandler` ->
>>>>>> `AlwaysContinueProductionExceptionHandler`
>>>>>>
>>>>>> - `DefaultProductionExceptionHandler` is not mentioned
>>>>>>
>>>>>> - Why do you distinguish between `ClassCastException` and "any other
>>>>>> unchecked exception? Both second case seems to include the first one?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 12/6/18 8:35 AM, Matt Farmer wrote:
>>>>>>> Ah, good point.
>>>>>>>
>>>>>>> Should we consider altering the serializer interface to permit not
>>>>>>> sending the record?
>>>>>>>
>>>>>>> On Wed, Dec 5, 2018 at 9:23 PM Kamal Chandraprakash <
>>>>>>> kamal.chandraprak...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Matt,
>>>>>>>>
>>>>>>>> That's a good point. If these cases are handled in the serializer,
>>>>>>>> then one cannot continue the stream processing by skipping the
>>>>>>>> record.
>>>>>>>> To continue, you may have to send a empty record serialized
>>>>>>>> key/value (new byte[0]) to the downstream on hitting the error
>>>>>>>> which may cause un-intended results.
>>>>>>>>
>>>>>>>> On Wed, Dec 5, 2018 at 8:41 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>
>>>>>>>>> Hi there,
>>>>>>>>>
>>>>>>>>> Thanks for this KIP.
>>>>>>>>>
>>>>>>>>> What’s the thinking behind doing this in
>>>>>>>>> ProductionExceptionHandler versus handling these cases in your
>>>>>>>>> serializer implementation?
>>>>>>>>>
>>>>>>>>> On Mon, Dec 3, 2018 at 1:09 AM Kamal Chandraprakash <
>>>>>>>>> kamal.chandraprak...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello dev,
>>>>>>>>>>
>>>>>>>>>> I hope to initiate the discussion for KIP-399: Extend
>>>>>>>>>> ProductionExceptionHandler to cover serialization exceptions.
>>>>>>>>>>
>>>>>>>>>> KIP:
>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions>
>>>>>>>>>> JIRA: https://issues.apache.org/jira/browse/KAFKA-7499
>>>>>>>>>>
>>>>>>>>>> All feedbacks will be highly appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Kamal Chandraprakash