That is correct.

On Wed, 14 Dec 2016 at 12:09 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> I have the app running on 5 machines. Is that what you mean?
>
> On Wed, Dec 14, 2016 at 1:38 AM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Jon,
> >
> > Do you have more than one instance of the app running? The reason I ask
> > is because the task (task 1_3) that fails with the
> > "java.lang.IllegalStateException" in this log was previously running as
> > a standby task. This would mean the active task for this store would
> > have been running elsewhere, but I don't see that in the logs. The
> > exception occurs as StreamThread-1 starts to run task 1_3 as an active
> > task. The exception might indicate that another thread/instance is
> > still writing to the changelog topic for the state store.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 13 Dec 2016 at 17:23 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >
> > > As near as I can see it's rebalancing constantly.
> > >
> > > I'll up that value and see what happens.
> > >
> > > On Tue, Dec 13, 2016 at 9:04 AM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > > > Hi Jon,
> > > >
> > > > I haven't had much of a chance to look at the logs in detail yet,
> > > > but I have noticed that your app seems to be rebalancing frequently.
> > > > It usually happens around the 300-second mark, which usually means
> > > > that poll hasn't been called for at least that long. You might want
> > > > to try setting the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG to
> > > > something higher than 300000 (which is the default).
> > > >
> > > > I'll continue to look at your logs and get back to you.
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Tue, 13 Dec 2016 at 15:02 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> > > >
> > > > > Attached is a log with lots of disconnections and a small amount
> > > > > of actual, useful activity.
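(Damian's MAX_POLL_INTERVAL_CONFIG suggestion above amounts to one extra entry in the app's Properties. A minimal stdlib-only sketch: the raw key string "max.poll.interval.ms" is what ConsumerConfig.MAX_POLL_INTERVAL_CONFIG resolves to, and 600000 is just an example value above the 300000 ms default, not a recommendation from the thread.)

```java
import java.util.Properties;

public class PollIntervalConfig {

    // Build a Properties object that raises the consumer's max poll interval
    // so a slow-processing Streams app is not kicked out of the group.
    // "max.poll.interval.ms" is the key behind ConsumerConfig.MAX_POLL_INTERVAL_CONFIG;
    // 600000 (10 minutes) is an arbitrary example above the 300000 ms default.
    static Properties build() {
        Properties config = new Properties();
        config.put("max.poll.interval.ms", "600000");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.poll.interval.ms"));
    }
}
```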
> > > > > On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> > > > >
> > > > > n/m - I understand the logging issue now. Am generating a new one.
> > > > > Will send shortly.
> > > > >
> > > > > On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> > > > >
> > > > > Yes - saw that one. There were plenty of smaller records available
> > > > > though.
> > > > >
> > > > > I sent another log this morning with the level set to DEBUG.
> > > > > Hopefully you rec'd it.
> > > > >
> > > > > On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <damian....@gmail.com> wrote:
> > > > >
> > > > > Hi Jon,
> > > > >
> > > > > It looks like you have the logging level for KafkaStreams set to
> > > > > at least WARN. I can only see ERROR level logs being produced from
> > > > > Streams.
> > > > >
> > > > > However, I did notice an issue in the logs (not related to your
> > > > > specific error, but you will need to fix it anyway).
> > > > >
> > > > > There are lots of messages like:
> > > > >
> > > > > task [2_9] Error sending record to topic
> > > > > PRTMinuteAgg-prt_hour_agg_stream-changelog
> > > > > org.apache.kafka.common.errors.RecordTooLargeException: The message
> > > > > is 2381750 bytes when serialized which is larger than the maximum
> > > > >
> > > > > This means you need to add some extra config to your StreamsConfig:
> > > > >
> > > > > config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
> > > > >     expectedMaximumMessageSizeBytes);
> > > > >
> > > > > You will also possibly need to adjust the broker properties and
> > > > > increase message.max.bytes - it will need to be at least as large
> > > > > as the setting above.
> > > > >
> > > > > At the moment all of the change-logs for your state stores are
> > > > > being dropped due to this issue.
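(For reference, Damian's two fixes above side by side: raising the producer's max request size in the Streams config, and raising message.max.bytes on the brokers to at least the same value. A stdlib-only sketch using the raw key string behind ProducerConfig.MAX_REQUEST_SIZE_CONFIG; the 5 MB figure is an arbitrary example comfortably above the 2381750-byte record from the log, not a value from the thread.)

```java
import java.util.Properties;

public class MaxRequestSizeConfig {

    // Example ceiling: 5 MB, comfortably above the ~2.4 MB record that was
    // being dropped. Pick a value that covers your largest expected record.
    static final int EXPECTED_MAX_MESSAGE_BYTES = 5 * 1024 * 1024;

    // "max.request.size" is the key behind ProducerConfig.MAX_REQUEST_SIZE_CONFIG.
    static Properties build() {
        Properties config = new Properties();
        config.put("max.request.size", Integer.toString(EXPECTED_MAX_MESSAGE_BYTES));
        // Broker side (server.properties), message.max.bytes must be at least
        // as large, or the broker will still reject the record, e.g.:
        //   message.max.bytes=5242880
        return config;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.request.size"));
    }
}
```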
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> > > > >
> > > > > > (am attaching a debug log - note that app terminated with no
> > > > > > further messages)
> > > > > >
> > > > > > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> > > > > >                  \-> groupByKey.aggregate(hour)   -> foreach
> > > > > >
> > > > > > config:
> > > > > >
> > > > > > Properties config = new Properties();
> > > > > > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> > > > > > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> > > > > > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
> > > > > > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > > > >     AggKey.class.getName());
> > > > > > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > > > >     Serdes.String().getClass().getName());
> > > > > > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> > > > > > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> > > > > > config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
> > > > > > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
> > > > > >
> > > > > > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > Jon,
> > > > > >
> > > > > > To help investigate this issue, could you let me know 1) your
> > > > > > topology sketch and 2) your app configs? For example, did you
> > > > > > enable caching in your apps with the cache.max.bytes.buffering
> > > > > > config?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> > > > > >
> > > > > > > I get this one quite a bit.
> > > > > > > It kills my app after a short time of running. Driving me nuts.
> > > > > > >
> > > > > > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Not sure about this one.
> > > > > > > >
> > > > > > > > Can you describe what you do exactly? Can you reproduce the
> > > > > > > > issue? We definitely want to investigate this.
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > > > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > > > > > > >
> > > > > > > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > > > > > > org.apache.kafka.streams.processor.internals.StreamThread$1
> > > > > > > > > for group MinuteAgg failed on partition assignment
> > > > > > > > >
> > > > > > > > > java.lang.IllegalStateException: task [1_9] Log end offset
> > > > > > > > > should not change while restoring
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > > > > > > >   at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
> > > > > > > > >   at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> > > > > > > > >   at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> > > > > > > > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> > > > > > > > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> > > > > > > > >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> > > > > > > > >   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> > > > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> > > > > > > > >   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> > > > > > > > >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > > > > >
> > > > > > --
> > > > > > -- Guozhang