Hi Sachin,

It might be helpful if you send the logs from the streams application and the broker.
Thanks,
Damian

On Thu, 9 Feb 2017 at 02:43, Sachin Mittal <sjmit...@gmail.com> wrote:
> Hi
> I have upgraded to 0.10.2.0, however the streams application is still
> failing via a commit-failed exception with cause "unknown member id".
> The rocksdb locks issue seems to be resolved.
>
> Also, poll is called within the interval. I have posted more detail of
> the failure in another thread.
>
> Please let us know what could be the cause now.
>
> Thanks
> Sachin
>
>
> On 7 Feb 2017 3:21 p.m., "Damian Guy" <damian....@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > Sorry, I misunderstood what you had said. You are running 3 instances,
> > one per machine? I thought you said you were running 3 instances on
> > each machine.
> >
> > Regarding partitions: you are better off having more partitions, as
> > this affects the maximum degree of parallelism you can achieve in the
> > app. If you only have 12 partitions then you can have at most 12
> > threads in total. If you have 40 partitions then you can have up to 40
> > threads, and so on. It won't help with rebalancing anyway.
> >
> > You are correct: once all the instances are up and running, rebalances
> > generally shouldn't happen unless there is a failure of some kind. To
> > maintain membership of the consumer group, the consumer used by streams
> > needs to poll at least every max.poll.interval.ms - the default for
> > this is 5 minutes. If for any reason the processing of records returned
> > from a call to poll takes longer than max.poll.interval.ms, then the
> > consumer will be kicked out of the group and a rebalance will happen.
> > If you are seeing rebalances occurring after everything has been up and
> > running and reached a steady state, you might want to increase the
> > value of max.poll.interval.ms. You can also adjust max.poll.records to
> > decrease the maximum number of records retrieved by a single call to
> > poll - this will help to reduce the processing time.
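> >
> > In case it helps, here is a minimal sketch of how those two settings
> > can be applied. This assumes consumer properties set on the streams
> > config are passed through to the embedded consumer; the class name,
> > application id, broker list and values are only illustrative - tune
> > them to your workload:
> >
> >     import java.util.Properties;
> >     import org.apache.kafka.clients.consumer.ConsumerConfig;
> >     import org.apache.kafka.streams.StreamsConfig;
> >
> >     public class PollTuning {
> >         public static Properties streamsProps() {
> >             Properties props = new Properties();
> >             props.put(StreamsConfig.APPLICATION_ID_CONFIG, "new-part-advice");
> >             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
> >             // allow up to 10 minutes between calls to poll() before the
> >             // consumer is kicked out of the group (default is 5 minutes):
> >             props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 60 * 1000);
> >             // return fewer records per call to poll() so that each batch
> >             // is processed well within max.poll.interval.ms:
> >             props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
> >             return props;
> >         }
> >     }
> >
> > Raising the interval and lowering the batch size work together: each
> > batch gets more headroom before the group considers the member dead.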
> >
> > Thanks,
> > Damian
> >
> > On Tue, 7 Feb 2017 at 07:24 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi,
> > > Everything is understood and I will try out 0.10.2.0-rc0 shortly.
> > >
> > > However, one thing is not clear. You said:
> > > "Firstly I'd recommend you have different state directory configs for
> > > each application instance."
> > >
> > > Well, I am running three separate instances of 4 threads each on
> > > three different machines.
> > > So each machine has its own physical storage, though the path to the
> > > state dir is the same for each, because that is the relative path
> > > where we have mounted the data directory separately for each of these
> > > three machines.
> > >
> > > So my streams state.dir setting is identical for all the instances,
> > > but physically they are located at different locations.
> > > So it is not clear why I need a different config for each.
> > >
> > > I will also test with CLEANUP_DELAY_MS_CONFIG set to 30 minutes.
> > >
> > > Also, one thing I wanted to know: if I make the number of partitions
> > > equal to the total number of stream threads, which is 12, will that
> > > help, with one thread always reading from a single partition and
> > > never a need to rebalance?
> > >
> > > However, I don't understand one thing: once a steady state has been
> > > reached and all threads have picked up their partitions, why is there
> > > ever a need for a future rebalance, unless and until a thread dies?
> > >
> > > It is also not clear how often a rebalance is triggered and whether
> > > there is a way we can control it.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Mon, Feb 6, 2017 at 10:10 PM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Firstly, I'd recommend you have different state directory configs
> > > > for each application instance. I suspect you are potentially
> > > > hitting an issue where the partition assignment has changed, the
> > > > state directory locks get released, and a directory gets removed
> > > > just before the lock is taken out by another thread or process.
> > > > Though it is pretty hard to tell from the above logs. You might
> > > > also want to set StreamsConfig.CLEANUP_DELAY_MS_CONFIG to a higher
> > > > value than the default of 60 seconds. There is no reason why it
> > > > couldn't be much higher - 30 minutes, maybe more, depending on how
> > > > badly you need old task data cleaned up. Setting this value higher
> > > > will reduce the likelihood of the exception occurring.
> > > >
> > > > 0.10.2 will be out shortly. It would be good if you can try that.
> > > > You can find the current RC here:
> > > > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc0/
> > > > You don't need to upgrade your kafka brokers, just the streams
> > > > client.
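> > > >
> > > > As a rough sketch of both suggestions - assuming the 0.10.1
> > > > constant names, where the cleanup delay is exposed as
> > > > StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, and with paths that
> > > > are purely illustrative - each instance could be configured along
> > > > these lines:
> > > >
> > > >     import java.util.Properties;
> > > >     import org.apache.kafka.streams.StreamsConfig;
> > > >
> > > >     public class InstanceConfig {
> > > >         public static Properties forInstance(int id) {
> > > >             Properties props = new Properties();
> > > >             // a state directory unique to this instance, e.g.
> > > >             // /data/kafka-streams/instance-1, -2, -3 across machines:
> > > >             props.put(StreamsConfig.STATE_DIR_CONFIG,
> > > >                       "/data/kafka-streams/instance-" + id);
> > > >             // wait 30 minutes (instead of the default 60 seconds)
> > > >             // before removing state of tasks that migrated away:
> > > >             props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG,
> > > >                       30 * 60 * 1000L);
> > > >             return props;
> > > >         }
> > > >     }
> > > >
> > > > The same relative mount point can still be used on each machine;
> > > > only the configured path needs to differ per instance.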
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Mon, 6 Feb 2017 at 10:53 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > > Yes, on the first: we have three machines with the same data
> > > > > directory setting.
> > > > > So the state dir config is the same for each.
> > > > >
> > > > > If it helps, this is the sequence of logs just before the thread
> > > > > shut down:
> > > > >
> > > > > ....
> > > > >
> > > > > stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
> > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > >
> > > > > stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
> > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > >
> > > > > stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
> > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> > > > > Caused by: org.rocksdb.RocksDBException: IO error: /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
> > > > > stream-thread [StreamThread-3] Shutting down
> > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_35
> > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_38
> > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_27
> > > > > stream-thread [StreamThread-3] Committing consumer offsets of task 0_12
> > > > > stream-thread [StreamThread-3] Closing a task 0_35
> > > > > stream-thread [StreamThread-3] Closing a task 0_38
> > > > > stream-thread [StreamThread-3] Closing a task 0_27
> > > > > stream-thread [StreamThread-3] Closing a task 0_12
> > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_35
> > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_38
> > > > > stream-thread [StreamThread-3] Failed while executing StreamTask 0_38 due to flush state:
> > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> > > > > Caused by: org.rocksdb.RocksDBException: IO error: /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
> > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_27
> > > > > stream-thread [StreamThread-3] Flushing state stores of task 0_12
> > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_35
> > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_38
> > > > > stream-thread [StreamThread-3] Failed while executing StreamTask 0_38 due to close state manager:
> > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to close state store key-table
> > > > > Caused by: org.rocksdb.RocksDBException: IO error: /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
> > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_27
> > > > > stream-thread [StreamThread-3] Closing the state manager of task 0_12
> > > > > Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > > > > stream-thread [StreamThread-3] Removing all active tasks [[0_35, 0_38, 0_27, 0_12]]
> > > > > stream-thread [StreamThread-3] Removing all standby tasks [[]]
> > > > > stream-thread [StreamThread-3] Stream thread shutdown complete
> > > > >
> > > > > It was working for iterations before that, and then suddenly that
> > > > > dir/file was gone and it could not commit/flush/close the state.
> > > > >
> > > > >
> > > > > For the second, what do you recommend? Is there something we can
> > > > > patch from 10.2 into 10.1 - certain commits or something?
> > > > >
> > > > > Or do we again need to upgrade the cluster to 10.2, or if we just
> > > > > upgrade the streams client to 10.2 will it work fine?
> > > > > Since 10.2 is not released yet, I suppose we would have to build
> > > > > the snapshot version.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > > On Mon, Feb 6, 2017 at 3:58 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > >
> > > > > > Hi Sachin,
> > > > > >
> > > > > > The first exception - is each instance of your streams app on a
> > > > > > single machine running with the same state directory config?
> > > > > > The second exception - I believe it is a bug in 0.10.1 that has
> > > > > > been fixed in 0.10.2. There have been a number of issues fixed
> > > > > > in this area.
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Mon, 6 Feb 2017 at 05:43 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > > > Hello All,
> > > > > > > We recently upgraded to kafka_2.10-0.10.1.1.
> > > > > > >
> > > > > > > We have a source topic with replication = 3 and partitions = 40.
> > > > > > > We have a streams application run with
> > > > > > > NUM_STREAM_THREADS_CONFIG = 4 on three machines, so 12 threads
> > > > > > > in total.
> > > > > > >
> > > > > > > What we do is start the same streams application one by one on
> > > > > > > the three machines.
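> > > > > > >
> > > > > > > For reference, each instance is configured and started roughly
> > > > > > > like this (a sketch only - the class name and broker list are
> > > > > > > placeholders and the topology is elided):
> > > > > > >
> > > > > > >     import java.util.Properties;
> > > > > > >     import org.apache.kafka.streams.KafkaStreams;
> > > > > > >     import org.apache.kafka.streams.StreamsConfig;
> > > > > > >     import org.apache.kafka.streams.kstream.KStreamBuilder;
> > > > > > >
> > > > > > >     public class NewPartApp {
> > > > > > >         public static void main(String[] args) {
> > > > > > >             Properties props = new Properties();
> > > > > > >             props.put(StreamsConfig.APPLICATION_ID_CONFIG, "new-part");
> > > > > > >             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
> > > > > > >             // 4 threads per instance; 3 instances give 12 threads
> > > > > > >             // in total, sharing the 40 partitions of the source topic:
> > > > > > >             props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
> > > > > > >             props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams");
> > > > > > >
> > > > > > >             KStreamBuilder builder = new KStreamBuilder();
> > > > > > >             // ... topology consuming the 40-partition source topic ...
> > > > > > >
> > > > > > >             KafkaStreams streams = new KafkaStreams(builder, props);
> > > > > > >             streams.start();
> > > > > > >         }
> > > > > > >     }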
> > > > > > >
> > > > > > > After some time what we noticed was that one machine's streams
> > > > > > > application just crashed. When we inspected the log, here is
> > > > > > > what we found.
> > > > > > > There were 2 sets of errors, like:
> > > > > > >
> > > > > > > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > > > > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:267) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.1.jar:na]
> > > > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store key-table-201702052100
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:375) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:366) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:256) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:116) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   ... 6 common frames omitted
> > > > > > > Caused by: org.rocksdb.RocksDBException: IO error: /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
> > > > > > >   at org.rocksdb.RocksDB.flush(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > >   at org.rocksdb.RocksDB.flush(RocksDB.java:1360) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:373) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   ... 11 common frames omitted
> > > > > > >
> > > > > > > When we queried the directory
> > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/
> > > > > > > we could not find it.
> > > > > > > So it looks like this directory was earlier deleted by some
> > > > > > > task and now some other task is trying to flush it.
> > > > > > > What could be the possible reason for this?
> > > > > > >
> > > > > > > Another set of errors we see is this:
> > > > > > >
> > > > > > > org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store key-table-201702052000 at location /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:190) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:159) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:388) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:319) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$000(RocksDBWindowStore.java:51) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore$1.restore(RocksDBWindowStore.java:206) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   ... 1 common frames omitted
> > > > > > > Caused by: org.rocksdb.RocksDBException: IO error: lock /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000/LOCK: No locks available
> > > > > > >   at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > >   at org.rocksdb.RocksDB.open(RocksDB.java:184) ~[rocksdbjni-4.9.0.jar:na]
> > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:183) ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > >   ... 26 common frames omitted
> > > > > > >
> > > > > > > And then the whole application shuts down. We have not
> > > > > > > understood this part of the error, or whether this second
> > > > > > > error is somehow related to the first error.
> > > > > > >
> > > > > > > Please let us know what could be the cause of these errors,
> > > > > > > whether they have been fixed, and if there is some way to fix
> > > > > > > this in the current release.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin