[ https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525665#comment-16525665 ]
Matthias J. Sax commented on KAFKA-7108:
----------------------------------------

[~guozhang] My suggestion is to always abort the transaction, no matter whether the handler returns FAIL or CONTINUE. The returned flag only determines whether Streams should stop processing and shut down, or continue processing. Thus, on reprocessing you would not see duplicates in the output, as the transaction was never committed. If you argue that this implies data loss and is thus not appropriate when EOS is enabled, then it's questionable whether registering a `production.exception.handler` for EOS makes sense at all and whether we should disallow it.

[~anuta] Maybe you can elaborate on why you want to skip over output records if EOS is enabled? That seems to be a contradiction to some extent. It would be good to understand your use case.

[~bbejeck] I had a similar thought. However, if we don't reuse the existing handler, then we would need to introduce a new handler for this case (maybe `default.commit.exception.handler`?). This might be more confusing to users than helpful. I am not sure which solution is better, especially because in the non-EOS case we always swallow any exception during offset commit. Furthermore, the name is not good in the first place (any better idea?), because with EOS enabled we don't actually commit at this point: committing offsets and adding offsets to a transaction are two different things, while committing a transaction is something else again. Overall it might be confusing for users if we follow this path.

Another alternative might be to handle this case within Kafka Streams. With EOS enabled, if an error occurs and the handler returns CONTINUE, we abort the transaction, reset the consumer to the old offsets, clean up the stores, and retry processing the input data. Of course, if Kafka Streams fails (the current behavior) and the application is restarted, the exact same thing would happen, so I am not sure we would gain much here.
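For context, the handler contract under discussion is the `ProductionExceptionHandler` interface; a minimal log-and-continue implementation along the lines of what the reporter describes could look roughly like the sketch below (the class name and logging choices are illustrative, not taken from this ticket):

{code:java}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative log-and-continue handler; the class name is made up for this sketch.
public class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {

    private static final Logger LOG =
        LoggerFactory.getLogger(LogAndContinueProductionExceptionHandler.class);

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // Log the failed record and tell Streams to keep processing (CONTINUE);
        // returning FAIL would stop processing and shut the thread down instead.
        LOG.warn("Error producing record to topic {}; skipping it", record.topic(), exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // Nothing to configure in this sketch.
    }
}
{code}

Under EOS, the open question above is whether such a CONTINUE should only keep the stream thread alive while the current transaction is still aborted.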
When I set "processing.guarantee" > to "exactly_once" it appeared that retryable NotEnoughReplicasException that > passed the production exception handler was rethrown by the > TransactionManager wrapped with KafkaException and terminated the stream > thread: > _org.apache.kafka.common.KafkaException: Cannot execute transactional method > because we are in an error stateat > org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617) > ~[kafka-clients-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) > ~[kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > [kafka-streams-1.1.0.jar:?]_ > _at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) > [kafka-streams-1.1.0.jar:?]_ > _Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: > Messages are rejected since there are fewer in-sync replicas than required._ > -- This message was sent by Atlassian JIRA (v7.6.3#76005)