n/m - I understand the logging issue now. Am generating a new one. Will send shortly.
On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> Yes - saw that one. There were plenty of smaller records available though.
>
> I sent another log this morning with the level set to DEBUG. Hopefully you
> rec'd it.
>
> On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Jon,
>>
>> It looks like you have the logging level for KafkaStreams set to at least
>> WARN. I can only see ERROR level logs being produced from Streams.
>>
>> However, I did notice an issue in the logs (not related to your specific
>> error but you will need to fix anyway):
>>
>> There are lots of messages like:
>> task [2_9] Error sending record to topic
>> PRTMinuteAgg-prt_hour_agg_stream-changelog
>> org.apache.kafka.common.errors.RecordTooLargeException: The message is
>> 2381750 bytes when serialized which is larger than the maximum
>>
>> This means you need to add some extra config to your StreamsConfig:
>> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, expectedMaximumMessageSizeBytes)
>>
>> You will also possibly need to adjust the broker properties and
>> increase message.max.bytes
>> - it will need to be at least as large as the setting above.
>>
>> At the moment all of the change-logs for your state-stores are being
>> dropped due to this issue.
>>
>> Thanks,
>> Damian
>>
>>
>> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yearg...@cedexis.com>
>> wrote:
>>
>> > (am attaching a debug log - note that app terminated with no further
>> > messages)
>> >
>> > topology: kStream -> groupByKey.aggregate(minute) -> foreach
>> >                  \-> groupByKey.aggregate(hour) -> foreach
>> >
>> >
>> > config:
>> >
>> > Properties config = new Properties();
>> > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
>> > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
>> > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
>> > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>> > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
>> > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
>> > config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
>> >
>> > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
>> >
>> >
>> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > Jon,
>> >
>> > To help investigate this issue, could you let me know 1) your topology
>> > sketch and 2) your app configs? For example, did you enable caching in your
>> > apps with the cache.max.bytes.buffering config?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com>
>> > wrote:
>> >
>> > > I get this one quite a bit. It kills my app after a short time of
>> > > running. Driving me nuts.
>> > >
>> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matth...@confluent.io>
>> > > wrote:
>> > >
>> > > > Not sure about this one.
>> > > >
>> > > > Can you describe what you do exactly? Can you reproduce the issue? We
>> > > > definitely want to investigate this.
>> > > >
>> > > > -Matthias
>> > > >
>> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
>> > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
>> > > > >
>> > > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener
>> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>> > > > > MinuteAgg failed on partition assignment
>> > > > >
>> > > > > java.lang.IllegalStateException: task [1_9] Log end offset should not
>> > > > > change while restoring
>> > > > >
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>> > > > > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
>> > > > > at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
>> > > > > at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
>> > > > > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
>> > > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
>> > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
>> > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>> > > > > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>> > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>> > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>> > > > >
>> >
>> >
>> > --
>> > -- Guozhang
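For reference, the max.request.size change Damian suggests for the RecordTooLargeException could look roughly like the sketch below. It uses the literal key "max.request.size", which is the string behind ProducerConfig.MAX_REQUEST_SIZE_CONFIG, so that it runs without the Kafka jars. The class name, the helper methods, and the sizing rule (round down to a whole MiB, then add 2 MiB) are assumptions for illustration and are not from the thread; only the 2381750-byte figure comes from the quoted log.

```java
import java.util.Properties;

public class ChangelogRecordSize {
    // Serialized size reported by the RecordTooLargeException quoted in the thread.
    static final int OBSERVED_RECORD_BYTES = 2381750;

    // Hypothetical sizing rule (an assumption, not from the thread): round the
    // observed size down to a whole MiB, then add 2 MiB. This leaves more than
    // 1 MiB and at most 2 MiB of headroom above the observed record.
    static int expectedMaximumMessageSizeBytes(int observedBytes) {
        final int mib = 1024 * 1024;
        return (observedBytes / mib + 2) * mib;
    }

    // "max.request.size" is the value of ProducerConfig.MAX_REQUEST_SIZE_CONFIG;
    // the literal is used here so the sketch has no Kafka dependency.
    static Properties withMaxRequestSize(Properties config, int maxBytes) {
        config.put("max.request.size", maxBytes);
        return config;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        // "application.id" is the value of StreamsConfig.APPLICATION_ID_CONFIG.
        config.put("application.id", "PRTMinuteAgg");

        int maxBytes = expectedMaximumMessageSizeBytes(OBSERVED_RECORD_BYTES);
        withMaxRequestSize(config, maxBytes);

        System.out.println("max.request.size=" + config.get("max.request.size"));
        // Broker side (server.properties): message.max.bytes needs to be at
        // least as large as the producer setting, per Damian's note.
        System.out.println("message.max.bytes should be >= " + maxBytes);
    }
}
```

In the real application the same put would go into the Properties object passed to StreamsConfig, next to the existing COMPRESSION_TYPE_CONFIG entry. The right ceiling depends on how large the hourly aggregates can actually grow, so the headroom here is only a placeholder.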