[ 
https://issues.apache.org/jira/browse/KAFKA-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001767#comment-17001767
 ] 

ASF GitHub Bot commented on KAFKA-9310:
---------------------------------------

vvcephei commented on pull request #7845: KAFKA-9310: Handle UnknownProducerId 
from RecordCollector.send
URL: https://github.com/apache/kafka/pull/7845

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread may die from recoverable UnknownProducerId exception
> -----------------------------------------------------------------
>
>                 Key: KAFKA-9310
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9310
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> We attempted to capture and recover from UnknownProducerId exceptions in 
> KAFKA-9231, but the exception can still be raised, wrapped in a 
> KafkaException, and kill the thread.
> For example, see the stack trace:
> {noformat}
> [2019-12-17 00:08:53,064] ERROR 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> stream-thread 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
>   org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_1, processor=KSTREAM-SOURCE-0000000031, 
> topic=windowed-node-counts, partition=1, offset=354933575, 
> stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort 
> sending since an error caught with a previous record (timestamp 
> 1575857317197) to topic 
> stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to 
> org.apache.kafka.common.KafkaException: Cannot perform send because at least 
> one previous transactional or idempotent request has failed with errors.
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>       at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>  Caused by: org.apache.kafka.common.KafkaException: Cannot perform send 
> because at least one previous transactional or idempotent request has failed 
> with errors.
>       at 
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>       ... 29 more
>  Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This 
> exception is raised by the broker if it could not locate the producer 
> metadata associated with the producerId in question. This could happen if, 
> for instance, the producer's records were deleted because their retention 
> time had elapsed. Once the last records of the producerId are removed, the 
> producer's metadata is removed from the broker, and future appends by the 
> producer will return this exception.
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:446)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>  Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] 
> Abort sending since an error caught with a previous record (timestamp 
> 1575857317197) to topic 
> stream-soak-test-windowed-node-counts-STATE-STORE-0000000030-changelog due to 
> org.apache.kafka.common.KafkaException: Cannot perform send because at least 
> one previous transactional or idempotent request has failed with errors.
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>       at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>       at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>       at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>       at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>       ... 5 more
>  Caused by: org.apache.kafka.common.KafkaException: Cannot perform send 
> because at least one previous transactional or idempotent request has failed 
> with errors.
>       at 
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
>       at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>       ... 29 more
>  Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This 
> exception is raised by the broker if it could not locate the producer 
> metadata associated with the producerId in question. This could happen if, 
> for instance, the producer's records were deleted because their retention 
> time had elapsed. Once the last records of the producerId are removed, the 
> producer's metadata is removed from the broker, and future appends by the 
> producer will return this exception.
>  [2019-12-17 00:08:53,066] INFO 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> stream-thread 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] State 
> transition from RUNNING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {noformat}
> The catch blocks should be updated to expect the exception in this form.
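
One way a catch block could recognize the exception "in this form" is to walk the cause chain instead of matching only the top-level type. The following is a minimal sketch of that idea, not the change made in pull request #7845. The two nested exception classes are local stand-ins (assumptions) for org.apache.kafka.common.KafkaException and org.apache.kafka.common.errors.UnknownProducerIdException so that the example compiles without kafka-clients on the classpath, and the helper name isCausedBy is hypothetical.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

public class CauseChainExample {

    // Stand-in (assumption) for org.apache.kafka.common.KafkaException.
    static class KafkaException extends RuntimeException {
        KafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in (assumption) for
    // org.apache.kafka.common.errors.UnknownProducerIdException.
    static class UnknownProducerIdException extends RuntimeException {
        UnknownProducerIdException(String message) {
            super(message);
        }
    }

    // Hypothetical helper: returns true if the error itself, or any
    // exception in its cause chain, is an instance of the given type.
    // The identity set guards against a cyclic cause chain.
    static boolean isCausedBy(Throwable error, Class<? extends Throwable> type) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mirrors the shape in the stack trace: a KafkaException whose
        // cause is an UnknownProducerIdException.
        Throwable wrapped = new KafkaException(
            "Cannot perform send because at least one previous transactional "
                + "or idempotent request has failed with errors.",
            new UnknownProducerIdException("producer metadata not found"));

        try {
            throw (RuntimeException) wrapped;
        } catch (RuntimeException e) {
            if (isCausedBy(e, UnknownProducerIdException.class)) {
                System.out.println("recoverable: UnknownProducerId in cause chain");
            } else {
                throw e;
            }
        }
    }
}
```

A check like this treats the directly thrown exception and the wrapped one the same way, which is the case the stack trace above shows being missed.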



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
