>> I'm not sure I follow your question; the producer will not automatically
>> close when there is a fatal error, IIUC.

Correct. I found the code example a little confusing because the
producer is closed only when an error occurs, and only if that exception
is fatal.

Overall, the code example might be better like this:



try (Producer<String, String> producer =
         new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {

  producer.initTransactions();

  try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++) {
      producer.send(new ProducerRecord<>("my-topic",
          Integer.toString(i), Integer.toString(i)));
    }
    producer.commitTransaction();
  } catch (Exception e) {
    // Abort first; KIP-596 proposes making abortTransaction() safe to
    // call in an error state.
    producer.abortTransaction();
    // Rethrow fatal exceptions; non-fatal ones are swallowed here.
    if (e instanceof IllegalStateException ||
        e instanceof ProducerFencedException ||
        e instanceof UnsupportedVersionException ||
        e instanceof AuthorizationException) {
      throw e;
    }
  }
}



Now the producer is closed via try-with-resources in all cases.
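
To illustrate the closing behavior without a broker, here is a minimal,
self-contained sketch. Note that FakeProducer, isFatal() and run() are
hypothetical names made up for this illustration and are not part of the
Kafka API; the sketch also assumes, purely for demonstration, that
IllegalStateException stands in for "fatal" and any other
RuntimeException for "transient". It only shows that try-with-resources
calls close() on the success path, on the swallowed-error path, and on
the rethrown-error path.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOnAllPathsDemo {

    /** Hypothetical stand-in for a transactional producer; NOT the Kafka API. */
    static final class FakeProducer implements AutoCloseable {
        final List<String> calls = new ArrayList<>();
        private final RuntimeException failure; // thrown on commit, or null for success

        FakeProducer(RuntimeException failure) {
            this.failure = failure;
        }

        void beginTransaction() { calls.add("begin"); }

        void commitTransaction() {
            if (failure != null) {
                throw failure;
            }
            calls.add("commit");
        }

        void abortTransaction() { calls.add("abort"); }

        @Override
        public void close() { calls.add("close"); }
    }

    /** Assumption for this sketch only: IllegalStateException plays the fatal error. */
    static boolean isFatal(Exception e) {
        return e instanceof IllegalStateException;
    }

    /** Runs one transaction and returns the calls recorded on the fake producer. */
    public static List<String> run(RuntimeException failure) {
        FakeProducer producer = new FakeProducer(failure);
        try (FakeProducer p = producer) {
            try {
                p.beginTransaction();
                p.commitTransaction();
            } catch (RuntimeException e) {
                p.abortTransaction();
                if (isFatal(e)) {
                    throw e; // the resource is still closed before this propagates
                }
            }
        } catch (RuntimeException fatal) {
            // Runs after close(): the resource is closed before the outer catch.
            producer.calls.add("rethrown");
        }
        return producer.calls;
    }

    public static void main(String[] args) {
        System.out.println(run(null));                                          // [begin, commit, close]
        System.out.println(run(new UnsupportedOperationException("transient"))); // [begin, abort, close]
        System.out.println(run(new IllegalStateException("fatal")));            // [begin, abort, close, rethrown]
    }
}
```

In all three runs "close" is recorded, and in the fatal case it is
recorded before the exception reaches the outer handler.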



@Xiang: feel free to start a vote thread.


-Matthias

On 5/5/20 6:23 PM, Boyang Chen wrote:
> Hey Matthias,
> 
> I'm not sure I follow your question; the producer will not automatically
> close when there is a fatal error, IIUC.
> 
> Boyang
> 
> On Tue, May 5, 2020 at 6:16 PM 张祥 <xiangzhang1...@gmail.com> wrote:
> 
>> Thanks for the comment Matthias.
>>
>> In fact, I cannot think of a reason not to close the producer no matter what.
>> On the other hand, it is also okay to reuse the producer when the error is
>> not fatal. @Guozhang Wang <wangg...@gmail.com>  @Boyang Chen
>> <boy...@confluent.io>
>>
>> Matthias J. Sax <mj...@apache.org> wrote on Fri, May 1, 2020 at 7:52 AM:
>>
>>> Thanks for the KIP. Makes sense to me. I think you can start a vote.
>>>
>>> One minor comment about the code example: From my understanding, a
>>> producer should always be closed (regardless of whether there was no
>>> error, a transient error, or a fatal error). If that is correct, then the
>>> code example seems to be misleading?
>>>
>>>
>>> -Matthias
>>>
>>> On 4/25/20 6:08 PM, 张祥 wrote:
>>>> Sorry, but this KIP is still open to discussion. Any comments and ideas
>>>> would be appreciated. Thanks.
>>>>
>>>> 张祥 <xiangzhang1...@gmail.com> wrote on Fri, Apr 17, 2020 at 1:04 PM:
>>>>
>>>>> Guozhang, thanks for the valuable suggestion.
>>>>>
>>>>> A new part called "suggested coding pattern" has been added, and I copy
>>>>> the core code here:
>>>>>
>>>>> try {
>>>>>     producer.beginTransaction();
>>>>>     for (int i = 0; i < 100; i++)
>>>>>         producer.send(new ProducerRecord<>("my-topic",
>>>>>             Integer.toString(i), Integer.toString(i)));
>>>>>     producer.commitTransaction();
>>>>> } catch (Exception e) {
>>>>>     producer.abortTransaction();
>>>>>     if(e instanceof IllegalStateException ||
>>>>>         e instanceof ProducerFencedException ||
>>>>>         e instanceof UnsupportedVersionException ||
>>>>>         e instanceof AuthorizationException ||
>>>>>         e instanceof OutOfOrderSequenceException) {
>>>>>         producer.close();
>>>>>     }
>>>>> }
>>>>>
>>>>> As you can see, all fatal exceptions need to be listed in the catch
>>>>> block. I am not sure I have listed all of them, and I wonder if there is
>>>>> a better way to do this.
>>>>>
>>>>>
>>>>> Guozhang Wang <wangg...@gmail.com> wrote on Fri, Apr 17, 2020 at 8:50 AM:
>>>>>
>>>>>> Xiang, thanks for the written KIP. I just have one meta comment and
>>>>>> otherwise it looks good to me: could you also add a section about
>>>>>> suggested coding patterns (especially how try - catch should be
>>>>>> implemented) as we discussed on the JIRA to the wiki page as well?
>>>>>>
>>>>>> And please also note that besides the javadoc of the function, on top of
>>>>>> the KafkaProducer class there are also comments regarding example snippet:
>>>>>>
>>>>>> ```
>>>>>>
>>>>>> * <pre>
>>>>>> * {@code
>>>>>> * Properties props = new Properties();
>>>>>> * props.put("bootstrap.servers", "localhost:9092");
>>>>>> * props.put("transactional.id", "my-transactional-id");
>>>>>> * Producer<String, String> producer = new KafkaProducer<>(props,
>>>>>> *     new StringSerializer(), new StringSerializer());
>>>>>> *
>>>>>> * producer.initTransactions();
>>>>>> *
>>>>>> * try {
>>>>>> *     producer.beginTransaction();
>>>>>> *     for (int i = 0; i < 100; i++)
>>>>>> *         producer.send(new ProducerRecord<>("my-topic",
>>>>>> *             Integer.toString(i), Integer.toString(i)));
>>>>>> *     producer.commitTransaction();
>>>>>> * } catch (ProducerFencedException | OutOfOrderSequenceException |
>>>>>> *         AuthorizationException e) {
>>>>>> *     // We can't recover from these exceptions, so our only option is
>>>>>> *     // to close the producer and exit.
>>>>>> *     producer.close();
>>>>>> * } catch (KafkaException e) {
>>>>>> *     // For all other exceptions, just abort the transaction and try again.
>>>>>> *     producer.abortTransaction();
>>>>>> * }
>>>>>> * producer.close();
>>>>>> * } </pre>
>>>>>> ```
>>>>>>
>>>>>> I think with this change we do not need to educate users that they should
>>>>>> distinguish the types of exceptions when calling `abortTxn`, instead they
>>>>>> only need to depend on the exception to decide whether to `close` the
>>>>>> producer, so the above recommendation could look like:
>>>>>>
>>>>>> try {
>>>>>>     // transactional work
>>>>>> } catch (Exception e) {
>>>>>>     producer.abortTransaction();
>>>>>>     if (e instanceof /* fatal exceptions */) {
>>>>>>         producer.close();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Thu, Apr 16, 2020 at 12:14 AM 张祥 <xiangzhang1...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the structure change Boyang. And I agree with you on the weak
>>>>>>> proposal part, I have adjusted it according to your suggestion. Thanks
>>>>>>> again!
>>>>>>>
>>>>>>> Boyang Chen <reluctanthero...@gmail.com> wrote on Thu, Apr 16, 2020 at 2:39 PM:
>>>>>>>
>>>>>>>> Thanks for the KIP Xiang!
>>>>>>>>
>>>>>>>> I think the motivation looks good, and I just did a slight structure
>>>>>>>> change to separate "Proposed Changes" and "Public Interfaces", hope you
>>>>>>>> don't mind.
>>>>>>>>
>>>>>>>> However, "we can determine whether the producer client is already in
>>>>>>>> error state in abortTransaction" sounds a bit weak as a description of
>>>>>>>> the actual proposal. Instead, we could propose something like "we would
>>>>>>>> remember whether a fatal exception has already been thrown to the
>>>>>>>> application level, so that in abort transaction we will not throw
>>>>>>>> again, thus making the function safe to be called in an error state".
>>>>>>>>
>>>>>>>> Other than that, I think the KIP is in pretty good shape.
>>>>>>>>
>>>>>>>> Boyang
>>>>>>>>
>>>>>>>> On Wed, Apr 15, 2020 at 7:07 PM 张祥 <xiangzhang1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> I have opened a small KIP about safely aborting transactions during
>>>>>>>>> shutdown. I'd like to use this thread to discuss it, and any feedback
>>>>>>>>> is appreciated (sorry for the earlier KIP number mistake). Here is a
>>>>>>>>> link to KIP-596:
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-596%3A+Safely+abort+Producer+transactions+during+application+shutdown
>>>>>>>>>
>>>>>>>>> Thank you!
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
