[ https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525977#comment-16525977 ]
Anna O commented on KAFKA-7108:
-------------------------------

[~mjsax] _maybe you can elaborate why you want to skip over output record if EOS is enabled? Seems to be a contradiction to some extent. Would be good to understand your use case._

Originally we didn't have a strong constraint on duplicates. But during resiliency tests we found that deleting one of our service's k8s pods under a reasonable load produces a significant number of duplicates. To resolve the issue we tried the "exactly-once" configuration. As a result, the pod delete test looked much better.

The problem appeared during the Kafka broker restart test. During the test some records reached the 2-minute retry period and NotEnoughReplicasException was thrown. Gradually the exception caused all of the stream threads to terminate, which led to massive data loss.

So, returning to your question: we use all the available error handlers (deserialization and production) to prevent stream threads from terminating, which is unrecoverable and leads to massive data loss. We prefer to lose some records rather than have all our stream threads dead. And "exactly-once" was enabled in the hope of minimizing duplicates during k8s pod restarts.

We'd like to have a clear listener that tells us a stream thread is dead, so that we can perform an explicit pod restart, and maybe some kind of global try-catch to control unexpected thread termination.

> "Exactly-once" stream breaks production exception handler contract
> ------------------------------------------------------------------
>
>                 Key: KAFKA-7108
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7108
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Anna O
>            Priority: Major
>              Labels: exactly-once
>
> I have a stream configured with "default.production.exception.handler" that
> is supposed to log the error and continue.
> When I set "processing.guarantee" to "exactly_once", it turned out that the
> retryable NotEnoughReplicasException, which had already passed the production
> exception handler, was rethrown by the TransactionManager wrapped in a
> KafkaException and terminated the stream thread:
> _org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state_
> _at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-1.1.0.jar:?]_
> _at org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250) ~[kafka-clients-1.1.0.jar:?]_
> _at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617) ~[kafka-clients-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357) ~[kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) ~[kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) ~[kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) ~[kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) [kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) [kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) [kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) [kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) [kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) [kafka-streams-1.1.0.jar:?]_
> _at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [kafka-streams-1.1.0.jar:?]_
> _Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required._

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
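
The comment asks for a listener that reports a dead stream thread so that the pod can be restarted explicitly. A minimal sketch of that idea, using only the JDK's `Thread.UncaughtExceptionHandler`, could look like the following. The `DeathListener` interface, the class name, and the simulated failure are hypothetical and only illustrate the pattern; they are not part of Kafka. In Kafka Streams 1.1.0 a handler of this type can, as far as I know, be registered through `KafkaStreams#setUncaughtExceptionHandler`, but that wiring is left out here to keep the sketch self-contained.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: a plain JDK thread stands in for a Kafka Streams thread.
class ThreadDeathListenerSketch {

    /** Hypothetical callback, e.g. one that flips a k8s liveness flag. */
    interface DeathListener {
        void onThreadDeath(String threadName, Throwable cause);
    }

    /** Adapts the listener to the JDK's standard uncaught-exception hook. */
    static Thread.UncaughtExceptionHandler asHandler(DeathListener listener) {
        return (thread, error) -> listener.onThreadDeath(thread.getName(), error);
    }

    /**
     * Starts a worker that dies with an unchecked exception (simulating the
     * failure in the stack trace above) and returns the cause the listener saw.
     */
    static Throwable runFailingWorker() throws InterruptedException {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            throw new IllegalStateException(
                "Cannot execute transactional method because we are in an error state");
        }, "stream-thread-1");

        // The handler runs on the dying thread, just before it terminates.
        worker.setUncaughtExceptionHandler(asHandler((name, cause) -> {
            seen.set(cause);
            done.countDown();
        }));

        worker.start();
        done.await(5, TimeUnit.SECONDS);
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable cause = runFailingWorker();
        System.out.println("worker died: " + cause);
    }
}
```

In a real service the listener body would probably mark the instance unhealthy (or exit the process) so that k8s restarts the pod, rather than just recording the cause.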