>> I'm not sure I follow your question, the producer will not automatically
>> close when there is fatal error IIUC.

Correct. I found the code example just a little bit confusing, because the
producer is only closed in case of an error, and only if the exception is
fatal. Overall, the code example might be better like this:

try (Producer<String, String> producer = new KafkaProducer<>(props,
        new StringSerializer(), new StringSerializer())) {
    producer.initTransactions();

    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic",
                    Integer.toString(i), Integer.toString(i)));
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        if (e instanceof IllegalStateException ||
            e instanceof ProducerFencedException ||
            e instanceof UnsupportedVersionException ||
            e instanceof AuthorizationException) {
            throw e;
        }
    }
}

Now the producer is closed via try-with-resources for all cases.

@Xiang: feel free to start a vote thread.

-Matthias

On 5/5/20 6:23 PM, Boyang Chen wrote:
> Hey Matthias,
>
> I'm not sure I follow your question, the producer will not automatically
> close when there is fatal error IIUC.
>
> Boyang
>
> On Tue, May 5, 2020 at 6:16 PM 张祥 <xiangzhang1...@gmail.com> wrote:
>
>> Thanks for the comment Matthias.
>>
>> In fact, I cannot think of why we cannot close the producer no matter what.
>> On the other hand, it is also okay to reuse the producer when the error is
>> not fatal. @Guozhang Wang <wangg...@gmail.com> @Boyang Chen
>> <boy...@confluent.io>
>>
>> On Fri, May 1, 2020 at 7:52 AM, Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Thanks for the KIP. Makes sense to me. I think you can start a vote.
>>>
>>> One minor comment about the code example: From my understanding, a
>>> producer should always be closed (independent if there was no error, a
>>> transient error, or a fatal error). If that is correct, then the code
>>> example seems to be misleading?
>>>
>>>
>>> -Matthias
>>>
>>> On 4/25/20 6:08 PM, 张祥 wrote:
>>>> Sorry, but this KIP is still open to discussion, any comments and ideas
>>>> would be appreciated, Thanks.
>>>>
>>>> On Fri, Apr 17, 2020 at 1:04 PM, 张祥 <xiangzhang1...@gmail.com> wrote:
>>>>
>>>>> Guozhang, thanks for the valuable suggestion.
>>>>>
>>>>> A new part called "suggested coding pattern" has been added and I copy
>>>>> the core code here:
>>>>>
>>>>> try {
>>>>>     producer.beginTransaction();
>>>>>     for (int i = 0; i < 100; i++)
>>>>>         producer.send(new ProducerRecord<>("my-topic",
>>>>>                 Integer.toString(i), Integer.toString(i)));
>>>>>     producer.commitTransaction();
>>>>> } catch (Exception e) {
>>>>>     producer.abortTransaction();
>>>>>     if (e instanceof IllegalStateException ||
>>>>>         e instanceof ProducerFencedException ||
>>>>>         e instanceof UnsupportedVersionException ||
>>>>>         e instanceof AuthorizationException ||
>>>>>         e instanceof OutOfOrderSequenceException) {
>>>>>         producer.close();
>>>>>     }
>>>>> }
>>>>>
>>>>> As you can see, in the catch block, all fatal exceptions need to be
>>>>> listed, I am not sure I have listed all of them and I wonder if there
>>>>> is a better way to do this.
>>>>>
>>>>>
>>>>> On Fri, Apr 17, 2020 at 8:50 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Xiang, thanks for the written KIP. I just have one meta comment and
>>>>>> otherwise it looks good to me: could you also add a section about
>>>>>> suggested coding patterns (especially how try - catch should be
>>>>>> implemented) as we discussed on the JIRA to the wiki page as well?
>>>>>>
>>>>>> And please also note that besides the javadoc of the function, on top
>>>>>> of the KafkaProducer class there are also comments regarding example
>>>>>> snippet:
>>>>>>
>>>>>> ```
>>>>>>
>>>>>> * <pre>
>>>>>> * {@code
>>>>>> * Properties props = new Properties();
>>>>>> * props.put("bootstrap.servers", "localhost:9092");
>>>>>> * props.put("transactional.id", "my-transactional-id");
>>>>>> * Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
>>>>>> *
>>>>>> * producer.initTransactions();
>>>>>> *
>>>>>> * try {
>>>>>> *     producer.beginTransaction();
>>>>>> *     for (int i = 0; i < 100; i++)
>>>>>> *         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
>>>>>> *     producer.commitTransaction();
>>>>>> * } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
>>>>>> *     // We can't recover from these exceptions, so our only option is to close the producer and exit.
>>>>>> *     producer.close();
>>>>>> * } catch (KafkaException e) {
>>>>>> *     // For all other exceptions, just abort the transaction and try again.
>>>>>> *     producer.abortTransaction();
>>>>>> * }
>>>>>> * producer.close();
>>>>>>
>>>>>> * } </pre>
>>>>>> ```
>>>>>>
>>>>>> I think with this change we do not need to educate users that they
>>>>>> should distinguish the types of exceptions when calling `abortTxn`,
>>>>>> instead they only need to depend on the exception to decide whether to
>>>>>> `close` the producer, so the above recommendation could look like:
>>>>>>
>>>>>> try {
>>>>>>
>>>>>> } catch {Exception e} {
>>>>>>
>>>>>>     producer.abortTxn;
>>>>>>
>>>>>>     if (e instanceof /*fatal exceptions*/) {
>>>>>>         producer.close();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Thu, Apr 16, 2020 at 12:14 AM 张祥 <xiangzhang1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the structure change Boyang.
>>>>>>> And I agree with you on the weak proposal part, I have adjusted it
>>>>>>> according to your suggestion. Thanks again!
>>>>>>>
>>>>>>> On Thu, Apr 16, 2020 at 2:39 PM, Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP Xiang!
>>>>>>>>
>>>>>>>> I think the motivation looks good, and I just did a slight structure
>>>>>>>> change to separate "Proposed Changes" and "Public Interfaces", hope
>>>>>>>> you don't mind.
>>>>>>>>
>>>>>>>> However, "we can determine whether the producer client is already in
>>>>>>>> error state in abortTransaction" sounds a bit weak about the actual
>>>>>>>> proposal, instead we could propose something as "we would remember
>>>>>>>> whether a fatal exception has already been thrown to the application
>>>>>>>> level, so that in abort transaction we will not throw again, thus
>>>>>>>> making the function safe to be called in an error state".
>>>>>>>>
>>>>>>>> Other than that, I think the KIP is in pretty good shape.
>>>>>>>>
>>>>>>>> Boyang
>>>>>>>>
>>>>>>>> On Wed, Apr 15, 2020 at 7:07 PM 张祥 <xiangzhang1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> I have opened a small KIP about safely aborting transaction during
>>>>>>>>> shutdown. I'd like to use this thread to discuss about it and any
>>>>>>>>> feedback is appreciated (sorry for earlier KIP number mistake).
>>>>>>>>> Here is a link to KIP-596 :
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-596%3A+Safely+abort+Producer+transactions+during+application+shutdown
>>>>>>>>>
>>>>>>>>> Thank you!
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
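
On the question raised in the quoted thread (whether there is a better way
than listing every fatal exception inline in the catch block): one possible
approach is to keep the list in a single small helper, so each catch block
only asks "is this fatal?". The following is a minimal sketch only.
`FatalErrorClassifier` is a hypothetical name and is not part of the Kafka
client API, and the JDK exception types in `main` are stand-ins; in a real
application the list would presumably hold the Kafka types named in this
thread (ProducerFencedException, AuthorizationException, and so on).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Kafka API): keeps the fatal exception types in one place. */
final class FatalErrorClassifier {
    private final List<Class<? extends Exception>> fatalTypes;

    FatalErrorClassifier(List<Class<? extends Exception>> fatalTypes) {
        // Defensive copy so later changes to the caller's list do not affect us.
        this.fatalTypes = new ArrayList<>(fatalTypes);
    }

    /** Returns true if e is an instance of any registered fatal type (subclasses included). */
    boolean isFatal(Exception e) {
        for (Class<? extends Exception> type : fatalTypes) {
            if (type.isInstance(e)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in types only; replace with the Kafka exception classes in real code.
        FatalErrorClassifier classifier = new FatalErrorClassifier(
                Arrays.asList(IllegalStateException.class, UnsupportedOperationException.class));

        System.out.println(classifier.isFatal(new IllegalStateException("fatal"))); // prints "true"
        System.out.println(classifier.isFatal(new RuntimeException("transient")));  // prints "false"
    }
}
```

With such a helper, the catch block in the examples above would reduce to
calling producer.abortTransaction() and then rethrowing (or closing) only if
classifier.isFatal(e) returns true. Whether the list of fatal types is
complete remains the open question from the thread; the helper only
centralizes it.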