Hi,

Just to add another data point, I've been occasionally seeing the first error 
with a non-Samza app using the new Kafka producer with Snappy compression. I 
was going to post to the Kafka list but I haven't really narrowed down the 
situations yet. It sort of looks like it most often happens to me some minutes 
after a broker has restarted or had its ZK session time out in periods of very 
heavy load. But I need do more troubleshooting to have something less  vague to 
report over there.

Garry

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: 01 May 2015 23:57
To: dev@samza.apache.org
Subject: Re: Errors and hung job on broker shutdown

Hmm, it seems your snappy compressed data is corrupted and hence keep getting 
rejected by the broker, hence keeping the producer blocked on close(). Not sure 
how this happens as I have not seen this error ever before (myself wrote the 
new Kafka producer's compression module, and have ran it with various kinds of 
unit / integration test cases, but did not see this coming)..

Guozhang

On Wed, Apr 29, 2015 at 11:37 PM, Roger Hoover <roger.hoo...@gmail.com>
wrote:

> Guozhang and Yan,
>
> Thank you both for your responses.  I tried a lot of combinations and 
> I think I've determined that it's new producer + snappy that causes 
> the issue.
>
> It never happens with the old producer and it never happens with lz4 
> or no compression.  It only happens when a broker gets restarted (or 
> maybe just shutdown).
>
> The error is not always the same.  I've noticed at least three types 
> of errors on the Kafka brokers.
>
> 1) java.io.IOException: failed to read chunk at
>
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.jav
> a:356)
> http://pastebin.com/NZrrEHxU
> 2) java.lang.OutOfMemoryError: Java heap space
>    at
>
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.jav
> a:346)
> http://pastebin.com/yuxk1BjY
> 3) java.io.IOException: PARSING_ERROR(2)
>   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> http://pastebin.com/yq98Hx49
>
> I've noticed a couple different behaviors from the Samza producer/job
> A) It goes into a long retry loop where this message is logged.  I saw 
> this with error #1 above.
>
> 2015-04-29 18:17:31 Sender [WARN] task[Partition 7] 
> ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[9999253] 
> Got error produce response with correlation id 4878 on topic-partition 
> svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 
> attempts left). Error: CORRUPT_MESSAGE
>
> B) The job exists with
> org.apache.kafka.common.errors.UnknownServerException (at least when 
> run as ThreadJob).  I saw this with error #3 above.
>
> org.apache.samza.SamzaException: Unable to send message from 
> TaskName-Partition 6 to system kafka.
> org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request
>
> This seems most likely to be a bug in the new Kafka producer.  I'll 
> probably file a JIRA for that project.
>
> Thanks,
>
> Roger
>
> On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > And just to answer your first question: SIGTERM with 
> > controlled.shutdown=true should be OK for bouncing the broker.
> >
> > Guozhang
> >
> > On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Roger,
> > >
> > > I believe Samza 0.9.0 already uses the Java producer.
> > >
> > > Java producer's close() call will try to flush all buffered data 
> > > to the brokers before completing the call. However, if some 
> > > buffered data's destination partition leader is not known, the 
> > > producer will block on refreshing the metadata and then retry sending.
> > >
> > > From the broker logs, it seems it does receive the producer 
> > > request but failed to handle it due to "Leader not local" after the 
> > > bounce:
> > >
> > > --------
> > > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with 
> > > correlation id 226 from client
> > > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on 
> > > partition [sys.samza_metrics,0] failed due to Leader not local for 
> > > partition [sys.samza_metrics,0] on broker 0 
> > > (kafka.server.KafkaApis)
> > > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with 
> > > correlation id 45671 from client
> > > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on 
> > > partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0]
> failed
> > > due to Leader not local for partition 
> > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 
> > > 0
> > > (kafka.server.KafkaApis)
> > > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with 
> > > correlation id 12267 from client
> > > samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition 
> > > [sys.samza_metrics,0] failed due to Leader not local for partition 
> > > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > > --------
> > >
> > > because for these two topic-partitions (sys.samza_metrics,0 and 
> > > __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), their lead 
> > > has
> > been
> > > moved to broker id:1,host:sit320w80m7,port:9092. When the producer 
> > > gets
> > the
> > > error code from the old leader, it should refresh its metadata and 
> > > get
> > the
> > > new leader as broker-1, and retry sending, but for some reason it 
> > > does
> > not
> > > refresh its metadata. Without producer logs from Samza container I
> cannot
> > > further investigate the issue.
> > >
> > > Which Kafka version does Samza 0.9.0 use?
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com>
> wrote:
> > >
> > >> Not sure about the Kafka side. From the Samza side, from your 
> > >> description ( "does not exit nor does it make any progress" ), I 
> > >> think the code is stuck
> in
> > >> producer.close
> > >> <
> > >>
> >
> https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala
> /org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143
> > >> >,
> > >> otherwise, it will throw SamzaException to quit the job. So maybe 
> > >> some Kafka experts in this mailing list or Kafka mailing list can 
> > >> help
> > >>
> > >> Fang, Yan
> > >> yanfang...@gmail.com
> > >>
> > >> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover 
> > >> <roger.hoo...@gmail.com
> >
> > >> wrote:
> > >>
> > >> > At error level logging, this was the only entry in the Samza log:
> > >> >
> > >> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 
> > >> > 2] ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2]
> offset[9129395]
> > >> > Unable to send message from TaskName-Partition 1 to system 
> > >> > kafka
> > >> >
> > >> > Here is the log from the Kafka broker that was shutdown.
> > >> >
> > >> > http://pastebin.com/afgmLyNF
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Roger
> > >> >
> > >> >
> > >> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com>
> wrote:
> > >> >
> > >> > > Roger, could you paste the full log from Samza container? If 
> > >> > > you
> can
> > >> > figure
> > >> > > out which Kafka broker the message was sent to, it would be
> helpful
> > >> if we
> > >> > > get the log from the broker as well.
> > >> > >
> > >> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <
> > roger.hoo...@gmail.com
> > >> >
> > >> > > wrote:
> > >> > >
> > >> > > > Hi,
> > >> > > >
> > >> > > > I need some help figuring out what's going on.
> > >> > > >
> > >> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the
> topics
> > >> have
> > >> > > > replication factor of 2.
> > >> > > >
> > >> > > > I'm bouncing the Kafka broker using SIGTERM (with 
> > >> > > > controlled.shutdown.enable=true).  I see the Samza job log 
> > >> > > > this
> > >> message
> > >> > > and
> > >> > > > then hang (does not exit nor does it make any progress).
> > >> > > >
> > >> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] 
> > >> > > > task[Partition
> 2]
> > >> > > > ssp[kafka,my-topic,2] offset[9129395] Unable to send 
> > >> > > > message
> from
> > >> > > > TaskName-Partition 1 to system kafka
> > >> > > >
> > >> > > > The Kafka consumer (Druid Real-Time node) on the other side 
> > >> > > > then
> > >> barfs
> > >> > on
> > >> > > > the message:
> > >> > > >
> > >> > > > Exception in thread "chief-svc-perf"
> > >> > > kafka.message.InvalidMessageException:
> > >> > > > Message is corrupt (stored crc = 1792882425, computed crc =
> > >> 3898271689)
> > >> > > > at kafka.message.Message.ensureValid(Message.scala:166)
> > >> > > > at
> > >> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:1
> > >> 01)
> > >> > > > at
> > >> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:3
> > >> 3)
> > >> > > > at
> > >> > >
> > >>
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:6
> 6)
> > >> > > > at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > >> > > > at
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEight
> FirehoseFactory.java:106)
> > >> > > > at
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManage
> r.java:234)
> > >> > > >
> > >> > > > My questions are:
> > >> > > > 1) What is the right way to bounce a Kafka broker?
> > >> > > > 2) Is this a bug in Samza that the job hangs after producer
> > request
> > >> > > fails?
> > >> > > > Has anyone seen this?
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Roger
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



--
-- Guozhang

Reply via email to