Hi Jon,

When you "copied a new build up to one machine", did you mean that you swapped in the new jar and then bounced the instance?
Kafka Streams should naturally support online upgrades via a simple rolling bounce of your instances, so I would not expect the scenario you described. However, note that if you have a lot of state stores, bouncing an instance will cause its stores to be migrated to other instances and restored there (i.e., the whole changelog is replayed to rebuild up-to-date state), and then migrated back, which can make the rebalance itself take a very long time. To check whether state restoration is taking most of the rebalance time, you can turn on TRACE-level logging and see whether the restore consumer keeps fetching records from the changelog topics, as in:

Returning fetched records at offset XXX for assigned partition YYY-changelog and update position to ZZZ

Guozhang

On Thu, Dec 15, 2016 at 3:54 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

Update: the app ran well for several hours.. until I tried to update it. I copied a new build up to one machine (of five) and then we went back to near-endless rebalancing. After about an hour I ended up killing the other four instances and watching the first (new) one. It took 90 minutes before it started consuming anything.

This morning I copied / started two more. 60 minutes and still waiting for the rebalance to conclude.

Obviously it's impractical to delete the topic(s) before updating the consumer software. What am I doing wrong that's causing all this waiting?

On Wed, Dec 14, 2016 at 9:28 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

In a turn of events - this morning I was about to throw in the proverbial towel on Kafka. In a last-ditch effort I killed all but one instance of my app, put it back to a single thread (why offer the option if it's not advised?) and deleted every last topic that had any relation to this app.

I restarted it on a single machine and it magically worked.
It's been running for more than an hour now and hasn't been stuck in 'rebalance-land' at all.

I'll keep watching it and see how it goes.

On Wed, Dec 14, 2016 at 6:13 AM, Damian Guy <damian....@gmail.com> wrote:

We do recommend one thread per instance of the app. However, it should also work with multiple threads.

I can't debug the problem any further without the logs from the other apps. We'd need to see whether another instance still has task 1_3 open (I suspect it does).

Thanks,
Damian

On Wed, 14 Dec 2016 at 13:20 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

What should I do about this? One thread per app?

On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy <damian....@gmail.com> wrote:

That is correct.

On Wed, 14 Dec 2016 at 12:09 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

I have the app running on 5 machines. Is that what you mean?

On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy <damian....@gmail.com> wrote:

Hi Jon,

Do you have more than one instance of the app running? The reason I ask is that the task (task 1_3) that fails with the "java.lang.IllegalStateException" in this log was previously running as a standby task. That would mean the active task for this store had been running elsewhere, but I don't see that in the logs. The exception occurs as StreamThread-1 starts to run task 1_3 as an active task, and it might indicate that another thread/instance is still writing to the changelog topic for the state store.
Thanks,
Damian

On Tue, 13 Dec 2016 at 17:23 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

As near as I can see it's rebalancing constantly.

I'll up that value and see what happens.

On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy <damian....@gmail.com> wrote:

Hi Jon,

I haven't had much of a chance to look at the logs in detail yet, but I have noticed that your app seems to be rebalancing frequently - usually around the 300-second mark, which typically means that poll hasn't been called for at least that long. You might want to try setting the config ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to something higher than 300000 (the default).

I'll continue to look at your logs and get back to you.

Thanks,
Damian

On Tue, 13 Dec 2016 at 15:02 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

Attached is a log with lots of disconnections and a small amount of actual, useful activity.

On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

n/m - I understand the logging issue now. Am generating a new one. Will send shortly.
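Damian's poll-interval suggestion can be sketched in plain Java. The `withLongerPollInterval` helper is hypothetical, and the 600000 ms value is an illustrative choice - the only requirement from the thread is that it exceed the 300000 ms default:

```java
import java.util.Properties;

public class RebalanceTuning {
    // Hypothetical helper: copy a Streams config and raise the consumer's
    // max.poll.interval.ms (the string key behind
    // ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) so that a long gap between
    // poll() calls - e.g. during state-store restoration - does not cause the
    // instance to be kicked out of the group and trigger another rebalance.
    static Properties withLongerPollInterval(Properties base, long intervalMs) {
        Properties p = new Properties();
        p.putAll(base);
        p.put("max.poll.interval.ms", Long.toString(intervalMs));
        return p;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "PRTMinuteAgg");
        Properties tuned = withLongerPollInterval(base, 600_000L);
        System.out.println(tuned.getProperty("max.poll.interval.ms")); // 600000
    }
}
```

The same key can of course be put directly on the `Properties` passed to `KafkaStreams`; the helper just keeps the override in one place.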
On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

Yes - saw that one. There were plenty of smaller records available though.

I sent another log this morning with the level set to DEBUG. Hopefully you rec'd it.

On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <damian....@gmail.com> wrote:

Hi Jon,

It looks like you have the logging level for KafkaStreams set to at least WARN; I can only see ERROR-level logs being produced from Streams.

However, I did notice an issue in the logs (not related to your specific error, but you will need to fix it anyway). There are lots of messages like:

task [2_9] Error sending record to topic PRTMinuteAgg-prt_hour_agg_stream-changelog
org.apache.kafka.common.errors.RecordTooLargeException: The message is 2381750 bytes when serialized which is larger than the maximum

This means you need to add some extra config to your StreamsConfig:

    config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, expectedMaximumMessageSizeBytes);

You will possibly also need to adjust the broker properties and increase message.max.bytes - it will need to be at least as large as the setting above.
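The message-size advice can be sketched the same way. The `withMaxRequestSize` helper is hypothetical, and the 4 MB value is only an illustration chosen to exceed the 2381750-byte record from the log - size it to your own largest aggregate:

```java
import java.util.Properties;

public class MessageSizeTuning {
    // Hypothetical helper: copy a Streams config and set the producer's
    // max.request.size (the string key behind
    // ProducerConfig.MAX_REQUEST_SIZE_CONFIG) to a bound that covers the
    // largest serialized record the app will produce.
    static Properties withMaxRequestSize(Properties base, int bytes) {
        Properties p = new Properties();
        p.putAll(base);
        p.put("max.request.size", Integer.toString(bytes));
        return p;
    }

    public static void main(String[] args) {
        // Note: the broker-side message.max.bytes must be at least as large,
        // or oversized changelog records will still be rejected.
        Properties tuned = withMaxRequestSize(new Properties(), 4 * 1024 * 1024);
        System.out.println(tuned.getProperty("max.request.size")); // 4194304
    }
}
```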
At the moment all of the change-logs for your state stores are being dropped because of this issue.

Thanks,
Damian

On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

(am attaching a debug log - note that the app terminated with no further messages)

topology: kStream -> groupByKey.aggregate(minute) -> foreach
                 \-> groupByKey.aggregate(hour) -> foreach

config:

    Properties config = new Properties();
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
    config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");

On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Jon,

To help investigate this issue, could you let me know 1) your topology sketch and 2) your app configs? For example, did you enable caching in your apps with the cache.max.bytes.buffering config?

Guozhang

On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

I get this one quite a bit. It kills my app after a short time of running. Driving me nuts.

On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Not sure about this one.

Can you describe what you do exactly? Can you reproduce the issue? We definitely want to investigate this.

-Matthias

On 12/10/16 4:17 PM, Jon Yeargers wrote:

(Am reporting these as I have moved to 0.10.1.0-cp2)

ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group MinuteAgg failed on partition assignment

java.lang.IllegalStateException: task [1_9] Log end offset should not change while restoring
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

--
-- Guozhang