Update: the app ran well for several hours, until I tried to update it. I
copied a new build up to one machine (of five) and then we went back to a
near-endless rebalance. After about an hour I ended up killing the other
four instances and watching the first (new) one. It took 90 minutes before
it started consuming anything.
This morning I copied / started two more. 60 minutes later and still
waiting for the rebalance to conclude.

Obviously it's impractical to delete the topic(s) before updating the
consumer software. What am I doing wrong that's causing all this waiting?

On Wed, Dec 14, 2016 at 9:28 AM, Jon Yeargers <jon.yearg...@cedexis.com>
wrote:

> In a turn of events - this morning I was about to throw in the proverbial
> towel on Kafka. In a last-ditch effort I killed all but one instance of my
> app, put it back to a single thread (why offer the option if it's not
> advised?) and deleted every last topic that had any relation to this app.
>
> I restarted it on a single machine and it magically worked. It's been
> running for more than an hour now and hasn't been stuck in 'rebalance-land'
> at all.
>
> I'll keep watching it and see how it goes.
>
> On Wed, Dec 14, 2016 at 6:13 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> We do recommend one thread per instance of the app. However, it should
>> also work with multiple threads.
>> I can't debug the problem any further without the logs from the other
>> apps. We'd need to check whether another instance still has task 1_3
>> open (I suspect it does).
>>
>> Thanks,
>> Damian
>>
>> On Wed, 14 Dec 2016 at 13:20 Jon Yeargers <jon.yearg...@cedexis.com>
>> wrote:
>>
>> > What should I do about this? One thread per app?
>> >
>> > On Wed, Dec 14, 2016 at 4:11 AM, Damian Guy <damian....@gmail.com>
>> wrote:
>> >
>> > > That is correct
>> > >
>> > > On Wed, 14 Dec 2016 at 12:09 Jon Yeargers <jon.yearg...@cedexis.com>
>> > > wrote:
>> > >
>> > > > I have the app running on 5 machines. Is that what you mean?
>> > > >
>> > > > On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy <damian....@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi Jon,
>> > > > >
>> > > > > Do you have more than one instance of the app running? The reason
>> > > > > I ask is that the task (task 1_3) that fails with the
>> > > > > "java.lang.IllegalStateException" in this log was previously
>> > > > > running as a Standby Task. That would mean the active task for
>> > > > > this store had been running elsewhere, but I don't see that in
>> > > > > the logs. The exception occurs as StreamThread-1 starts to run
>> > > > > task 1_3 as an active task, and it might indicate that another
>> > > > > thread/instance is still writing to the changelog topic for the
>> > > > > State Store.
>> > > > >
>> > > > > Thanks,
>> > > > > Damian
>> > > > >
>> > > > > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> > > > >
>> > > > > > As near as I can see it's rebalancing constantly.
>> > > > > >
>> > > > > > I'll up that value and see what happens.
>> > > > > >
>> > > > > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hi Jon,
>> > > > > > >
>> > > > > > > I haven't had much of a chance to look at the logs in detail
>> > > > > > > yet, but I have noticed that your app seems to be rebalancing
>> > > > > > > frequently, usually around the 300-second mark. That would
>> > > > > > > usually mean that poll hasn't been called for at least that
>> > > > > > > long. You might want to try setting the config
>> > > > > > > ConsumerConfig.MAX_POLL_INTERVAL_CONFIG to something higher
>> > > > > > > than 300000 (which is the default).
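A minimal sketch of that suggestion; the class name `AggConfig` and the 600000 ms value are illustrative (not from the thread). Consumer-level settings like this can be passed straight into the Streams application's properties:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Hypothetical sketch: raise max.poll.interval.ms above the 300000 ms
// default so long gaps between poll() calls don't trigger a rebalance.
// 600000 is an illustrative value, not a recommendation.
public class AggConfig {
    static Properties streamsProperties() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_CONFIG, "600000");
        return config;
    }
}
```

The trade-off is that a genuinely hung thread will take correspondingly longer to be evicted from the group, so the value should only be as large as the slowest expected processing batch.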
>> > > > > > >
>> > > > > > > I'll continue to look at your logs and get back to you.
>> > > > > > > Thanks,
>> > > > > > > Damian
>> > > > > > >
>> > > > > > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> > > > > > >
>> > > > > > > > Attached is a log with lots of disconnections and a small
>> > > > > > > > amount of actual, useful activity.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> > > > > > > >
>> > > > > > > > n/m - I understand the logging issue now. Am generating a
>> > > > > > > > new one. Will send shortly.
>> > > > > > > >
>> > > > > > > > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> > > > > > > >
>> > > > > > > > Yes - saw that one. There were plenty of smaller records
>> > > > > > > > available though.
>> > > > > > > >
>> > > > > > > > I sent another log this morning with the level set to
>> > > > > > > > DEBUG. Hopefully you received it.
>> > > > > > > >
>> > > > > > > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > Hi Jon,
>> > > > > > > >
>> > > > > > > > It looks like you have the logging level for KafkaStreams
>> > > > > > > > set to at least WARN. I can only see ERROR level logs being
>> > > > > > > > produced from Streams.
>> > > > > > > >
>> > > > > > > > However, I did notice an issue in the logs (not related to
>> > > > > > > > your specific error, but you will need to fix it anyway).
>> > > > > > > > There are lots of messages like:
>> > > > > > > >
>> > > > > > > > task [2_9] Error sending record to topic
>> > > > > > > > PRTMinuteAgg-prt_hour_agg_stream-changelog
>> > > > > > > > org.apache.kafka.common.errors.RecordTooLargeException: The
>> > > > > > > > message is 2381750 bytes when serialized which is larger
>> > > > > > > > than the maximum
>> > > > > > > >
>> > > > > > > > This means you need to add some extra config to your
>> > > > > > > > StreamsConfig:
>> > > > > > > > config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
>> > > > > > > > expectedMaximumMessageSizeBytes)
>> > > > > > > >
>> > > > > > > > You will also possibly need to adjust the broker properties
>> > > > > > > > and increase message.max.bytes - it will need to be at
>> > > > > > > > least as large as the setting above.
>> > > > > > > >
>> > > > > > > > At the moment all of the change-logs for your state stores
>> > > > > > > > are being dropped due to this issue.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Damian
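A minimal sketch of the producer-side setting described above; the class name `SizeConfig` and the 3145728-byte (3 MB) ceiling are illustrative, chosen only because it exceeds the 2381750-byte record in the log:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

// Hypothetical sketch: allow the embedded producer to send records up to
// ~3 MB. The broker-side message.max.bytes must be at least this large,
// or the broker will still reject the records.
public class SizeConfig {
    static final int MAX_MESSAGE_BYTES = 3145728; // illustrative ceiling

    static Properties producerSizeProperties() {
        Properties config = new Properties();
        config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, MAX_MESSAGE_BYTES);
        return config;
    }
}
```

On the broker side the matching change would be `message.max.bytes=3145728` in `server.properties`, with `replica.fetch.max.bytes` at least as large so replication doesn't drop the same records.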
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> > > > > > > >
>> > > > > > > > > (am attaching a debug log - note that the app terminated
>> > > > > > > > > with no further messages)
>> > > > > > > > >
>> > > > > > > > > topology: kStream -> groupByKey.aggregate(minute) -> foreach
>> > > > > > > > >                  \-> groupByKey.aggregate(hour)   -> foreach
>> > > > > > > > >
>> > > > > > > > > config:
>> > > > > > > > >
>> > > > > > > > >         Properties config = new Properties();
>> > > > > > > > >         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> > > > > > > > >         config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> > > > > > > > >         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
>> > > > > > > > >         config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> > > > > > > > >         config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> > > > > > > > >         config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
>> > > > > > > > >         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
>> > > > > > > > >         config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
>> > > > > > > > >         config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > Jon,
>> > > > > > > > >
>> > > > > > > > > To help investigate this issue, could you let me know 1)
>> > > > > > > > > your topology sketch and 2) your app configs? For example,
>> > > > > > > > > did you enable caching in your apps with the
>> > > > > > > > > cache.max.bytes.buffering config?
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > Guozhang
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > I get this one quite a bit. It kills my app after a
>> > > > > > > > > > short time of running. Driving me nuts.
>> > > > > > > > > >
>> > > > > > > > > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Not sure about this one.
>> > > > > > > > > > >
>> > > > > > > > > > > Can you describe what you do exactly? Can you
>> > > > > > > > > > > reproduce the issue? We definitely want to
>> > > > > > > > > > > investigate this.
>> > > > > > > > > > >
>> > > > > > > > > > > -Matthias
>> > > > > > > > > > >
>> > > > > > > > > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
>> > > > > > > > > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
>> > > > > > > > > > > >
>> > > > > > > > > > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener
>> > > > > > > > > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>> > > > > > > > > > > > MinuteAgg failed on partition assignment
>> > > > > > > > > > > >
>> > > > > > > > > > > > java.lang.IllegalStateException: task [1_9] Log end offset should not
>> > > > > > > > > > > > change while restoring
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>> > > > > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
>> > > > > > > > > > > >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
>> > > > > > > > > > > >     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
>> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
>> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
>> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>> > > > > > > > > > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>> > > > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > > -- Guozhang
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>