Hi Tony,

It looks like you are hitting a known issue that KIP-91
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer)
will address.

In the meantime, as a workaround, you could try setting
REQUEST_TIMEOUT_MS_CONFIG to a large value (Integer.MAX_VALUE?). Other,
secondary configurations to consider increasing are "max.block.ms" and
"retries".
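
To make that concrete, here is a rough sketch of how those overrides could
be layered onto the properties you already pass to Kafka Streams. It uses
plain java.util.Properties and the string config names so that it runs
without the Kafka jars. The class and method names are made up for
illustration, and the values for "max.block.ms" and "retries" are
placeholders I picked, not recommendations. The "producer." prefix is how
Streams scopes a setting to its internal producers only.

```java
import java.util.Properties;

public class StreamsTimeoutWorkaround {

    // Keys carrying this prefix are handed only to the producers that Streams creates.
    static final String PRODUCER_PREFIX = "producer.";

    // Returns a copy of the given Streams properties with relaxed producer timeouts.
    // The input is left untouched.
    static Properties withRelaxedProducerTimeouts(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Suggested workaround: a very large request timeout for the producer.
        props.setProperty(PRODUCER_PREFIX + "request.timeout.ms",
                String.valueOf(Integer.MAX_VALUE));
        // Secondary settings; both values are illustrative assumptions.
        props.setProperty(PRODUCER_PREFIX + "max.block.ms", "120000");
        props.setProperty(PRODUCER_PREFIX + "retries", "10");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "pe-v1");
        base.setProperty("bootstrap.servers", "172.31.10.35:9092,172.31.14.8:9092");

        Properties props = withRelaxedProducerTimeouts(base);
        // Print the effective settings for a quick visual check.
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting Properties object can be passed to your streams application
the same way you pass your current config. Scoping the overrides to the
producer keeps the consumer side on its existing request timeout.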

Thanks,
Bill

On Thu, Feb 22, 2018 at 8:14 AM, Tony John <tonyjohnant...@gmail.com> wrote:

> Hi All,
>
> I am running into an issue with my Kafka Streams application. The
> application was running fine for almost 2 weeks, then it started throwing
> the below exception which caused the threads to die. Now when I restart the
> application, it dies quickly (1-2 hrs) when trying to catch up the lag.
>
> The application is running on an AWS EC2 instance with an 8-core processor and
> 16GB of memory. The streams config is given below and more logs are
> available below (I have stripped out some logs which I thought may not be
> relevant). Towards the end of this thread you will be able to see a lot of
> exceptions similar to the one below + RocksDBExceptions
> (org.apache.kafka.streams.errors.ProcessorStateException: Error opening
> store vstore at location
> /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please take
> a look at it and let me know what could be wrong here?
>
> INFO  2018-02-21 08:37:20.758 [Engine2-StreamThread-2]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and
> standby tasks [4_0, 1_30] in 0ms
> ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread |
> Engine2-StreamThread-6-producer]
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task
> [1_34] Error sending record to topic cv-v1-cv. No more offsets will be
> recorded for this task and the exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for
> cv-v1-cv-2: 31439 ms has passed since last append
> DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-3] processing latency 46282 > commit time 30000 for
> 10000 records. Adjusting down recordsProcessedBeforeCommit=6482
> ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the
> following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_34] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> RecordCollectorImpl.java:145)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:296)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 31439 ms has passed since last append
> INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] Shutting down
> INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] State transition from PARTITIONS_ASSIGNED to
> PENDING_SHUTDOWN.
> DEBUG 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] Shutting down all active tasks [6_12, 1_34, 7_15],
> standby tasks [1_2, 1_7], suspended tasks [1_18, 7_15, 6_3, 1_27], and
> suspended standby tasks [1_3, 1_30]
> INFO  2018-02-21 08:37:24.879 [Engine2-StreamThread-3]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-3] Committed all active tasks [1_31, 1_24, 6_7] and
> standby tasks [1_0, 1_6, 1_13] in 0ms
> INFO  2018-02-21 08:37:24.954 [Engine2-StreamThread-6]
> org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> INFO  2018-02-21 08:37:24.957 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] Stream thread shutdown complete
> INFO  2018-02-21 08:37:24.957 [Engine2-StreamThread-6]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-6] State transition from PENDING_SHUTDOWN to DEAD.
>
>
> *Streams Config*
> *----------------------*
> INFO  2018-02-21 07:11:20.209 [main] org.apache.kafka.streams.
> StreamsConfig
> - StreamsConfig values:
> application.id = cv-v1
> application.server =
> bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 104857600
> client.id = Engine2
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> key.serde = null
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = DEBUG
> metrics.sample.window.ms = 30000
> num.standby.replicas = 1
> num.stream.threads = 1
> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 2
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /mnt/store/kafka-streams
> timestamp.extractor = null
> value.serde = null
> windowstore.changelog.additional.retention.ms = 86400000
> zookeeper.connect =
>
>
> INFO  2018-02-21 07:11:20.398 [main] org.apache.kafka.streams.
> StreamsConfig
> - StreamsConfig values:
> application.id = pe-v1
> application.server = newengine101:8080
> bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 2147483648
> client.id = Engine2
> commit.interval.ms = 30000
> connections.max.idle.ms = 540000
> default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> key.serde = null
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = DEBUG
> metrics.sample.window.ms = 30000
> num.standby.replicas = 1
> num.stream.threads = 6
> partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 2
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 600000
> state.dir = /mnt/store/kafka-streams
> timestamp.extractor = null
> value.serde = null
> windowstore.changelog.additional.retention.ms = 86400000
> zookeeper.connect =
>
> *More Logs*
>
> INFO  2018-02-21 13:33:19.083 [Engine2-StreamThread-1]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-1] Committed all active tasks [0_1, 1_0, 0_3, 2_2]
> and standby tasks [] in 0ms
> ERROR 2018-02-21 13:33:34.140 [kafka-producer-network-thread |
> Engine2-StreamThread-2-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_34] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for
> cv-v1-cv-2: 31903 ms has passed since last append
> ERROR 2018-02-21 13:33:34.140 [kafka-producer-network-thread |
> Engine2-StreamThread-2-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_10] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for
> cv-v1-cv-2: 31903 ms has passed since last append
> ERROR 2018-02-21 13:33:36.396 [kafka-producer-network-thread |
> Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_29] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for
> cv-v1-cv-2: 34111 ms has passed since last append
> ERROR 2018-02-21 13:33:36.396 [kafka-producer-network-thread |
> Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_18] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for
> cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:36.397 [kafka-producer-network-thread |
> Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> processor.internals.RecordCollectorImpl - task [1_32] Error sending record
> to topic cv-v1-cv. No more offsets will be recorded for this task and the
> exception will eventually be thrown
> org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s) for
> cv-v1-cv-2: 32717 ms has passed since last append
> ERROR 2018-02-21 13:33:40.770 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-4] Failed to commit stream task 1_32 due to the
> following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_32] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> RecordCollectorImpl.java:145)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:296)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 32717 ms has passed since last append
> ERROR 2018-02-21 13:33:40.774 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-4] Failed to commit stream task 1_18 due to the
> following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> Failed to flush state store subs
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
> AbstractTask.java:194)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:295)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.
> put(ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.
> put(ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> access$000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:255) ~[kafka-streams-0.11.0.2.jar:?]
> ... 13 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:40.774 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-4] Failed to commit stream task 1_29 due to the
> following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_29]
> Failed to flush state store subs
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
> AbstractTask.java:194)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:295)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.
> put(ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.
> put(ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> access$000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:255) ~[kafka-streams-0.11.0.2.jar:?]
> ... 13 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> INFO  2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] Shutting down
> INFO  2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN.
> DEBUG 2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] Shutting down all active tasks [1_32, 1_18, 7_15,
> 1_24, 6_5, 6_7, 1_29], standby tasks [1_15, 1_3, 1_19, 1_8], suspended
> tasks [6_12, 1_34, 6_14, 1_22, 1_10, 1_28], and suspended standby tasks
> [1_32, 1_5, 1_23, 1_8]
> ERROR 2018-02-21 13:33:40.876 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.ProcessorStateManager - task
> [1_18] Failed to close state store vstore:
> org.apache.kafka.streams.errors.StreamsException: task [1_18] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.
> put(ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.
> put(ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> access$000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> CachingKeyValueStore.java:113)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:278) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:40.877 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamTask - task [1_18]
> Could not close state manager due to the following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> Failed to close state store vstore
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.
> put(ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.
> put(ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> access$000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> CachingKeyValueStore.java:113)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> ... 6 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:40.877 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] Failed while closing StreamTask 1_18 due to the
> following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> Failed to close state store vstore
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(
> ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(
> ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(
> CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> CachingKeyValueStore.java:113)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> ... 6 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.ProcessorStateManager - task
> [1_29] Failed to close state store vstore:
> org.apache.kafka.streams.errors.StreamsException: task [1_29] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(
> ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(
> ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(
> CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> CachingKeyValueStore.java:113)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:278) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamTask - task [1_29]
> Could not close state manager due to the following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_29]
> Failed to close state store vstore
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(
> ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(
> ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(
> CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> CachingKeyValueStore.java:113)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> ... 6 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] Failed while closing StreamTask 1_29 due to the
> following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_29]
> Failed to close state store vstore
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.
> closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(
> StreamTask.java:410)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.close(StreamTask.java:479)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> shutdownTasksAndState(StreamThread.java:1009)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> StreamThread.java:965)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29]
> exception caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(
> RecordCollectorImpl.java:87)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:59)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(
> ChangeLoggingKeyValueBytesStore.java:58)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(
> ChangeLoggingKeyValueBytesStore.java:29)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> measureLatency(MeteredKeyValueStore.java:218)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> MeteredKeyValueStore.java:133)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> putAndMaybeForward(CachingKeyValueStore.java:95)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(
> CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> CachingKeyValueStore.java:78)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:141)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.NamedCache.
> flush(NamedCache.java:99)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.ThreadCache.
> flush(ThreadCache.java:129)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> CachingKeyValueStore.java:107)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> CachingKeyValueStore.java:113)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.close(
> ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> ... 6 more
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> INFO  2018-02-21 13:33:41.052 [Engine2-StreamThread-4]
> org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka
> producer with timeoutMillis = 9223372036854775807 ms.
> INFO  2018-02-21 13:33:41.054 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] Stream thread shutdown complete
> INFO  2018-02-21 13:33:41.054 [Engine2-StreamThread-4]
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [Engine2-StreamThread-4] State transition from PENDING_SHUTDOWN to DEAD.
> org.apache.kafka.streams.errors.StreamsException: task [1_32] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> RecordCollectorImpl.java:145)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:296)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) ~[kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 32717 ms has passed since last append
> ERROR 2018-02-21 13:33:42.590 [Engine2-StreamThread-2]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-2] Failed to commit stream task 1_34 due to the
> following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_34] exception
> caught when producing
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> checkForException(RecordCollectorImpl.java:137)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(
> RecordCollectorImpl.java:145)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:296)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask$1.run(StreamTask.java:275)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:201)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> AssignedTasks.java:374)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.
> punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.
> processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:513)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:482)
> [kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> record(s) for cv-v1-cv-2: 31903 ms has passed since last append
> ERROR 2018-02-21 13:33:42.591 [Engine2-StreamThread-2]
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
> [Engine2-StreamThread-2] Failed to commit stream task 1_10 due to the
> following error:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_10]
> Failed to flush state store subs
> at org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.flush(
> ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
> AbstractTask.java:194)
> ~[kafka-streams-0.11.0.2.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> StreamTask.java:295)
> ~[kafka-streams-0.11.0.2.jar:?]
>
> Thanks,
> Tony
>
