Roger,

I believe Samza 0.9.0 already uses the Java producer.

The Java producer's close() call tries to flush all buffered data to the
brokers before it returns. However, if the leader of some buffered data's
destination partition is not known, the producer blocks while it refreshes
its metadata and then retries the send.
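
If close() really can block indefinitely in this situation, one caller-side guard is to run it on a separate thread and give up after a deadline. The following is only a sketch of that idea in plain Java. BoundedCloseSketch and closeWithTimeout are hypothetical names, the Runnable stands in for a real producer.close() call, and I am not claiming this is what Samza or the Kafka client does.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: BoundedCloseSketch and closeWithTimeout are hypothetical names,
// not part of Kafka or Samza.
class BoundedCloseSketch {

    // Runs closeAction on a daemon thread and waits at most timeoutMs for it.
    // Returns true if the action finished in time, false if it was still
    // blocked (or the caller was interrupted while waiting).
    static boolean closeWithTimeout(Runnable closeAction, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "producer-close");
            t.setDaemon(true); // a stuck close must not keep the JVM alive
            return t;
        });
        Future<?> pending = executor.submit(closeAction);
        try {
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            pending.cancel(true); // interrupt the blocked close, best effort
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException("close action failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a close() that does not return because a leader is unknown.
        Runnable stuckClose = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("clean close finished: " + closeWithTimeout(() -> { }, 1_000));
        System.out.println("stuck close finished: " + closeWithTimeout(stuckClose, 200));
    }
}
```

When closeWithTimeout returns false, the caller can log the failure and exit the job instead of hanging. Any records still buffered in the producer would be lost in that case, so this trades delivery for liveness.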

From the broker logs, it seems the broker does receive the produce request
but fails to handle it, with "Leader not local", after the bounce:

--------
[2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
correlation id 226 from client
samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition
[sys.samza_metrics,0] failed due to Leader not local for partition
[sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
[2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
correlation id 45671 from client
samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on
partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed
due to Leader not local for partition
[__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0
(kafka.server.KafkaApis)
[2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
correlation id 12267 from client
samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition
[sys.samza_metrics,0] failed due to Leader not local for partition
[sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
--------

This is because the leader for these two topic-partitions
(sys.samza_metrics,0 and
__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0) has moved to
broker id:1,host:sit320w80m7,port:9092. When the producer gets the error
code from the old leader, it should refresh its metadata, learn that the new
leader is broker 1, and retry the send, but for some reason it does not
refresh its metadata. Without the producer logs from the Samza container I
cannot investigate the issue further.
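
To make the expected behaviour concrete, here is a toy model of the refresh-then-retry loop in plain Java. It is not the Kafka client's code. Cluster, Producer, refreshOnError and the partition key are all hypothetical names made up for illustration. The "stale" producer shows what I suspect is happening: it keeps sending to the old leader and never succeeds.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: these classes are hypothetical and are not Kafka's code.
class LeaderRetrySketch {

    // Stand-in for the cluster: which broker currently leads each partition.
    static final class Cluster {
        final Map<String, Integer> leaders = new HashMap<>();

        // A broker accepts a produce request only if it leads the partition;
        // otherwise the request fails, as with "Leader not local".
        boolean produce(int brokerId, String partition) {
            Integer leader = leaders.get(partition);
            return leader != null && leader == brokerId;
        }
    }

    // Stand-in for a producer holding a cached copy of the leader map.
    static final class Producer {
        final Cluster cluster;
        final boolean refreshOnError;
        final Map<String, Integer> cachedLeaders;
        int metadataRefreshes = 0;

        Producer(Cluster cluster, boolean refreshOnError) {
            this.cluster = cluster;
            this.refreshOnError = refreshOnError;
            this.cachedLeaders = new HashMap<>(cluster.leaders);
        }

        // Returns the id of the broker that accepted the record, or -1 once
        // the initial attempt and maxRetries retries have all failed.
        int send(String partition, int maxRetries) {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                Integer target = cachedLeaders.get(partition);
                if (target != null && cluster.produce(target, partition)) {
                    return target;
                }
                if (refreshOnError) {
                    // Re-read the leader map before the next attempt.
                    cachedLeaders.clear();
                    cachedLeaders.putAll(cluster.leaders);
                    metadataRefreshes++;
                }
            }
            return -1;
        }
    }

    public static void main(String[] args) {
        String partition = "sys.samza_metrics-0";
        Cluster cluster = new Cluster();
        cluster.leaders.put(partition, 0);

        Producer healthy = new Producer(cluster, true);
        Producer stale = new Producer(cluster, false);

        cluster.leaders.put(partition, 1); // leadership moves during the bounce

        System.out.println("healthy producer delivered to broker " + healthy.send(partition, 3));
        System.out.println("stale producer result " + stale.send(partition, 3));
    }
}
```

In this model the healthy producer fails once against broker 0, refreshes, and delivers to broker 1, while the stale producer exhausts its retries. The real producer may retry for much longer than this bounded loop, which would look like a hang from the outside.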

Which Kafka version does Samza 0.9.0 use?

Guozhang

On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Not sure about the Kafka side. From the Samza side, based on your
> description ("does not exit nor does it make any progress"), I think the
> code is stuck in producer.close
> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>;
> otherwise, it would throw a SamzaException to quit the job. So maybe some
> Kafka experts on this mailing list or the Kafka mailing list can help.
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > At error level logging, this was the only entry in the Samza log:
> >
> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395]
> > Unable to send message from TaskName-Partition 1 to system kafka
> >
> > Here is the log from the Kafka broker that was shut down.
> >
> > http://pastebin.com/afgmLyNF
> >
> > Thanks,
> >
> > Roger
> >
> >
> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >
> > > Roger, could you paste the full log from the Samza container? If you
> > > can figure out which Kafka broker the message was sent to, it would be
> > > helpful if we get the log from the broker as well.
> > >
> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <roger.hoo...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I need some help figuring out what's going on.
> > > >
> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All the topics
> > > > have replication factor of 2.
> > > >
> > > > I'm bouncing the Kafka broker using SIGTERM (with
> > > > controlled.shutdown.enable=true).  I see the Samza job log this
> > > > message and then hang (does not exit nor does it make any progress).
> > > >
> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > > > ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
> > > > TaskName-Partition 1 to system kafka
> > > >
> > > > The Kafka consumer (Druid Real-Time node) on the other side then
> > > > barfs on the message:
> > > >
> > > > Exception in thread "chief-svc-perf" kafka.message.InvalidMessageException:
> > > > Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
> > > > at kafka.message.Message.ensureValid(Message.scala:166)
> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > > > at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
> > > > at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
> > > >
> > > > My questions are:
> > > > 1) What is the right way to bounce a Kafka broker?
> > > > 2) Is this a bug in Samza that the job hangs after producer request
> > > > fails? Has anyone seen this?
> > > >
> > > > Thanks,
> > > >
> > > > Roger
> > > >
> > >
> >
>



-- 
-- Guozhang
