[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075541#comment-16075541
 ] 

Guozhang Wang edited comment on KAFKA-5545 at 7/5/17 10:34 PM:
---------------------------------------------------------------

[~yogeshbelur] In your code snippet it seems you did not ever close the 
instance before creating the new instance and then call {{cleanUp}}, or are the 
{{close()}} and {{start()}} calls for the previous instance (it is hard to tell 
how {{setupDiscovery}} is triggered)? 

{code}
close();
streams = new KafkaStreams(buildTopology(config), config);
logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString 
+ "].");
streams.cleanUp();
start();
{code}

Anyways, if {{Streams.close()}} is indeed called, then the producer will be 
closed in that function and the inner {{Sender}} thread will be terminated and 
not try to connect to the broker anymore. But from your attached logs it does 
seems the thread was notified to shutdown but never existed the main loop:

{code}
10:02:33.981 [pool-1-thread-1] INFO  o.apache.kafka.streams.KafkaStreams - 
stream-client [ks_0_inst] State transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.987 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed 
thread to shut down
10:02:33.987 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] Informed 
thread to shut down
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] Informed 
thread to shut down
10:02:33.988 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] Informed 
thread to shut down
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] Informed 
thread to shut down
10:02:33.989 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] Informed 
thread to shut down
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.990 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] Informed 
thread to shut down
10:02:33.991 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] Informed 
thread to shut down
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] Informed 
thread to shut down
10:02:33.992 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.995 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] Informed 
thread to shut down
10:02:33.995 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.995 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-13] Informed 
thread to shut down
10:02:33.995 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-13] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.996 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-14] Informed 
thread to shut down
10:02:33.996 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-14] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.996 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-15] Informed 
thread to shut down
10:02:33.997 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-15] State 
transition from RUNNING to PENDING_SHUTDOWN.
10:02:33.997 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-16] Informed 
thread to shut down
10:02:33.997 [kafka-streams-close-thread] INFO  
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-16] State 
transition from RUNNING to PENDING_SHUTDOWN.

... There is no entry showing `Shutting down`...
{code}




was (Author: guozhang):
[~yogeshbelur] In your code snippet it seems you did not ever close the 
instance before creating the new instance and then call {{cleanUp}}, or are the 
{{close()}} and {{start()}} calls for the previous instance (it is hard to tell 
how {{setupDiscovery}} is triggered)? 

{code}
close();
streams = new KafkaStreams(buildTopology(config), config);
logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString 
+ "].");
streams.cleanUp();
start();
{code}

Anyways, if {{Streams.close()}} is indeed called, then the producer will be 
closed in that function and the inner {{Sender}} thread will be terminated and 
not try to connect to the broker anymore.

> Kafka Stream not able to successfully restart over new broker ip
> ----------------------------------------------------------------
>
>                 Key: KAFKA-5545
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5545
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Yogesh BG
>            Priority: Critical
>         Attachments: kafkastreams.log
>
>
> Hi
> I have one kafka broker and one kafka stream application
> initially kafka stream connected and starts processing data. Then i restart 
> the broker. When broker restarts new ip will be assigned.
> In kafka stream i have a 5min interval thread which checks if broker ip 
> changed and if changed, we cleanup the stream, rebuild topology(tried with 
> reusing topology) and start the stream again. I end up with the following 
> exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-37] Creating active task 0_3 with assigned 
> partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-45] Creating active task 0_0 with assigned 
> partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-36] Creating active task 0_4 with assigned 
> partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-43] Creating active task 0_6 with assigned 
> partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-48] Creating active task 0_2 with assigned 
> partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the 
> state directory for task 0_5
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.035 [StreamThread-41] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_1. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
> state directory for task 0_1
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.037 [StreamThread-37] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_3. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_3] Failed to lock the 
> state directory for task 0_3
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-34] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_7. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_7] Failed to lock the 
> state directory for task 0_7
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-43] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_6. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_6] Failed to lock the 
> state directory for task 0_6
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-45] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_0. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the 
> state directory for task 0_0
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-36] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_4. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock the 
> state directory for task 0_4
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:09.039 [StreamThread-48] WARN  o.a.k.s.p.internals.StreamThread - Could 
> not create task 0_2. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_2] Failed to lock the 
> state directory for task 0_2
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
> [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 11:04:13.642 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-44] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-47] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-42] Committing all tasks because the commit 
> interval 10000ms has elapsed
> 11:04:13.642 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-46] Committing all tasks because the commit 
> interval 10000ms has ela
> ]
> psed
> 11:04:13.646 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-33] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:13.648 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-40] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:13.655 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-39] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:13.660 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-35] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:23.663 [StreamThread-42] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-42] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:23.663 [StreamThread-46] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-46] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:23.663 [StreamThread-47] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-47] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:23.663 [StreamThread-44] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-44] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:23.671 [StreamThread-33] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-33] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:23.676 [StreamThread-40] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-40] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:23.677 [StreamThread-39] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-39] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:23.682 [StreamThread-35] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-35] Committing all tasks because the commit 
> interval 10000ms has ela
> psed
> 11:04:29.025 [pool-4-thread-1]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to