We're consistently seeing this on Kafka Streams 1.0.0, even when the app is not 
under load, and exclusively for state changelog topics. We tried raising 
`request.timeout.ms` to 60s, but it doesn't help. The exception is thrown 
continuously, for what looks like every attempt to write to any of the state 
changelogs.

We've tried running the Kafka Streams app on clusters of 2 to 20 nodes, 
connecting to a remote broker cluster. The topology is quite simple (7 
processors, 2 state stores). The brokers have been handling significant load 
from other sources (librdkafka and Java producers) without any issues, so I 
doubt the problem is on that side.

Another interesting observation: after these timeout exceptions, calling 
`.close(5, TimeUnit.SECONDS)` on the application (we catch the 
`TimeoutException` via a `GlobalExceptionHandler`) makes the JVM process hang 
every time, and we have to kill it manually.
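Our working guess, and it is only a guess, is that the hang looks like a thread waiting on itself: the exception handler runs on the dying stream thread, so a close that joins the stream threads from there could never finish. A toy JDK-only sketch of that self-wait pattern (no Kafka classes involved, purely illustrative):

```java
public class SelfWaitSketch {
    static volatile boolean returned = false;

    public static void run() throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                // An unbounded Thread.currentThread().join() here would block
                // forever: the thread would be waiting for its own termination.
                // The bounded variant returns once the timeout elapses, which
                // is why we expected close(5, TimeUnit.SECONDS) not to hang.
                Thread.currentThread().join(100);
                returned = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        run();
        System.out.println("bounded self-join returned: " + returned);
    }
}
```

What puzzles us is that even the bounded close hangs, which is why we wonder whether some other thread (e.g. the producer close with a Long.MAX_VALUE timeout visible in the log below) is the one stuck.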

We also tried upgrading to 1.1.0 to check whether the new config param 
`retries`, set to 1, makes any difference; it doesn't. 
`default.production.exception.handler` does trigger in this situation, but it 
doesn't offer anything beyond what the `GlobalExceptionHandler` already gives 
us.
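For completeness, the 1.1.0 settings we tried look like this (JDK-only sketch; the handler class name is a placeholder for our own implementation of `ProductionExceptionHandler`):

```java
import java.util.Properties;

public class StreamsRetryConfig {
    public static Properties build() {
        Properties props = new Properties();
        // The new Streams-level retries config in 1.1.0; we set it to 1.
        props.put("retries", "1");
        // Fully-qualified name of our handler (placeholder name here); the
        // class must implement ProductionExceptionHandler.
        props.put("default.production.exception.handler",
                  "com.example.LoggingProductionExceptionHandler");
        return props;
    }
}
```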

Something that comes to mind: the input and output topics are long-lived (as 
are the broker cluster and the ZK quorum), while the Kafka Streams cluster is 
ephemeral; we take it down and bring it back up within minutes, and we do this 
quite often for config changes. When we recreate this cluster the hosts change 
completely (the apps run in Docker containers in an AWS autoscaling group). Is 
there any long-lived relationship between Kafka Streams changelog partitions 
and the Kafka Streams process hosts that breaks when the Kafka Streams nodes 
are recreated? Notably, the first time we create the Kafka Streams cluster 
against a new broker cluster, things seem to run relatively well.

Here's an example stack trace:
2018-03-29 13:15:15,614 [kafka-producer-network-thread | 
app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1-producer] ERROR 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_19] 
Error sending record (key ... value [...] timestamp 1522327436038) to topic 
app-sessionStateStore-changelog due to {}; No more records will be sent and no 
more offsets will be recorded for this task.
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for 
app-sessionStateStore-changelog-19: 30002 ms has passed since last append
2018-03-29 13:15:19,470 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] ERROR 
org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Failed to commit 
stream task 0_19 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key ... value ... timestamp 
1522327436038) to topic app-sessionStateStore-changelog due to 
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for 
app-sessionStateStore-changelog-19: 30002 ms has passed since last append.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
        at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 23 
record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since 
last append
2018-03-29 13:15:19,471 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] State transition from 
PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
2018-03-29 13:15:19,471 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Shutting down
2018-03-29 13:15:19,885 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1-producer] 
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] State transition from 
PENDING_SHUTDOWN to DEAD
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.KafkaStreams - stream-client 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133]State transition from REBALANCING to 
ERROR
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] WARN  
org.apache.kafka.streams.KafkaStreams - stream-client 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133]All stream threads have died. The 
instance will be in error state and should be closed.
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Shutdown complete
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] ERROR com.x.y.z.App$ 
- Thread 13 died with exception task [0_19] Abort sending since an error caught 
with a previous record (key ... value ... timestamp 1522327436038) to topic 
app-sessionStateStore-changelog due to 
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for 
app-sessionStateStore-changelog-19: 30002 ms has passed since last append.. 
Shutting down the Kafka Streams application
org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key ... value ... timestamp 
1522327436038) to topic app-sessionStateStore-changelog due to 
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for 
app-sessionStateStore-changelog-19: 30002 ms has passed since last append.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
        at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 23 
record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since 
last append

On 2018/01/26 21:15:43, "Matthias J. Sax" <matth...@confluent.io> wrote: 
> I assume that you are hitting a known bug. KIP-91 is addressing this issue.
> 
> The recommended workaround is to increase the request.timeout.ms parameter.
> 
> 
> -Matthias
> 
> On 1/25/18 10:04 AM, Dan Bress wrote:
> > Hi,
> > 
> > I'm running an app on Kafka Streams 1.0.0, and in the past day a lot of
> > nodes are failing and I see this in the log.
> > 
> > These appear to be failures when attempting to update the changelog.  Any
> > ideas on what I should do to work around this?  Should I configure separate
> > retry and timeouts for the changelog producer?  If so How do I do that?
> > 
> > 
> > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [dp-insightstrack-staging1-9cc75371-000c-420a-b9d7-b46b2b4bab5c-StreamThread-3]
> > Failed to rebalance.
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [dp-insightstrack-staging1-9cc75371-000c-420a-b9d7-b46b2b4bab5c-StreamThread-3]
> > failed to suspend stream tasks
> > at
> > org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:204)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:418)
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
> > at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> > at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
> > at
> > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> > ... 3 common frames omitted
> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task
> > [0_18] Failed to flush state store summarykey-to-summary
> > at
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> > at
> > org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
> > at
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:414)
> > at
> > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:396)
> > at
> > org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:219)
> > at
> > org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:184)
> > at
> > org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:197)
> > ... 11 common frames omitted
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_18]
> > Abort sending since an error caught with a previous record (key
> > ComparableSummaryKey(SummaryKey(GnipStream(172819),RuleId(951574687410094106),Some(TweetId(954107387677433858))))
> > value
> > TweetSummaryCountsWindow(SummaryKey(GnipStream(172819),RuleId(951574687410094106),Some(TweetId(954107387677433858))),168,1516311902337,WindowCounts(1516742462337,LongHyperLogLogPlus(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus@2e76a045
> > ),LongHyperLogLogPlus(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus@8610f558
> > ),None,-1,false,MetricMap(true,[I@74cc9b69
> > ),com.twitter.datainsights.insightstrack.domain.InteractionSourceSummary@5653a508),None,false)
> > timestamp 1516742492445) to topic
> > dp-insightstrack-staging1-summarykey-to-summary-changelog due to
> > org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for
> > dp-insightstrack-staging1-summarykey-to-summary-changelog-18: 30011 ms has
> > passed since last append.
> > at
> > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
> > at
> > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
> > at
> > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
> > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
> > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7
> > record(s) for dp-insightstrack-staging1-summarykey-to-summary-changelog-18:
> > 30011 ms has passed since last append
> > 
> 
> 
