Filed https://issues.apache.org/jira/browse/KAFKA-3708

As for your case now, you can see if setting KafkaStreams.
setUncaughtExceptionHandler works for you or not.
In the future, it maybe better handling them in the library and stop the
process since they are not really recoverable.

Guozhang

On Thu, May 12, 2016 at 6:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> After looking at the source code (trunk), I think this is a bug that there
> are some cases where KafkaExceptions are not logged in log4j yet before it
> pop up to the StreamThread. I will file a JIRA for this issue.
>
>
> Guozhang
>
> On Thu, May 12, 2016 at 5:57 PM, Greg Fodor <gfo...@gmail.com> wrote:
>
>> Hey Guozhang, output was silent (since the log4j is streaming it to a
>> log file) except for the stack trace landing in stdout, i'm assuming
>> from the rethrow:
>>
>> Exception in thread "StreamThread-9"
>> org.apache.kafka.common.errors.SerializationException: Error
>> serializing Avro message
>>
>> Caused by: java.lang.NullPointerException: null of
>> com.altvr.schema.jobs.RoomViewId in field source_view_id of
>> com.altvr.schema.jobs.ViewBroadcastKey
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:93)
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:87)
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
>>
>>         at
>> io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:65)
>>
>>         at
>> io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61)
>>
>>         at
>> com.altvr.streams.serializers.SpecificKafkaAvroSerializer.serialize(SpecificKafkaAvroSerializer.java:27)
>>
>>         at
>> com.altvr.streams.serializers.SpecificKafkaAvroSerializer.serialize(SpecificKafkaAvroSerializer.java:10)
>>
>>         at
>> org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:103)
>>
>>         at
>> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:229)
>>
>>         at
>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:100)
>>
>>         at
>> com.altvr.streams.jobs.RoomOperationMessageProcessor.hasBegunBroadcastingRootViewToSpace(RoomOperationMessageProcessor.java:407)
>>
>>         at
>> com.altvr.streams.jobs.RoomOperationMessageProcessor.setupOrTeardownBroadcasts(RoomOperationMessageProcessor.java:377)
>>
>>         at
>> com.altvr.streams.jobs.RoomOperationMessageProcessor.forwardMessagesForBroadcasts(RoomOperationMessageProcessor.java:106)
>>
>>         at
>> com.altvr.streams.jobs.BroadcastPhotonMessages$1.transform(BroadcastPhotonMessages.java:193)
>>
>>         at
>> com.altvr.streams.jobs.BroadcastPhotonMessages$1.transform(BroadcastPhotonMessages.java:167)
>>
>>         at
>> org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:57)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>
>>         at
>> org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:352)
>>
>>         at
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250)
>>
>> Caused by: java.lang.NullPointerException
>>
>>         at
>> org.apache.avro.generic.GenericData.getField(GenericData.java:580)
>>
>>         at
>> org.apache.avro.generic.GenericData.getField(GenericData.java:595)
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:112)
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
>>
>>         at
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
>>
>>
>>
>> On Thu, May 12, 2016 at 5:09 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> > Greg,
>> >
>> > Could you post the output from stdout when running the console as well?
>> >
>> >
>> > Guozhang
>> >
>> > On Thu, May 12, 2016 at 4:52 PM, Greg Fodor <gfo...@gmail.com> wrote:
>> >
>> >> We noticed that some errors were happening in one of our KafkaStreams
>> >> jobs but they were not appearing in our logs or being sent to our
>> >> error reporting service (Airbrake) -- they only became visible on
>> >> stdout when running from the console. I believe the reason is because
>> >> of this explicit catch-rethrow of KafkaExceptions in the main run loop
>> >> of the StreamThread:
>> >>
>> >>
>> >>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L219
>> >>
>> >> What's the proper way for these exceptions to be getting logged? The
>> >> comment indicates they should be getting logged but they are not being
>> >> processed by our loggers in log4j.properties. Here is our properties
>> >> file:
>> >>
>> >> https://gist.github.com/gfodor/7d9a2102bde5f2dc062ead4e9551e670
>> >>
>> >> Thanks!
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Reply via email to