n/m - I understand the logging issue now. Am generating a new one. Will
send shortly.

On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <jon.yearg...@cedexis.com>
wrote:

> Yes - saw that one. There were plenty of smaller records available though.
>
> I sent another log this morning with the level set to DEBUG. Hopefully you
> rec'd it.
>
> On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> HI Jon,
>>
>> It looks like you have the logging level for KafkaStreams set to at least
>> WARN. I can only see ERROR level logs being produced from Streams.
>>
>> However, i did notice an issue in the logs (not related to your specific
>> error but you will need to fix anyway):
>>
>> There are lots of messages like:
>> task [2_9] Error sending record to topic
>> PRTMinuteAgg-prt_hour_agg_stream-changelog
>> org.apache.kafka.common.errors.RecordTooLargeException: The message is
>> 2381750 bytes when serialized which is larger than the maximum
>>
>> This means you need to add some extra config to your StreamsConfig:
>> config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
>> expectedMaximumMessageSizeBytes)
>>
>> You will also possible need to adjust the broker properties and
>> increase message.max.bytes
>> - it will need to be at least as large as the setting above.
>>
>> At the moment all of the change-logs for your state-stores are being
>> dropped due to this issue.
>>
>> Thanks,
>> Damian
>>
>>
>> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yearg...@cedexis.com>
>> wrote:
>>
>> > (am attaching a debug log - note that app terminated with no further
>> > messages)
>> >
>> > topology: kStream -> groupByKey.aggregate(minute) -> foreach
>> >                              \-> groupByKey.aggregate(hour) -> foreach
>> >
>> >
>> > config:
>> >
>> >         Properties config = new Properties();
>> >         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
>> >         config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>> ZOOKEEPER_IP);
>> >         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg"
>> );
>> >         config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > AggKey.class.getName());
>> >         config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > Serdes.String().getClass().getName());
>> >         config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
>> >         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
>> >         config.put(StreamsConfig.STATE_DIR_CONFIG,
>> "/mnt/PRTMinuteAgg");
>> >
>> >         config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
>> >
>> >
>> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >
>> > Jon,
>> >
>> > To help investigating this issue, could you let me know 1) your topology
>> > sketch and 2) your app configs? For example did you enable caching in
>> your
>> > apps with the cache.max.bytes.buffering config?
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com
>> >
>> > wrote:
>> >
>> > > I get this one quite a bit. It kills my app after a short time of
>> > running.
>> > > Driving me nuts.
>> > >
>> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <
>> matth...@confluent.io>
>> > > wrote:
>> > >
>> > > > Not sure about this one.
>> > > >
>> > > > Can you describe what you do exactly? Can you reproduce the issue?
>> We
>> > > > definitely want to investigate this.
>> > > >
>> > > > -Matthias
>> > > >
>> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
>> > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
>> > > > >
>> > > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
>> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
>> group
>> > > > > MinuteAgg failed on partition assignment
>> > > > >
>> > > > > java.lang.IllegalStateException: task [1_9] Log end offset
>> should not
>> > > > > change while restoring
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> anager.
>> > > > restoreActiveState(ProcessorStateManager.java:245)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> anager.
>> > > > register(ProcessorStateManager.java:198)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.
>> > > > ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(
>> > > > RocksDBWindowStore.java:206)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.state.internals.MeteredWindowStore.init(
>> > > > MeteredWindowStore.java:66)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.state.internals.CachingWindowStore.init(
>> > > > CachingWindowStore.java:64)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.AbstractTask.
>> > > > initializeStateStores(AbstractTask.java:81)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.
>> > > > StreamTask.<init>(StreamTask.java:120)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.
>> > > > StreamThread.createStreamTask(StreamThread.java:633)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.
>> > > > StreamThread.addStreamTasks(StreamThread.java:660)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.StreamThread.
>> access$100(
>> > > > StreamThread.java:69)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1.
>> > > > onPartitionsAssigned(StreamThread.java:124)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
>> > > > onJoinComplete(ConsumerCoordinator.java:228)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> > > > joinGroupIfNeeded(AbstractCoordinator.java:313)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> > > > ensureActiveGroup(AbstractCoordinator.java:277)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.clients.consumer.internals.ConsumerCoordina
>> tor.poll(
>> > > > ConsumerCoordinator.java:259)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.clients.consumer.KafkaConsumer.
>> > > > pollOnce(KafkaConsumer.java:1013)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>> > > > KafkaConsumer.java:979)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.StreamThread.
>> runLoop(
>> > > > StreamThread.java:407)
>> > > > >
>> > > > >         at
>> > > > > org.apache.kafka.streams.processor.internals.
>> > > > StreamThread.run(StreamThread.java:242)
>> > > > >
>> > > >
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>> >
>> >
>>
>
>

Reply via email to