Hi Tony,

It looks like you are hitting a known issue that KIP-91 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer) will address.
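Until that KIP is available, the settings involved are ordinary producer configs that can be supplied through the Streams properties. A minimal sketch, assuming the literal config names "request.timeout.ms", "max.block.ms" and "retries"; the class name TimeoutWorkaround and the values chosen for max.block.ms and retries are hypothetical placeholders, not recommendations:

```java
import java.util.Properties;

// Hypothetical helper; not part of Kafka or of the application in this thread.
class TimeoutWorkaround {

    // Builds the overrides to merge into the Properties handed to KafkaStreams.
    static Properties timeoutOverrides() {
        Properties props = new Properties();
        // Same key as ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG.
        props.setProperty("request.timeout.ms", String.valueOf(Integer.MAX_VALUE));
        // Assumed illustrative values; tune them for your own cluster.
        props.setProperty("max.block.ms", "120000");
        props.setProperty("retries", "10");
        return props;
    }

    public static void main(String[] args) {
        Properties streamsProps = new Properties();
        streamsProps.setProperty("application.id", "pe-v1");
        // Merge the overrides into the existing Streams configuration.
        streamsProps.putAll(timeoutOverrides());
        streamsProps.list(System.out);
    }
}
```

The values are kept as strings so that the same Properties object can be inspected with getProperty and passed on unchanged.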
In the meantime, as a workaround, you could try setting REQUEST_TIMEOUT_MS_CONFIG to a large value (Integer.MAX_VALUE?). Other, secondary configurations to consider increasing are "max.block.ms" and "retries".

Thanks,
Bill

On Thu, Feb 22, 2018 at 8:14 AM, Tony John <tonyjohnant...@gmail.com> wrote:

> Hi All,
>
> I am running into an issue with my Kafka Streams application. The
> application was running fine for almost 2 weeks, then it started throwing
> the below exception which caused the threads to die. Now when I restart the
> application, it dies quickly (1-2 hrs) when trying to catch up on the lag.
>
> The application is running on an AWS EC2 instance with an 8-core processor and
> 16GB of memory. The streams config is given below and more logs are
> available below (I have stripped out some logs which I thought may not be
> relevant). Towards the end of this thread you will be able to see a lot of
> exceptions similar to the below one + RocksDBExceptions
> (org.apache.kafka.streams.errors.ProcessorStateException: Error opening
> store vstore at location
> /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please take
> a look at it and let me know what could be wrong here?
>
> INFO 2018-02-21 08:37:20.758 [Engine2-StreamThread-2]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and
> standby tasks [4_0, 1_30] in 0ms
> ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread |
> Engine2-StreamThread-6-producer]
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task
> [1_34] Error sending record to topic cv-v1-cv.
> No more offsets will be
> recorded for this task and the exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for
> cv-v1-cv-2: 31439 ms has passed since last append
> DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-3] processing latency 46282 > commit time 30000 for
> 10000 records. Adjusting down recordsProcessedBeforeCommit=6482
> ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the
> following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_34] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> RecordCollectorImpl.java:145)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:296)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?
> ]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 31439 ms has passed since last append
> INFO 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] Shutting down
> INFO 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] State transition from PARTITIONS_ASSIGNED to
> PENDING_SHUTDOWN.
> DEBUG 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] Shutting down all active tasks [6_12, 1_34, 7_15],
> standby tasks [1_2, 1_7], suspended tasks [1_18, 7_15, 6_3, 1_27], and
> suspended standby tasks [1_3, 1_30]
> INFO 2018-02-21 08:37:24.879 [Engine2-StreamThread-3]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-3] Committed all active tasks [1_31, 1_24, 6_7] and
> standby tasks [1_0, 1_6, 1_13] in 0ms
> INFO 2018-02-21 08:37:24.954 [Engine2-StreamThread-6]
> org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> INFO 2018-02-21 08:37:24.957 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] Stream thread shutdown complete
> INFO 2018-02-21 08:37:24.957 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] State transition from PENDING_SHUTDOWN to DEAD.
>
>
> *Streams Config*
> *----------------------*
> INFO 2018-02-21 07:11:20.209 [main] org.apache.kafka.streams.
> StreamsConfig
> - StreamsConfig values:
> application.id = cv-v1
> application.server =
> bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 104857600
> client.id = Engine2
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$
> ByteArraySerde
> default.timestamp.extractor = class org.apache.kafka.streams.processor.
> FailOnInvalidTimestamp
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$
> ByteArraySerde
> key.serde = null
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = DEBUG
> metrics.sample.window.ms = 30000
> num.standby.replicas = 1
> num.stream.threads = 1
> partition.grouper = class org.apache.kafka.streams.processor.
> DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 2
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /mnt/store/kafka-streams
> timestamp.extractor = null
> value.serde = null
> windowstore.changelog.additional.retention.ms = 86400000
> zookeeper.connect =
>
>
> INFO 2018-02-21 07:11:20.398 [main] org.apache.kafka.streams.
> StreamsConfig
> - StreamsConfig values:
> application.id = pe-v1
> application.server = newengine101:8080
> bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 2147483648
> client.id = Engine2
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$
> ByteArraySerde
> default.timestamp.extractor = class org.apache.kafka.streams.processor.
> FailOnInvalidTimestamp
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$
> ByteArraySerde
> key.serde = null
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = DEBUG
> metrics.sample.window.ms = 30000
> num.standby.replicas = 1
> num.stream.threads = 6
> partition.grouper = class org.apache.kafka.streams.processor.
> DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 2
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /mnt/store/kafka-streams
> timestamp.extractor = null
> value.serde = null
> windowstore.changelog.additional.retention.ms = 86400000
> zookeeper.connect =
>
> *More Logs*
>
> INFO 2018-02-21 13:33:19.083 [Engine2-StreamThread-1]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-1] Committed all active tasks [0_1, 1_0, 0_3, 2_2]
> and standby tasks [] in 0ms
> ERROR 2018-02-21 13:33:34.140 [kafka-producer-network-thread |
> Engine2-StreamThread-2-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_34] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for
> cv-v1-cv-2: 31903 ms has passed since last append
> ERROR 2018-02-21 13:33:34.140 [kafka-producer-network-thread |
> Engine2-StreamThread-2-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_10] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for
> cv-v1-cv-2: 31903 ms has passed since last append
> ERROR 2018-02-21 13:33:36.396 [kafka-producer-network-thread |
> Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_29] Error sending record
> to topic cv-v1-cv.
> No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for
> cv-v1-cv-2: 34111 ms has passed since last append
> ERROR 2018-02-21 13:33:36.396 [kafka-producer-network-thread |
> Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_18] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for
> cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:36.397 [kafka-producer-network-thread |
> Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_32] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for
> cv-v1-cv-2: 32717 ms has passed since last append
> ERROR 2018-02-21 13:33:40.770 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-4] Failed to commit stream task 1_32 due to the
> following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_32] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> RecordCollectorImpl.java:145)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:296)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?
> ]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 32717 ms has passed since last append
> ERROR 2018-02-21 13:33:40.774 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-4] Failed to commit stream task 1_18 due to the
> following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> Failed
> to flush state store subs
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
> AbstractTask.java:194)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:295)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?
> ]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor
> e.put(ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor
> e.put(ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?
> ]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:255) ~[kafka-streams-0.11.0.2.jar:?]
> ... 13 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:40.774 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-4] Failed to commit stream task 1_29 due to the
> following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_29]
> Failed
> to flush state store subs
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
> AbstractTask.java:194)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:295)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?
> ]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor
> e.put(ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor
> e.put(ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?
> ]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:255) ~[kafka-streams-0.11.0.2.jar:?]
> ... 13 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> INFO 2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] Shutting down
> INFO 2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN.
> DEBUG 2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] Shutting down all active tasks [1_32, 1_18, 7_15,
> 1_24, 6_5, 6_7, 1_29], standby tasks [1_15, 1_3, 1_19, 1_8], suspended
> tasks [6_12, 1_34, 6_14, 1_22, 1_10, 1_28], and suspended standby tasks
> [1_32, 1_5, 1_23, 1_8]
> ERROR 2018-02-21 13:33:40.876 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.ProcessorStateManager - task
> [1_18] Failed to close state store vstore:
> org.apache.kafka.streams.errors.StreamsException: task [1_18] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor
> e.put(ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor
> e.put(ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?
> ]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> CachingKeyValueStore.java:113)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:278) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:40.877 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamTask - task [1_18]
> Could
> not close state manager due to the following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> Failed
> to close state store vstore
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor
> e.put(ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStor
> e.put(ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?
> ]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> CachingKeyValueStore.java:113)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> ... 6 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:40.877 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] Failed while closing StreamTask 1_18 due to the
> following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> Failed
> to close state store vstore
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
>     ... 6 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.ProcessorStateManager - task [1_29] Failed to close state store vstore:
> org.apache.kafka.streams.errors.StreamsException: task [1_29] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:278) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:232) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamTask - task [1_29] Could not close state manager due to the following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_29] Failed to close state store vstore
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
>     ... 6 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] Failed while closing StreamTask 1_29 due to the following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_29] Failed to close state store vstore
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:410) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:479) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:1009) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:965) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:58) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:218) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:133) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:95) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:129) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:113) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
>     ... 6 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> INFO 2018-02-21 13:33:41.052 [Engine2-StreamThread-4] org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> INFO 2018-02-21 13:33:41.054 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] Stream thread shutdown complete
> INFO 2018-02-21 13:33:41.054 [Engine2-StreamThread-4] org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Engine2-StreamThread-4] State transition from PENDING_SHUTDOWN to DEAD.
> org.apache.kafka.streams.errors.StreamsException: task [1_32] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuateAndCommit(AssignedTasks.java:357) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:662) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459) ~[kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 32717 ms has passed since last append
> ERROR 2018-02-21 13:33:42.590 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-2] Failed to commit stream task 1_34 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_34] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:137) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482) [kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for cv-v1-cv-2: 31903 ms has passed since last append
> ERROR 2018-02-21 13:33:42.591 [Engine2-StreamThread-2] org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [Engine2-StreamThread-2] Failed to commit stream task 1_10 due to the following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_10] Failed to flush state store subs
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:194) ~[kafka-streams-0.11.0.2.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:295) ~[kafka-streams-0.11.0.2.jar:?]
>
> Thanks,
> Tony
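For reference, here is a minimal sketch of the workaround suggested in the reply at the top of this thread, written with plain string keys so it runs without the Kafka jars. The expiry times in the quoted log (roughly 31 to 34 seconds) sit just above the producer's default request.timeout.ms of 30000 ms, which is consistent with that setting being the one that expires the batches. The class name, the method name, and the values chosen for max.block.ms and retries are assumptions for illustration only; only the Integer.MAX_VALUE request timeout comes from the reply. The "producer." prefix is, to my understanding, what StreamsConfig.producerPrefix() prepends so that a setting reaches the internal producer.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): collects the producer overrides
// mentioned in the reply as plain string keys.
class ProducerTimeoutWorkaround {

    // Assumption: Kafka Streams forwards keys with this prefix to its
    // internal producer (the prefix StreamsConfig.producerPrefix() adds).
    static final String PRODUCER_PREFIX = "producer.";

    static Properties overrides() {
        Properties props = new Properties();
        // From the reply: set the request timeout to a very large value.
        props.setProperty(PRODUCER_PREFIX + "request.timeout.ms",
                String.valueOf(Integer.MAX_VALUE));
        // Illustrative values only; the reply names these settings
        // but does not say what to set them to.
        props.setProperty(PRODUCER_PREFIX + "max.block.ms", "120000");
        props.setProperty(PRODUCER_PREFIX + "retries", "10");
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be checked before use.
        overrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In an application these entries would be merged into the existing Streams configuration, for example with putAll() on the Properties object, before the KafkaStreams instance is created. Tune the two assumed values to your own environment.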