We're consistently seeing this on Kafka Streams 1.0.0, even when not under load, and exclusively for state store changelog topics. We tried raising `request.timeout.ms` to 60s, but it doesn't help. The exception is thrown continuously, for what looks like every attempt to write to any of the state changelogs.
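For reference, this is how we pass that override to the embedded producer (a minimal sketch; the class name and broker address are placeholders, and 60s is just the value we tested):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {

    // Builds the config we launch the app with. The "producer." prefix routes
    // request.timeout.ms to the embedded producer, i.e. the client that writes
    // the state store changelog topics.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
        return props;
    }
}
```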
We've tried running the Kafka Streams application on clusters of 2 to 20 nodes, connecting to a remote broker cluster. The topology is quite simple (7 processors, 2 state stores). The brokers have been handling significant load from other sources (librdkafka and Java producers) without any issues, so I doubt the problem is on that side.

Another interesting fact: after these timeout exceptions, calling `.close(5, TimeUnit.SECONDS)` on the application (we catch the `TimeoutException` via a `GlobalExceptionHandler`) makes the JVM process hang every time, and we have to kill it manually. We also tried updating to 1.1.0 to check whether the new config param `retries`, set to 1, makes any difference; it doesn't. The new `default.production.exception.handler` does trigger in this situation, but doesn't offer anything beyond what the `GlobalExceptionHandler` already gives us.

Something that comes to mind: the input and output topics are long-lived (as are the broker cluster and the ZK quorum), while the Kafka Streams cluster is ephemeral; we take it down and bring it back up within minutes, and we do this quite often for config changes. When we recreate this cluster, the hosts change completely (the apps run in Docker containers in an AWS autoscaling group). Is there any long-lived relationship between the Kafka Streams state changelog partitions and the Kafka Streams process hosts that breaks when the Kafka Streams nodes are recreated? The first time we create the Kafka Streams cluster against a new broker cluster, things seem to run relatively well.
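For context, this is roughly how the shutdown is wired up (a simplified sketch; our real `GlobalExceptionHandler` does more than this, the topology is elided, and `streamsProps()` is the helper from the snippet above):

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

public class App {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // ... topology with 7 processors and 2 state stores elided ...

        KafkaStreams streams = new KafkaStreams(builder.build(), StreamsProps.streamsProps());
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            // Fires when a StreamThread dies with the TimeoutException;
            // this close call is the one that then hangs the JVM for us.
            streams.close(5, TimeUnit.SECONDS);
        });
        streams.start();
    }
}
```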
Here's an example stack trace:

2018-03-29 13:15:15,614 [kafka-producer-network-thread | app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1-producer] ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_19] Error sending record (key ... value [...] timestamp 1522327436038) to topic app-sessionStateStore-changelog due to {}; No more records will be sent and no more offsets will be recorded for this task.
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append
2018-03-29 13:15:19,470 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Failed to commit stream task 0_19 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending since an error caught with a previous record (key ... value ... timestamp 1522327436038) to topic app-sessionStateStore-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append
2018-03-29 13:15:19,471 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
2018-03-29 13:15:19,471 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Shutting down
2018-03-29 13:15:19,885 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [app-ef44224d-ab04-47ea-b2f3-2b74bac22133] State transition from REBALANCING to ERROR
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [app-ef44224d-ab04-47ea-b2f3-2b74bac22133] All stream threads have died. The instance will be in error state and should be closed.
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Shutdown complete
2018-03-29 13:15:19,896 [app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] ERROR com.x.y.z.App$ - Thread 13 died with exception task [0_19] Abort sending since an error caught with a previous record (key ... value ... timestamp 1522327436038) to topic app-sessionStateStore-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append. Shutting down the Kafka Streams application
org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending since an error caught with a previous record (key ... value ... timestamp 1522327436038) to topic app-sessionStateStore-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since last append
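And for completeness, the 1.1.0 production exception handler we tried is shaped roughly like this (simplified; the class name is ours, and the real handler only logs before failing, same as below):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class LoggingProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // This does fire for the changelog TimeoutException, but it only sees
        // the same exception the uncaught exception handler already gets.
        System.err.printf("Failed to send record to %s: %s%n", record.topic(), exception);
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no-op
    }
}
```

It's registered via `props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, LoggingProductionExceptionHandler.class);`.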
On 2018/01/26 21:15:43, "Matthias J. Sax" <matth...@confluent.io> wrote:
> I assume that you are hitting a known bug. KIP-91 is addressing this issue.
>
> The recommended workaround is to increase the request.timeout.ms parameter.
>
>
> -Matthias
>
> On 1/25/18 10:04 AM, Dan Bress wrote:
> > Hi,
> >
> > I'm running an app on Kafka Streams 1.0.0, and in the past day a lot of
> > nodes are failing and I see this in the log.
> >
> > These appear to be failures when attempting to update the changelog. Any
> > ideas on what I should do to work around this? Should I configure separate
> > retry and timeouts for the changelog producer? If so, how do I do that?
> >
> >
> > org.apache.kafka.streams.errors.StreamsException: stream-thread [dp-insightstrack-staging1-9cc75371-000c-420a-b9d7-b46b2b4bab5c-StreamThread-3] Failed to rebalance.
> >     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> > Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [dp-insightstrack-staging1-9cc75371-000c-420a-b9d7-b46b2b4bab5c-StreamThread-3] failed to suspend stream tasks
> >     at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:204)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:418)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> >     ... 3 common frames omitted
> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [0_18] Failed to flush state store summarykey-to-summary
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
> >     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:414)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:396)
> >     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:219)
> >     at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:184)
> >     at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:197)
> >     ... 11 common frames omitted
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_18] Abort sending since an error caught with a previous record (key ComparableSummaryKey(SummaryKey(GnipStream(172819),RuleId(951574687410094106),Some(TweetId(954107387677433858)))) value TweetSummaryCountsWindow(SummaryKey(GnipStream(172819),RuleId(951574687410094106),Some(TweetId(954107387677433858))),168,1516311902337,WindowCounts(1516742462337,LongHyperLogLogPlus(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus@2e76a045),LongHyperLogLogPlus(com.clearspring.analytics.stream.cardinality.HyperLogLogPlus@8610f558),None,-1,false,MetricMap(true,[I@74cc9b69),com.twitter.datainsights.insightstrack.domain.InteractionSourceSummary@5653a508),None,false) timestamp 1516742492445) to topic dp-insightstrack-staging1-summarykey-to-summary-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for dp-insightstrack-staging1-summarykey-to-summary-changelog-18: 30011 ms has passed since last append.
> >     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
> >     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
> >     at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
> >     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
> >     at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
> >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 7 record(s) for dp-insightstrack-staging1-summarykey-to-summary-changelog-18: 30011 ms has passed since last append