Hi All, I am running into an issue with my Kafka Streams application. The application was running fine for almost 2 weeks, then it started throwing the below exception which caused the threads to die. Now when I restart the application, it dies quickly (1-2 hrs) when trying to catch up the lag.
The application is running on an AWS EC2 instance with 8 core processor and 16GB of memory. The streams config is given below and more logs are available below (I have stripped of some logs which I though may not be relevant). Towards the end of this thread you will be able to see lot of exceptions similar to the below one + RocksDBExceptions (org.apache.kafka.streams.errors.ProcessorStateException: Error opening store vstore at location /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please take a look at it and let me know what could be wrong here? INFO 2018-02-21 08:37:20.758 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and standby tasks [4_0, 1_30] in 0ms ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread | Engine2-StreamThread-6-producer] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [1_34] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-3] processing latency 46282 > commit time 30000 for 10000 records. Adjusting down recordsProcessedBeforeCommit=6482 ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [1_34] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 31439 ms has passed since last append INFO 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] Shutting down INFO 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN. DEBUG 2018-02-21 08:37:24.865 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] Shutting down all active tasks [6_12, 1_34, 7_15], standby tasks [1_2, 1_7], suspended tasks [1_18, 7_15, 6_3, 1_27], and suspended standby tasks [1_3, 1_30] INFO 2018-02-21 08:37:24.879 [Engine2-StreamThread-3] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-3] Committed all active tasks [1_31, 1_24, 6_7] and standby tasks [1_0, 1_6, 1_13] in 0ms INFO 2018-02-21 08:37:24.954 [Engine2-StreamThread-6] org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. INFO 2018-02-21 08:37:24.957 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] Stream thread shutdown complete INFO 2018-02-21 08:37:24.957 [Engine2-StreamThread-6] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-6] State transition from PENDING_SHUTDOWN to DEAD. *Streams Config* *----------------------* INFO 2018-02-21 07:11:20.209 [main] org.apache.kafka.streams.StreamsConfig - StreamsConfig values: application.id = cv-v1 application.server = bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092] buffered.records.per.partition = 1000 cache.max.bytes.buffering = 104857600 client.id = Engine2 commit.interval.ms = 30000 connections.max.idle.ms = 540000 default.key.serde = class org.apache.kafka.common.serialization.Serdes$ ByteArraySerde default.timestamp.extractor = class org.apache.kafka.streams.processor. FailOnInvalidTimestamp default.value.serde = class org.apache.kafka.common.serialization.Serdes$ ByteArraySerde key.serde = null metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = DEBUG metrics.sample.window.ms = 30000 num.standby.replicas = 1 num.stream.threads = 1 partition.grouper = class org.apache.kafka.streams.processor. DefaultPartitionGrouper poll.ms = 100 processing.guarantee = at_least_once receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 replication.factor = 2 request.timeout.ms = 40000 retry.backoff.ms = 100 rocksdb.config.setter = null security.protocol = PLAINTEXT send.buffer.bytes = 131072 state.cleanup.delay.ms = 600000 state.dir = /mnt/store/kafka-streams timestamp.extractor = null value.serde = null windowstore.changelog.additional.retention.ms = 86400000 zookeeper.connect = INFO 2018-02-21 07:11:20.398 [main] org.apache.kafka.streams.StreamsConfig - StreamsConfig values: application.id = pe-v1 application.server = newengine101:8080 bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092] buffered.records.per.partition = 1000 cache.max.bytes.buffering = 2147483648 client.id = Engine2 commit.interval.ms = 30000 connections.max.idle.ms = 540000 default.key.serde = class org.apache.kafka.common.serialization.Serdes$ ByteArraySerde default.timestamp.extractor = class org.apache.kafka.streams.processor. FailOnInvalidTimestamp default.value.serde = class org.apache.kafka.common.serialization.Serdes$ ByteArraySerde key.serde = null metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = DEBUG metrics.sample.window.ms = 30000 num.standby.replicas = 1 num.stream.threads = 6 partition.grouper = class org.apache.kafka.streams.processor. DefaultPartitionGrouper poll.ms = 100 processing.guarantee = at_least_once receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 replication.factor = 2 request.timeout.ms = 40000 retry.backoff.ms = 100 rocksdb.config.setter = null security.protocol = PLAINTEXT send.buffer.bytes = 131072 state.cleanup.delay.ms = 600000 state.dir = /mnt/store/kafka-streams timestamp.extractor = null value.serde = null windowstore.changelog.additional.retention.ms = 86400000 zookeeper.connect = *More Logs* INFO 2018-02-21 13:33:19.083 [Engine2-StreamThread-1] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-1] Committed all active tasks [0_1, 1_0, 0_3, 2_2] and standby tasks [] in 0ms ERROR 2018-02-21 13:33:34.140 [kafka-producer-network-thread | Engine2-StreamThread-2-producer] org.apache.kafka.streams. processor.internals.RecordCollectorImpl - task [1_34] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 31903 ms has passed since last append ERROR 2018-02-21 13:33:34.140 [kafka-producer-network-thread | Engine2-StreamThread-2-producer] org.apache.kafka.streams. processor.internals.RecordCollectorImpl - task [1_10] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 31903 ms has passed since last append ERROR 2018-02-21 13:33:36.396 [kafka-producer-network-thread | Engine2-StreamThread-4-producer] org.apache.kafka.streams. processor.internals.RecordCollectorImpl - task [1_29] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 34111 ms has passed since last append ERROR 2018-02-21 13:33:36.396 [kafka-producer-network-thread | Engine2-StreamThread-4-producer] org.apache.kafka.streams. processor.internals.RecordCollectorImpl - task [1_18] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 33431 ms has passed since last append ERROR 2018-02-21 13:33:36.397 [kafka-producer-network-thread | Engine2-StreamThread-4-producer] org.apache.kafka.streams. processor.internals.RecordCollectorImpl - task [1_32] Error sending record to topic cv-v1-cv. No more offsets will be recorded for this task and the exception will eventually be thrown org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 32717 ms has passed since last append ERROR 2018-02-21 13:33:40.770 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-4] Failed to commit stream task 1_32 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [1_32] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 32717 ms has passed since last append ERROR 2018-02-21 13:33:40.774 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-4] Failed to commit stream task 1_18 due to the following error: org.apache.kafka.streams.errors.ProcessorStateException: task [1_18] Failed to flush state store subs at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush( ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:194) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:295) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3. execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore. measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore. putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush( ProcessorStateManager.java:255) ~[kafka-streams-0.11.0.2.jar:?] ... 13 more Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 33431 ms has passed since last append ERROR 2018-02-21 13:33:40.774 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-4] Failed to commit stream task 1_29 due to the following error: org.apache.kafka.streams.errors.ProcessorStateException: task [1_29] Failed to flush state store subs at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush( ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:194) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:295) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3. execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore. measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore. putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush( ProcessorStateManager.java:255) ~[kafka-streams-0.11.0.2.jar:?] ... 13 more Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 34111 ms has passed since last append INFO 2018-02-21 13:33:40.775 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] Shutting down INFO 2018-02-21 13:33:40.775 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN. DEBUG 2018-02-21 13:33:40.775 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] Shutting down all active tasks [1_32, 1_18, 7_15, 1_24, 6_5, 6_7, 1_29], standby tasks [1_15, 1_3, 1_19, 1_8], suspended tasks [6_12, 1_34, 6_14, 1_22, 1_10, 1_28], and suspended standby tasks [1_32, 1_5, 1_23, 1_8] ERROR 2018-02-21 13:33:40.876 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.ProcessorStateManager - task [1_18] Failed to close state store vstore: org.apache.kafka.streams.errors.StreamsException: task [1_18] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3. execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore. measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore. putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:278) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask. closeStateManager(AbstractTask.java:232) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 33431 ms has passed since last append ERROR 2018-02-21 13:33:40.877 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamTask - task [1_18] Could not close state manager due to the following error: org.apache.kafka.streams.errors.ProcessorStateException: task [1_18] Failed to close state store vstore at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask. closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3. execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore. measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore. putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?] ... 6 more Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 33431 ms has passed since last append ERROR 2018-02-21 13:33:40.877 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] Failed while closing StreamTask 1_18 due to the following error: org.apache.kafka.streams.errors.ProcessorStateException: task [1_18] Failed to close state store vstore at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask. closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3. execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore. measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore. putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?] ... 6 more Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 33431 ms has passed since last append ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.ProcessorStateManager - task [1_29] Failed to close state store vstore: org.apache.kafka.streams.errors.StreamsException: task [1_29] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3. execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore. measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore. putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:278) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask. closeStateManager(AbstractTask.java:232) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 34111 ms has passed since last append ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamTask - task [1_29] Could not close state manager due to the following error: org.apache.kafka.streams.errors.ProcessorStateException: task [1_29] Failed to close state store vstore at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask. closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3. execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore. measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore. putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?] ... 6 more Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 34111 ms has passed since last append ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] Failed while closing StreamTask 1_29 due to the following error: org.apache.kafka.streams.errors.ProcessorStateException: task [1_29] Failed to close state store vstore at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask. closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor e.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3. execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore. measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore. putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close( ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?] ... 6 more Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 34111 ms has passed since last append INFO 2018-02-21 13:33:41.052 [Engine2-StreamThread-4] org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. INFO 2018-02-21 13:33:41.054 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] Stream thread shutdown complete INFO 2018-02-21 13:33:41.054 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] State transition from PENDING_SHUTDOWN to DEAD. org.apache.kafka.streams.errors.StreamsException: task [1_32] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. applyToRunningTasks(AssignedTasks.java:420) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. punctuateAndCommit(AssignedTasks.java:357) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. processAndPunctuate(StreamThread.java:662) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:459) ~[kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 32717 ms has passed since last append ERROR 2018-02-21 13:33:42.590 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-2] Failed to commit stream task 1_34 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [1_34] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl. checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:? ] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks. punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread. processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals. StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?] Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 31903 ms has passed since last append ERROR 2018-02-21 13:33:42.591 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-2] Failed to commit stream task 1_10 due to the following error: org.apache.kafka.streams.errors.ProcessorStateException: task [1_10] Failed to flush state store subs at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush( ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:194) ~[kafka-streams-0.11.0.2.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:295) ~[kafka-streams-0.11.0.2.jar:?] Thanks, Tony