Interesting, this does look like it could be a bug in Streams and I'm not
aware of
any known or already-fixed issues resembling this. Could you file a bug
report
over here <https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA>
and
include as much context/information as possible? Providing logs
from around the time leading up to this exception in particular would
greatly
help in debugging this

On Tue, Nov 15, 2022 at 2:15 AM Tomasz Gac <tomasz....@empirica.io.invalid>
wrote:

> Hi group,
>
> We've encountered a problem during regular operation of the kafka-streams
> application. While processing a record we received the following error.
> There's very little documentation on this kind of problem but I've gathered
> that it's a synchronization issue between kafka consumer and producer. Have
> you ever encountered it before?
>
> My questions are: is this a matter of misconfiguration, or rather a bug?
> Has it been fixed?
>
> We're using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1
> against the kafka broker version 2.1.1. We are running it as an OSGI bundle
> with dependencies packaged within the bundle.
>
> Thank you.
>
> 2022-10-18T01:20:19,665 | ERROR |
> TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1 |
> TaskManager | 218 - org.apache.servicemix.bundles.kafka-clients - 2.8.1.1 |
> stream-thread
> [TRADE-ENRICHER-35daba19-3907-4a55-8fce-606abf0e9b3a-StreamThread-1] Failed
> to process stream task 0_2 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_2, processor=KSTREAM-SOURCE-0000000002,
> topic=ORDER_BOOK_BEST, partition=2, offset=13452067,
> stacktrace=java.lang.IllegalStateException: Cannot call send in state
> COMMITTING_TRANSACTION at
>
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
> at
>
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
> at
>
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> at
>
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
> at
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
> at
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
> at
>
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
> at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
> at
>
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
> [!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
> [!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> [!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> [!/:?] Caused by: java.lang.IllegalStateException: Cannot call send in
> state COMMITTING_TRANSACTION at
>
> org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:451)
> ~[!/:?] at
>
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:948)
> ~[!/:?] at
>
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:211)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:182)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:139)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
> ~[!/:?] at
>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
> ~[!/:?] at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
> ~[!/:?] ... 4 more
>

Reply via email to