Hmm, it seems your snappy-compressed data is corrupted and therefore keeps getting rejected by the broker, which in turn keeps the producer blocked on close(). I am not sure how this happens, as I have never seen this error before (I wrote the new Kafka producer's compression module myself and have run it against various unit and integration test cases, but did not see this coming).
Guozhang

On Wed, Apr 29, 2015 at 11:37 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:

> Guozhang and Yan,
>
> Thank you both for your responses. I tried a lot of combinations and I
> think I've determined that it's new producer + snappy that causes the
> issue.
>
> It never happens with the old producer and it never happens with lz4 or no
> compression. It only happens when a broker gets restarted (or maybe just
> shutdown).
>
> The error is not always the same. I've noticed at least three types of
> errors on the Kafka brokers.
>
> 1) java.io.IOException: failed to read chunk
> at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
> http://pastebin.com/NZrrEHxU
> 2) java.lang.OutOfMemoryError: Java heap space
> at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
> http://pastebin.com/yuxk1BjY
> 3) java.io.IOException: PARSING_ERROR(2)
> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> http://pastebin.com/yq98Hx49
>
> I've noticed a couple different behaviors from the Samza producer/job
> A) It goes into a long retry loop where this message is logged. I saw this
> with error #1 above.
>
> 2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
> ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7] offset[9999253] Got
> error produce response with correlation id 4878 on topic-partition
> svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1, retrying (2147483646 attempts
> left). Error: CORRUPT_MESSAGE
>
> B) The job exits with
> org.apache.kafka.common.errors.UnknownServerException (at least when run as
> ThreadJob). I saw this with error #3 above.
>
> org.apache.samza.SamzaException: Unable to send message from
> TaskName-Partition 6 to system kafka.
> org.apache.kafka.common.errors.UnknownServerException: The server
> experienced an unexpected error when processing the request
>
> This seems most likely to be a bug in the new Kafka producer. I'll
> probably file a JIRA for that project.
>
> Thanks,
>
> Roger
>
> On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > And just to answer your first question: SIGTERM with
> > controlled.shutdown=true should be OK for bouncing the broker.
> >
> > Guozhang
> >
> > On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Roger,
> > >
> > > I believe Samza 0.9.0 already uses the Java producer.
> > >
> > > Java producer's close() call will try to flush all buffered data to the
> > > brokers before completing the call. However, if some buffered data's
> > > destination partition leader is not known, the producer will block on
> > > refreshing the metadata and then retry sending.
> > >
> > > From the broker logs, it seems it does receive the producer request but
> > > failed to handle it due to "Leader not local" after the bounce:
> > >
> > > --------
> > > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
> > > correlation id 226 from client
> > > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on partition
> > > [sys.samza_metrics,0] failed due to Leader not local for partition
> > > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
> > > correlation id 45671 from client
> > > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4 on
> > > partition [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed
> > > due to Leader not local for partition
> > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on broker 0
> > > (kafka.server.KafkaApis)
> > > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
> > > correlation id 12267 from client
> > > samza_producer-svc_call_join_deploy-1-1429911471254-0 on partition
> > > [sys.samza_metrics,0] failed due to Leader not local for partition
> > > [sys.samza_metrics,0] on broker 0 (kafka.server.KafkaApis)
> > > --------
> > >
> > > because for these two topic-partitions (sys.samza_metrics,0 and
> > > __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), their lead has been
> > > moved to broker id:1,host:sit320w80m7,port:9092. When the producer gets the
> > > error code from the old leader, it should refresh its metadata and get the
> > > new leader as broker-1, and retry sending, but for some reason it does not
> > > refresh its metadata. Without producer logs from Samza container I cannot
> > > further investigate the issue.
> > >
> > > Which Kafka version does Samza 0.9.0 use?
> > >
> > > Guozhang
> > >
> > > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > >
> > >> Not sure about the Kafka side. From the Samza side, from your
> > >> description ( "does not exit nor does it make any progress" ), I think
> > >> the code is stuck in producer.close
> > >> <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>,
> > >> otherwise, it will throw SamzaException to quit the job. So maybe some
> > >> Kafka experts in this mailing list or Kafka mailing list can help
> > >>
> > >> Fang, Yan
> > >> yanfang...@gmail.com
> > >>
> > >> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover <roger.hoo...@gmail.com>
> > >> wrote:
> > >>
> > >> > At error level logging, this was the only entry in the Samza log:
> > >> >
> > >> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > >> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2] offset[9129395]
> > >> > Unable to send message from TaskName-Partition 1 to system kafka
> > >> >
> > >> > Here is the log from the Kafka broker that was shutdown.
> > >> >
> > >> > http://pastebin.com/afgmLyNF
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Roger
> > >> >
> > >> >
> > >> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > >> >
> > >> > > Roger, could you paste the full log from Samza container? If you can
> > >> > > figure out which Kafka broker the message was sent to, it would be
> > >> > > helpful if we get the log from the broker as well.
> > >> > >
> > >> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <roger.hoo...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi,
> > >> > > >
> > >> > > > I need some help figuring out what's going on.
> > >> > > >
> > >> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN. All the topics
> > >> > > > have replication factor of 2.
> > >> > > >
> > >> > > > I'm bouncing the Kafka broker using SIGTERM (with
> > >> > > > controlled.shutdown.enable=true). I see the Samza job log this
> > >> > > > message and then hang (does not exit nor does it make any progress).
> > >> > > >
> > >> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR] task[Partition 2]
> > >> > > > ssp[kafka,my-topic,2] offset[9129395] Unable to send message from
> > >> > > > TaskName-Partition 1 to system kafka
> > >> > > >
> > >> > > > The Kafka consumer (Druid Real-Time node) on the other side then
> > >> > > > barfs on the message:
> > >> > > >
> > >> > > > Exception in thread "chief-svc-perf" kafka.message.InvalidMessageException:
> > >> > > > Message is corrupt (stored crc = 1792882425, computed crc = 3898271689)
> > >> > > > at kafka.message.Message.ensureValid(Message.scala:166)
> > >> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
> > >> > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> > >> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> > >> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > >> > > > at io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
> > >> > > > at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
> > >> > > >
> > >> > > > My questions are:
> > >> > > > 1) What is the right way to bounce a Kafka broker?
> > >> > > > 2) Is this a bug in Samza that the job hangs after producer request
> > >> > > > fails? Has anyone seen this?
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Roger
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
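
For anyone following the thread: the consumer-side exception quoted above ("stored crc = ..., computed crc = ...") and the broker's CORRUPT_MESSAGE response both come down to a checksum mismatch between what was written and what was read. Below is a minimal sketch of that kind of check. It assumes a simplified framing (a 4-byte big-endian CRC32 followed by the payload), which is an illustration only and not Kafka's actual wire format; the helper names `frame` and `check` are hypothetical.

```python
import struct
import zlib


def frame(payload: bytes) -> bytes:
    """Prefix the payload with its CRC32 as a 4-byte big-endian unsigned int.

    Assumed, simplified layout for illustration; not Kafka's message format.
    """
    return struct.pack(">I", zlib.crc32(payload) & 0xFFFFFFFF) + payload


def check(framed: bytes):
    """Return (stored_crc, computed_crc, is_valid) for a framed message."""
    (stored,) = struct.unpack(">I", framed[:4])
    computed = zlib.crc32(framed[4:]) & 0xFFFFFFFF
    return stored, computed, stored == computed


good = frame(b"snappy-compressed bytes would go here")

# Flip a single bit in the payload to simulate corruption somewhere between
# the producer's buffer and the reader.
bad = good[:10] + bytes([good[10] ^ 0x01]) + good[11:]

print(check(good)[2])  # checksum matches
print(check(bad)[2])   # checksum does not match
```

A receiver that sees the mismatch can only reject the bytes; resending the same corrupted buffer fails the same way every time, which would be consistent with the long retry loop reported above.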