We're consistently seeing TimeoutExceptions on Kafka Streams 1.0.0, even when 
not under load, and exclusively for state changelog topic updates. We tried 
raising `request.timeout.ms` to 60s, but it doesn't help: the exception is 
thrown continuously, for what looks like every attempt to update any of the 
state changelogs.
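
For reference, this is roughly how we pass the timeout through to the embedded 
producer (a minimal sketch; the application id and bootstrap servers are 
placeholders, not our real values):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        // producerPrefix() routes the setting to the embedded producer,
        // i.e. it expands to "producer.request.timeout.ms" (default 30000 ms).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
        return props;
    }
}
```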

We've tried running the Kafka Streams application on clusters ranging from 2 to 
20 nodes, connecting to a remote broker cluster. The topology is quite simple 
(7 processors, 2 state stores). The brokers have been handling significant load 
from other sources (librdkafka and Java producers) without any issues, so I 
doubt the problem is on that side. 

Another interesting observation (likely unrelated): after these timeout 
exceptions, calling `.close(5, TimeUnit.SECONDS)` on the application (we catch 
the `TimeoutException` via a `GlobalExceptionHandler`) makes the JVM process 
hang every time, and we have to kill it manually.
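
The shutdown path looks roughly like this (a sketch; `GlobalExceptionHandler` 
is our own wrapper around the thread-level uncaught exception handler, and 
`streams` is assumed to be built elsewhere):

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;

public class ShutdownHandler {
    // Sketch: our GlobalExceptionHandler boils down to this thread-level
    // handler registered on the KafkaStreams instance.
    public static void install(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            // Bounded close; this is the call after which the JVM hangs
            // every time and has to be killed manually.
            streams.close(5, TimeUnit.SECONDS);
        });
    }
}
```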

We also tried upgrading to 1.1.0 to check whether the new `retries` config 
parameter, set to 1, makes any difference; it doesn't. The 
`default.production.exception.handler` does trigger in this situation, but it 
doesn't offer anything beyond what the `GlobalExceptionHandler` already gives us.
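
For completeness, the handler we plugged in on 1.1.0 has roughly this shape (a 
sketch; the class name is ours, and returning FAIL just reproduces the default 
behavior):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class LoggingProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // Log and fail: FAIL matches the default behavior, so the stream
        // thread still shuts down after the TimeoutException.
        System.err.println("Failed to produce to " + record.topic() + ": " + exception);
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }
}
```

It's registered via `default.production.exception.handler` 
(`StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG`) in the 
Streams properties.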

One possible source of the problem that comes to mind: the input and output 
topics are long-lived (i.e. the broker cluster, along with its ZK quorum, stays 
up), while the Kafka Streams cluster is ephemeral, one we take down and bring 
back up within minutes, and quite often, for config changes. When we recreate 
this cluster the hosts change completely (the apps run in Docker containers in 
an AWS autoscaling group). Is there any long-lived relationship between the 
Kafka Streams changelogs and the Kafka Streams process hosts that breaks when 
the Kafka Streams nodes are recreated? The first time we create the Kafka 
Streams cluster against a new broker cluster, things seem to run relatively 
well. 

Here's an example stack trace:
2018-03-29 13:15:15,614 [kafka-producer-network-thread | 
app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1-producer] ERROR 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_19] 
Error sending record (key ... value [...] timestamp 1522327436038) to topic 
app-sessionStateStore-changelog due to {}; No more records will be sent and no 
more offsets will be recorded for this task.
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for 
app-sessionStateStore-changelog-19: 30002 ms has passed since last append
2018-03-29 13:15:19,470 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] ERROR 
org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Failed to commit 
stream task 0_19 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key ... value ... timestamp 
1522327436038) to topic app-sessionStateStore-changelog due to 
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for 
app-sessionStateStore-changelog-19: 30002 ms has passed since last append.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
        at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 23 
record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since 
last append
2018-03-29 13:15:19,471 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] State transition from 
PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
2018-03-29 13:15:19,471 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Shutting down
2018-03-29 13:15:19,885 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.clients.producer.KafkaProducer - [Producer 
clientId=app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1-producer] 
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] State transition from 
PENDING_SHUTDOWN to DEAD
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.KafkaStreams - stream-client 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133]State transition from REBALANCING to 
ERROR
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] WARN  
org.apache.kafka.streams.KafkaStreams - stream-client 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133]All stream threads have died. The 
instance will be in error state and should be closed.
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] Shutdown complete
2018-03-29 13:15:19,896 
[app-ef44224d-ab04-47ea-b2f3-2b74bac22133-StreamThread-1] ERROR com.x.y.z.App$ 
- Thread 13 died with exception task [0_19] Abort sending since an error caught 
with a previous record (key ... value ... timestamp 1522327436038) to topic 
app-sessionStateStore-changelog due to 
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for 
app-sessionStateStore-changelog-19: 30002 ms has passed since last append.. 
Shutting down the Kafka Streams application
org.apache.kafka.streams.errors.StreamsException: task [0_19] Abort sending 
since an error caught with a previous record (key ... value ... timestamp 
1522327436038) to topic app-sessionStateStore-changelog due to 
org.apache.kafka.common.errors.TimeoutException: Expiring 23 record(s) for 
app-sessionStateStore-changelog-19: 30002 ms has passed since last append.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
        at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 23 
record(s) for app-sessionStateStore-changelog-19: 30002 ms has passed since 
last append
