Hi Sophie,

Thank you very much. Will do a PR. Please allow some extra time, as I'd like to test the change first. Not going overboard. I have to test the real thing anyway.

Kind regards,
Matthias Kraaz

> Hey Matthias -- I'm quite sure you should *not* do what we're doing in
> https://github.com/apache/kafka/pull/15315. That's definitely a bad hack,
> and IIUC the only reason we accepted it was because the choice was between
> implementing a hacky temporary fix and blocking the entire release while we
> figured out the "right" way to fix this. Sadly, it seems like we never
> followed up after the release was cut, and AFAICT everyone forgot about
> this so-called "temporary" hack.... as predicted
> <https://github.com/apache/kafka/pull/15315#discussion_r1478629131>...
>
> Anyways, as a general rule, you should never try to catch and swallow an
> IllegalStateException. You never know if you might be swallowing an actual
> problem and messing up your application. It's better to just make sure it
> doesn't get thrown in the first place.
>
> In this specific case, it seems like the "illegal state" that both you and
> Kafka Streams are hitting comes from calling #abortTransaction after a
> timeout. So it sounds to me like you should just add a separate catch block
> for TimeoutException and retry the #commitTransaction.
>
> For the full explanation as to why you need to retry the #commitTxn (as
> opposed to just starting a new transaction), check out this paragraph in
> the #commitTransaction javadocs:
>
> > Note that this method will raise TimeoutException if the transaction cannot
> > be committed before expiration of max.block.ms, but this does not mean
> > the request did not actually reach the broker. In fact, it only indicates
> > that we cannot get the acknowledgement response in time, so it's up to the
> > application's logic to decide how to handle timeouts. Additionally, it will
> > raise InterruptException if interrupted.
> > It is safe to retry in either case, but it is not possible to attempt a
> > different operation (such as abortTransaction) since the commit may
> > already be in the progress of completing. If not retrying, the only
> > option is to close the producer.
>
> It does seem like the exception handling example is objectively incorrect,
> since TimeoutException extends KafkaException and will, as you experienced,
> end up in the KafkaException catch block that tries to abort the
> transaction. I get that people might want to handle timeouts differently,
> and it's tricky since they can also be thrown from abortTransaction and
> have to be handled in the same way (i.e. you can only retry the same API
> or close the producer). But the example is very misleading and should
> absolutely be updated to include explicit TimeoutException handling in my
> opinion.
>
> Would you be interested in doing a PR to fix the Producer javadocs and
> improve the exception handling example?
>
> On Thu, May 2, 2024 at 4:22 AM <matthias.kr...@gmx.de.invalid> wrote:
>
> > Hi,
> >
> > Thanks for your work and sorry to bother you.
> >
> > My code gets the same IllegalStateException from KafkaProducer as Kafka
> > Streams gets in KAFKA-16221:
> >
> > java.lang.IllegalStateException: Cannot attempt operation
> > `abortTransaction` because the previous call to `commitTransaction` timed
> > out and must be retried
> >     at org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1109)
> >     at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:266)
> >     at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:835)
> >
> > I have followed the recipe from
> > https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> > for error handling for a transactional producer, i.e.
> >
> > try {
> >     producer.beginTransaction();
> >     for (int i = 0; i < 100; i++)
> >         producer.send(new ProducerRecord<>("my-topic",
> >             Integer.toString(i), Integer.toString(i)));
> >     producer.commitTransaction();
> > } catch (ProducerFencedException | OutOfOrderSequenceException |
> >          AuthorizationException e) {
> >     // We can't recover from these exceptions, so our only option is to
> >     // close the producer and exit.
> >     producer.close();
> > } catch (KafkaException e) {
> >     // For all other exceptions, just abort the transaction and try again.
> >     producer.abortTransaction();
> > }
> > producer.close();
> >
> > Kafka Streams has solved KAFKA-16221 by introducing a hack
> > (https://github.com/apache/kafka/pull/15315), but plans a clean solution.
> >
> > Does that mean that the above recipe is outdated?
> > Is there really no simple, clean solution for the error handling?
> > Should I use the solution from
> > https://github.com/apache/kafka/pull/15315
> > and wait for what Kafka Streams comes up with next as the clean solution?
> >
> > Kind regards,
> > Matthias Kraaz
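
For illustration only, the handling suggested in the reply (a separate TimeoutException branch that retries commitTransaction, and closing the producer if retries are given up) might look roughly like the sketch below. It is not the official javadoc example and not what Kafka Streams does. To keep it self-contained, it uses small stand-in types instead of the real client classes: TxProducer, FatalProducerException, Outcome, runTransaction and the maxCommitAttempts parameter are all hypothetical names, and the nested KafkaException and TimeoutException only mimic the inheritance relationship described in the thread.

```java
import java.util.List;

final class TransactionalCommitSketch {

    // Stand-in for org.apache.kafka.common.KafkaException (assumption: only the
    // class hierarchy matters for this sketch).
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    // Stand-in for the Kafka TimeoutException, which extends KafkaException.
    static class TimeoutException extends KafkaException {
        TimeoutException(String message) { super(message); }
    }

    // Hypothetical umbrella for the unrecoverable cases in the javadoc recipe
    // (ProducerFencedException, OutOfOrderSequenceException, AuthorizationException).
    static class FatalProducerException extends KafkaException {
        FatalProducerException(String message) { super(message); }
    }

    // Hypothetical minimal view of the transactional producer methods used here.
    interface TxProducer {
        void beginTransaction();
        void send(String record);
        void commitTransaction();
        void abortTransaction();
        void close();
    }

    enum Outcome { COMMITTED, ABORTED, CLOSED }

    static Outcome runTransaction(TxProducer producer, List<String> records, int maxCommitAttempts) {
        try {
            producer.beginTransaction();
            for (String record : records) {
                producer.send(record);
            }
        } catch (FatalProducerException e) {
            // Unrecoverable: the only option is to close the producer.
            producer.close();
            return Outcome.CLOSED;
        } catch (KafkaException e) {
            // No commit has been attempted yet, so aborting is still allowed.
            // (A timeout from abortTransaction itself is not handled in this sketch.)
            producer.abortTransaction();
            return Outcome.ABORTED;
        }

        for (int attempt = 1; attempt <= maxCommitAttempts; attempt++) {
            try {
                producer.commitTransaction();
                return Outcome.COMMITTED;
            } catch (TimeoutException e) {
                // The commit may still be completing on the broker, so the only
                // legal follow-ups are retrying the same call or closing.
                // Fall through to the next loop iteration to retry.
            } catch (FatalProducerException e) {
                producer.close();
                return Outcome.CLOSED;
            } catch (KafkaException e) {
                // Non-timeout failure: abort, as in the original recipe.
                producer.abortTransaction();
                return Outcome.ABORTED;
            }
        }

        // Retries exhausted after timeouts: abortTransaction is not allowed here,
        // so close the producer instead.
        producer.close();
        return Outcome.CLOSED;
    }
}
```

The commit is pulled out of the main try block so that a timeout raised while sending (before any commit attempt) can still lead to an abort, while a timeout raised by the commit itself never does. With the real client, the same shape would apply to KafkaProducer directly; the retry limit and any backoff between attempts are left to the application.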