Update: the app ran well for several hours... until I tried to update it. I copied a new build up to one machine (of five) and then we went back to near-endless rebalance. After about an hour I ended up killing the other four instances and watching the first (new) one. It took 90 minutes before it started consuming anything.
This morning I copied / started two more. 60 minutes and still waiting for rebalance to conclude. Obviously it's impractical to delete the topic(s) before updating the consumer software. What am I doing wrong that's causing all this waiting?

On Wed, Dec 14, 2016 at 9:28 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> In a turn of events - this morning I was about to throw in the proverbial towel on Kafka. In a last-ditch effort I killed all but one instance of my app, put it back to a single thread (why offer the option if it's not advised?) and deleted every last topic that had any relation to this app.
>
> I restarted it on a single machine and it magically worked. It's been running for more than an hour now and hasn't been stuck in 'rebalance-land' at all.
>
> I'll keep watching it and see how it goes.

On Wed, Dec 14, 2016 at 6:13 AM, Damian Guy <damian....@gmail.com> wrote:

> We do recommend one thread per instance of the app. However, it should also work with multiple threads.
>
> I can't debug the problem any further without the logs from the other apps. We'd need to try and see if another instance still has task 1_3 open (I suspect it does).
>
> Thanks,
> Damian

On Wed, 14 Dec 2016 at 13:20 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> What should I do about this? One thread per app?

On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy <damian....@gmail.com> wrote:

> That is correct.

On Wed, 14 Dec 2016 at 12:09 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> I have the app running on 5 machines. Is that what you mean?

On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Jon,
>
> Do you have more than one instance of the app running?
> The reason I ask is because the task (task 1_3) that fails with the "java.lang.IllegalStateException" in this log was previously running as a Standby Task. This would mean the active task for this store would have been running elsewhere, but I don't see that in the logs. The exception occurs as StreamThread-1 starts to run task 1_3 as an active task. The exception might indicate that another thread/instance is still writing to the changelog topic for the State Store.
>
> Thanks,
> Damian

On Tue, 13 Dec 2016 at 17:23 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> As near as I can see it's rebalancing constantly.
>
> I'll up that value and see what happens.

On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Jon,
>
> I haven't had much of a chance to look at the logs in detail yet, but I have noticed that your app seems to be rebalancing frequently. It is usually around the 300 second mark, which usually would mean that poll hasn't been called for at least that long. You might want to try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG to something higher than 300000 (which is the default).
>
> I'll continue to look at your logs and get back to you.
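[Editor's note] The max.poll.interval.ms change suggested in the thread can be sketched as follows. This is a minimal, stdlib-only sketch with no Kafka dependency; the key name is the standard consumer setting, but the 600000 value is an illustrative choice, not a figure recommended anywhere in the thread.

```java
import java.util.Properties;

public class RebalanceTuning {
    public static void main(String[] args) {
        Properties config = new Properties();
        // The consumer default is 300000 ms (5 minutes). Raising it gives
        // long state-store restores between poll() calls more headroom
        // before the consumer is kicked out of the group, which would
        // otherwise trigger yet another rebalance.
        config.put("max.poll.interval.ms", "600000");
        System.out.println("max.poll.interval.ms=" + config.getProperty("max.poll.interval.ms"));
    }
}
```

In a real Streams app this key would be passed through the same Properties object handed to StreamsConfig.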
> Thanks,
> Damian

On Tue, 13 Dec 2016 at 15:02 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> Attached is a log with lots of disconnections and a small amount of actual, useful activity.

On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> n/m - I understand the logging issue now. Am generating a new one. Will send shortly.

On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> Yes - saw that one. There were plenty of smaller records available though.
>
> I sent another log this morning with the level set to DEBUG. Hopefully you rec'd it.

On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Jon,
>
> It looks like you have the logging level for KafkaStreams set to at least WARN. I can only see ERROR level logs being produced from Streams.
> However, I did notice an issue in the logs (not related to your specific error, but you will need to fix it anyway).
>
> There are lots of messages like:
>
>     task [2_9] Error sending record to topic PRTMinuteAgg-prt_hour_agg_stream-changelog
>     org.apache.kafka.common.errors.RecordTooLargeException: The message is
>     2381750 bytes when serialized which is larger than the maximum
>
> This means you need to add some extra config to your StreamsConfig:
>
>     config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, expectedMaximumMessageSizeBytes)
>
> You will also possibly need to adjust the broker properties and increase message.max.bytes - it will need to be at least as large as the setting above.
>
> At the moment all of the change-logs for your state-stores are being dropped due to this issue.
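[Editor's note] The message-size fix described in the thread has two halves: the Streams producer limit and the broker limit, and they must move together. A minimal, stdlib-only sketch (no Kafka dependency); the 4 MB figure is illustrative, chosen only because the failing record was ~2.3 MB.

```java
import java.util.Properties;

public class MessageSizeTuning {
    public static void main(String[] args) {
        // Illustrative ceiling: comfortably above the ~2381750-byte record
        // reported in the thread. Not a value the thread itself recommends.
        int expectedMaximumMessageSizeBytes = 4 * 1024 * 1024;

        Properties streamsConfig = new Properties();
        // Producer-side limit; in Kafka code this key is the constant
        // ProducerConfig.MAX_REQUEST_SIZE_CONFIG.
        streamsConfig.put("max.request.size", String.valueOf(expectedMaximumMessageSizeBytes));

        // Broker-side limit must be at least as large; set in the broker's
        // server.properties, e.g.:  message.max.bytes=4194304
        System.out.println("max.request.size=" + streamsConfig.getProperty("max.request.size"));
    }
}
```

If the broker limit stays below the producer limit, oversized changelog records are still rejected, which is exactly the silent state-store data loss described above.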
> Thanks,
> Damian

On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> (Am attaching a debug log - note that the app terminated with no further messages.)
>
> Topology:
>
>     kStream -> groupByKey.aggregate(minute) -> foreach
>            \-> groupByKey.aggregate(hour)   -> foreach
>
> Config:
>
>     Properties config = new Properties();
>     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>     config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>     config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
>     config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>     config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>     config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
>     config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
>     config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
>     config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");

On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jon,
>
> To help investigate this issue, could you let me know 1) your topology
> sketch and 2) your app configs? For example, did you enable caching in your apps with the cache.max.bytes.buffering config?
>
> Guozhang

On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> I get this one quite a bit. It kills my app after a short time of running. Driving me nuts.

On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Not sure about this one.
>
> Can you describe what you do exactly? Can you reproduce the issue? We definitely want to investigate this.
>
> -Matthias

On 12/10/16 4:17 PM, Jon Yeargers wrote:

> (Am reporting these as I have moved to 0.10.1.0-cp2.)
>
>     ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener
>     org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>     MinuteAgg failed on partition assignment
>
>     java.lang.IllegalStateException: task [1_9] Log end offset should not change while restoring
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.
>             restoreActiveState(ProcessorStateManager.java:245)
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
>         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
>         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
>         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
>         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
>         at org.apache.kafka.streams.processor.internals.
>             StreamThread.addStreamTasks(StreamThread.java:660)
>         at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
>         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>         at org.apache.kafka.streams.processor.internals.
>             StreamThread.runLoop(StreamThread.java:407)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

--
-- Guozhang
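[Editor's note] To make the topology posted earlier in the thread concrete, here is a stdlib-only model of its two independent aggregations (per-minute and per-hour buckets keyed off the same input). It does not use Kafka Streams at all, and the timestamps and counting aggregator are invented for illustration; only the two-level bucketing shape comes from the thread.

```java
import java.util.Map;
import java.util.TreeMap;

public class TwoLevelAggSketch {
    // Models:  kStream -> groupByKey.aggregate(minute) -> foreach
    //                 \-> groupByKey.aggregate(hour)   -> foreach
    public static void main(String[] args) {
        // Illustrative record timestamps in ms since epoch (not from the thread).
        long[] timestamps = {0, 30_000, 90_000, 3_700_000};

        Map<Long, Integer> perMinute = new TreeMap<>();
        Map<Long, Integer> perHour = new TreeMap<>();
        for (long ts : timestamps) {
            // Each record feeds both aggregations, as in the branched topology.
            perMinute.merge(ts / 60_000, 1, Integer::sum);    // minute bucket
            perHour.merge(ts / 3_600_000, 1, Integer::sum);   // hour bucket
        }
        System.out.println("perMinute=" + perMinute);
        System.out.println("perHour=" + perHour);
    }
}
```

In the real app each aggregation is backed by its own RocksDB window store with a changelog topic, which is why an oversized aggregate record or a slow store restore in either branch can stall the whole instance.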