[ 
https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525251#comment-16525251
 ] 

Matthias J. Sax commented on KAFKA-7108:
----------------------------------------

Thanks for reporting this [~anuta].

[~yuzhih...@gmail.com] I am not sure atm. The `production.exception.handler` is 
supposed to handle writes, but `sendOffsetsToTransaction()` is not writing any 
data. For example, if EOS is disabled, and committing offsets fails, the 
handler would also not be called for this error – the difference is, that a 
failed offset commit for non-EOS can just be ignored (if fact we swallow the 
error and continue) as it does not violate at-least-once guarantees. Also, we 
could call the handler only with `null` because there is not record to be 
passed into the handler. Therefore, I am not sure if re-using the 
`production.exception.handler` is the best choice. If we do it, we would need 
document that the handler could be called with `null` for this case... Think we 
need a broader discussion how to handle this. \cc [~guozhang] [~bbejeck] 
[~vvcephei]

> "Exactly-once" stream breaks production exception handler contract
> ------------------------------------------------------------------
>
>                 Key: KAFKA-7108
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7108
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Anna O
>            Priority: Critical
>              Labels: exactly-once
>
> I have a stream configured with "default.production.exception.handler" that 
> is supposed to log the error and continue. When I set "processing.guarantee" 
> to "exactly_once" it appeared that retryable NotEnoughReplicasException that 
> passed the production exception handler was rethrown by the 
> TransactionManager wrapped with KafkaException and terminated the stream 
> thread:
> _org.apache.kafka.common.KafkaException: Cannot execute transactional method 
> because we are in an error stateat 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617)
>  ~[kafka-clients-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
>  ~[kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>  [kafka-streams-1.1.0.jar:?]_
>  _at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
>  [kafka-streams-1.1.0.jar:?]_
>  _Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: 
> Messages are rejected since there are fewer in-sync replicas than required._
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to