[ https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525518#comment-16525518 ]

Guozhang Wang commented on KAFKA-7108:
--------------------------------------

For the second question: when transactions are turned on, we intentionally 
treat `NotEnoughReplicasException` as a fatal error, since we cannot "make any 
guarantees about the previously committed message" (from the Java comments).
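
To make that concrete, below is a minimal sketch (the topic name, bootstrap 
address, and transactional id are hypothetical, not taken from this issue) of 
how a rejected send inside a transaction surfaces on the next transactional 
call:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn");        // hypothetical

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.initTransactions();
            producer.beginTransaction();
            // If the broker rejects this record with NotEnoughReplicasException,
            // the TransactionManager transitions into an error state.
            producer.send(new ProducerRecord<>("under-replicated-topic", "key", "value"));
            try {
                // Any subsequent transactional call then fails with
                // "Cannot execute transactional method because we are in an error state".
                producer.commitTransaction();
            } catch (KafkaException fatal) {
                // Treated as fatal under EOS: the transaction cannot be salvaged,
                // so the producer has to be closed and the work restarted.
            }
        }
    }
}
{code}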

For the first question, here's my rationale: when EOS is turned on, we set 
{{ProducerConfig.RETRIES_CONFIG = Integer.MAX_VALUE}} to preserve ordering, 
which means all errors passed into the exception handler are fatal ones. In 
that case, allowing users to skip under EOS means one transaction is allowed 
to drop some of its data. On the other hand, allowing users to skip for 
`sendOffsets` (which is similar to produce, since all errors passed in should 
be fatal under EOS) means the whole transaction may end up without committed 
offsets, so if we resume from this transaction we will replay its entire 
input data stream again.
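
For reference, here is a minimal sketch of the setup described in the report 
below (class name and property values are illustrative, not taken from the 
issue): a log-and-continue production exception handler wired in together with 
exactly-once processing. Under at-least-once the CONTINUE response lets the 
stream skip the failed record; under EOS, as reported, the error is escalated 
through the TransactionManager instead.

{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Log-and-continue handler, as described in the report (illustrative implementation).
public class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        System.err.println("Failed to produce to topic " + record.topic() + ": " + exception);
        return ProductionExceptionHandlerResponse.CONTINUE;  // skip the record, keep running
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}

{code:java}
// Wiring the handler in together with exactly-once processing.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // hypothetical
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueProductionExceptionHandler.class);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
{code}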

> "Exactly-once" stream breaks production exception handler contract
> ------------------------------------------------------------------
>
>                 Key: KAFKA-7108
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7108
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Anna O
>            Priority: Major
>              Labels: exactly-once
>
> I have a stream configured with "default.production.exception.handler" that 
> is supposed to log the error and continue. When I set "processing.guarantee" 
> to "exactly_once", it appeared that a retriable NotEnoughReplicasException 
> that passed through the production exception handler was rethrown by the 
> TransactionManager wrapped in a KafkaException and terminated the stream 
> thread:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>  at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-1.1.0.jar:?]
>  at org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250) ~[kafka-clients-1.1.0.jar:?]
>  at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617) ~[kafka-clients-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [kafka-streams-1.1.0.jar:?]
> Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)