[ https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525251#comment-16525251 ]
Matthias J. Sax commented on KAFKA-7108: ---------------------------------------- Thanks for reporting this [~anuta]. [~yuzhih...@gmail.com] I am not sure atm. The `production.exception.handler` is supposed to handle writes, but `sendOffsetsToTransaction()` is not writing any data. For example, if EOS is disabled, and committing offsets fails, the handler would also not be called for this error – the difference is, that a failed offset commit for non-EOS can just be ignored (if fact we swallow the error and continue) as it does not violate at-least-once guarantees. Also, we could call the handler only with `null` because there is not record to be passed into the handler. Therefore, I am not sure if re-using the `production.exception.handler` is the best choice. If we do it, we would need document that the handler could be called with `null` for this case... Think we need a broader discussion how to handle this. \cc [~guozhang] [~bbejeck] [~vvcephei] > "Exactly-once" stream breaks production exception handler contract > ------------------------------------------------------------------ > > Key: KAFKA-7108 > URL: https://issues.apache.org/jira/browse/KAFKA-7108 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.0 > Reporter: Anna O > Priority: Critical > Labels: exactly-once > > I have a stream configured with "default.production.exception.handler" that > is supposed to log the error and continue. When I set "processing.guarantee" > to "exactly_once" it appeared that retryable NotEnoughReplicasException that > passed the production exception handler was rethrown by the > TransactionManager wrapped with KafkaException and terminated the stream > thread: > _org.apache.kafka.common.KafkaException: Cannot execute transactional method > because we are in an error stateat > org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) > [kafka-streams-1.1.0.jar:?]_ > _Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: > Messages are rejected since there are fewer in-sync replicas than required._ > -- This message was sent by Atlassian JIRA (v7.6.3#76005)