Hmm, that is a little weird, since if the ip addresses have not changed then the producer should be able to re-connect. I checked your logs again and only saw two ip addresses (10.0.0.40, 10.0.2.105), and the thrown exceptions indicate that the producer cannot connect to them (I am assuming the logs you attached are from after the brokers had resumed).

Guozhang

On Mon, May 11, 2015 at 11:53 PM, Dan <danharve...@gmail.com> wrote:

> The three ip addresses were the same as before, but in a different mapping
> to ids. Does what you said still hold true in that case?
>
> We are fixing the ip and id mappings so that they will be fixed so this
> problem won't happen for us again. I know this scenario is not an expected
> case.
>
> - Dan
>
> On Tue, 12 May 2015 at 01:36 Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Dan,
> >
> > I think your scenario is different from Roger / Garry's. For your case
> > this is actually the expected behavior since if all brokers are gone at
> > roughly the same time, and then re-appear with different ip addresses,
> > then the producer cannot detect the "new" brokers via metadata refresh
> > since there is no transition period when some old broker can tell the
> > producer about the new brokers. In general, you should only do a rolling
> > bounce on kafka brokers for upgrades / maintenance / etc.
> >
> > Guozhang
> >
> > On Fri, May 8, 2015 at 8:24 AM, Dan <danharve...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I think we are seeing a similar error too where Samza is getting stuck
> > > on producing to Kafka. This was on our sandbox environment and not
> > > production; we had all Kafka instances (3) go away briefly then
> > > re-appear with different ip address -> broker mappings. I know this is
> > > not ideal and we're fixing that separately, but I would have expected
> > > Samza/Producer to recover.
> > >
> > > What happened instead is that Samza (via the Kafka Producer client) got
> > > stuck in a loop outputting "org.apache.kafka.common.network.Selector:
> > > Error in I/O with <HOST>"
> > >
> > > So I think there might be a bug in the Kafka client where it fails to
> > > discover these new hosts? But Samza also does not seem to fail that
> > > quickly when it is stuck. We had to manually restart the Samza task for
> > > this to be fixed.
> > >
> > > I've put a full stack trace
> > > https://gist.github.com/danharvey/3909d731f130a0fe1aad
> > >
> > > We also have a non-Samza Kafka consumer and that coped fine with the
> > > restart. We are using Kafka 0.8.2.1 and Samza 0.9.0.
> > >
> > > Thanks,
> > > Dan
> > >
> > > On Mon, 4 May 2015 at 14:37 Garry Turkington <
> > > g.turking...@improvedigital.com> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Honestly snappy is so hugely more performant for me that I have rarely
> > > > used anything but it. So I've only seen the problem using snappy but
> > > > since I use it all the time that may or may not be a valid data point.
> > > >
> > > > I did find some old logs from what happens client side when I see this
> > > > happen. First a series of communication errors along the lines of the
> > > > following which I think were due to a broker bouncing or timing out:
> > > >
> > > > WARN Got error produce response with correlation id 331092 on
> > > > topic-partition <topic-name>-16, retrying (4 attempts left). Error:
> > > > NOT_LEADER_FOR_PARTITION 2015-04-15 13:54:13,890
> > > > (org.apache.kafka.clients.producer.internals.Sender)
> > > >
> > > > But then the client dies with:
> > > >
> > > > java: target/snappy-1.1.1/snappy.cc:423: char*
> > > > snappy::internal::CompressFragment(const char*, size_t, char*,
> > > > snappy::uint16*, int): Assertion `0 == memcmp(base, candidate,
> > > > matched)' failed.
> > > >
> > > > I'll try and get some better traces and post over on the kafka list.
> > > > But it'll be after Strata this week.
> > > >
> > > > Cheers
> > > > Garry
> > > >
> > > > -----Original Message-----
> > > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > > Sent: 04 May 2015 00:38
> > > > To: dev@samza.apache.org
> > > > Subject: Re: Errors and hung job on broker shutdown
> > > >
> > > > Garry,
> > > >
> > > > Just wondering, does this error not exist with Gzip compression?
> > > > Or you could see it with any compression schemes?
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, May 3, 2015 at 2:32 AM, Garry Turkington <
> > > > g.turking...@improvedigital.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Just to add another data point, I've been occasionally seeing the
> > > > > first error with a non-Samza app using the new Kafka producer with
> > > > > Snappy compression. I was going to post to the Kafka list but I
> > > > > haven't really narrowed down the situations yet. It sort of looks
> > > > > like it most often happens to me some minutes after a broker has
> > > > > restarted or had its ZK session time out in periods of very heavy
> > > > > load. But I need to do more troubleshooting to have something less
> > > > > vague to report over there.
> > > > >
> > > > > Garry
> > > > >
> > > > > -----Original Message-----
> > > > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > > > Sent: 01 May 2015 23:57
> > > > > To: dev@samza.apache.org
> > > > > Subject: Re: Errors and hung job on broker shutdown
> > > > >
> > > > > Hmm, it seems your snappy compressed data is corrupted and hence
> > > > > keeps getting rejected by the broker, hence keeping the producer
> > > > > blocked on close(). Not sure how this happens as I have not seen
> > > > > this error ever before (I wrote the new Kafka producer's compression
> > > > > module myself, and have run it with various kinds of unit /
> > > > > integration test cases, but did not see this coming).
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Apr 29, 2015 at 11:37 PM, Roger Hoover
> > > > > <roger.hoo...@gmail.com> wrote:
> > > > >
> > > > > > Guozhang and Yan,
> > > > > >
> > > > > > Thank you both for your responses. I tried a lot of combinations
> > > > > > and I think I've determined that it's new producer + snappy that
> > > > > > causes the issue.
> > > > > >
> > > > > > It never happens with the old producer and it never happens with
> > > > > > lz4 or no compression. It only happens when a broker gets
> > > > > > restarted (or maybe just shut down).
> > > > > >
> > > > > > The error is not always the same. I've noticed at least three
> > > > > > types of errors on the Kafka brokers.
> > > > > >
> > > > > > 1) java.io.IOException: failed to read chunk
> > > > > >    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:356)
> > > > > >    http://pastebin.com/NZrrEHxU
> > > > > > 2) java.lang.OutOfMemoryError: Java heap space
> > > > > >    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:346)
> > > > > >    http://pastebin.com/yuxk1BjY
> > > > > > 3) java.io.IOException: PARSING_ERROR(2)
> > > > > >    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> > > > > >    http://pastebin.com/yq98Hx49
> > > > > >
> > > > > > I've noticed a couple of different behaviors from the Samza
> > > > > > producer/job:
> > > > > >
> > > > > > A) It goes into a long retry loop where this message is logged. I
> > > > > > saw this with error #1 above.
> > > > > >
> > > > > > 2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
> > > > > > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7]
> > > > > > offset[9999253] Got error produce response with correlation id 4878
> > > > > > on topic-partition svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1,
> > > > > > retrying (2147483646 attempts left). Error: CORRUPT_MESSAGE
> > > > > >
> > > > > > B) The job exits with
> > > > > > org.apache.kafka.common.errors.UnknownServerException (at least
> > > > > > when run as ThreadJob). I saw this with error #3 above.
> > > > > >
> > > > > > org.apache.samza.SamzaException: Unable to send message from
> > > > > > TaskName-Partition 6 to system kafka.
> > > > > > org.apache.kafka.common.errors.UnknownServerException: The server
> > > > > > experienced an unexpected error when processing the request
> > > > > >
> > > > > > This seems most likely to be a bug in the new Kafka producer. I'll
> > > > > > probably file a JIRA for that project.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Roger
> > > > > >
> > > > > > On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangg...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > And just to answer your first question: SIGTERM with
> > > > > > > controlled.shutdown.enable=true should be OK for bouncing the
> > > > > > > broker.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang
> > > > > > > <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Roger,
> > > > > > > >
> > > > > > > > I believe Samza 0.9.0 already uses the Java producer.
> > > > > > > >
> > > > > > > > Java producer's close() call will try to flush all buffered
> > > > > > > > data to the brokers before completing the call. However, if
> > > > > > > > some buffered data's destination partition leader is not
> > > > > > > > known, the producer will block on refreshing the metadata and
> > > > > > > > then retry sending.
> > > > > > > >
> > > > > > > > From the broker logs, it seems it does receive the producer
> > > > > > > > request but failed to handle it due to "Leader not local"
> > > > > > > > after the bounce:
> > > > > > > >
> > > > > > > > --------
> > > > > > > > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
> > > > > > > > correlation id 226 from client
> > > > > > > > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on
> > > > > > > > partition [sys.samza_metrics,0] failed due to Leader not local
> > > > > > > > for partition [sys.samza_metrics,0] on broker 0
> > > > > > > > (kafka.server.KafkaApis)
> > > > > > > > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
> > > > > > > > correlation id 45671 from client
> > > > > > > > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4
> > > > > > > > on partition
> > > > > > > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] failed
> > > > > > > > due to Leader not local for partition
> > > > > > > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on
> > > > > > > > broker 0 (kafka.server.KafkaApis)
> > > > > > > > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
> > > > > > > > correlation id 12267 from client
> > > > > > > > samza_producer-svc_call_join_deploy-1-1429911471254-0 on
> > > > > > > > partition [sys.samza_metrics,0] failed due to Leader not local
> > > > > > > > for partition [sys.samza_metrics,0] on broker 0
> > > > > > > > (kafka.server.KafkaApis)
> > > > > > > > --------
> > > > > > > >
> > > > > > > > because for these two topic-partitions (sys.samza_metrics,0 and
> > > > > > > > __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), their
> > > > > > > > leader has been moved to broker id:1,host:sit320w80m7,port:9092.
> > > > > > > > When the producer gets the error code from the old leader, it
> > > > > > > > should refresh its metadata and get the new leader as
> > > > > > > > broker-1, and retry sending, but for some reason it does not
> > > > > > > > refresh its metadata. Without producer logs from Samza
> > > > > > > > container I cannot further investigate the issue.
> > > > > > > >
> > > > > > > > Which Kafka version does Samza 0.9.0 use?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Not sure about the Kafka side. From the Samza side, from your
> > > > > > > > > description ("does not exit nor does it make any progress"),
> > > > > > > > > I think the code is stuck in producer.close
> > > > > > > > > <https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143>,
> > > > > > > > > otherwise, it will throw SamzaException to quit the job.
> > > > > > > > > So maybe some Kafka experts in this mailing list or the Kafka
> > > > > > > > > mailing list can help.
> > > > > > > > >
> > > > > > > > > Fang, Yan
> > > > > > > > > yanfang...@gmail.com
> > > > > > > > >
> > > > > > > > > On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover
> > > > > > > > > <roger.hoo...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > At error level logging, this was the only entry in the Samza
> > > > > > > > > > log:
> > > > > > > > > >
> > > > > > > > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR]
> > > > > > > > > > task[Partition 2]
> > > > > > > > > > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2]
> > > > > > > > > > offset[9129395] Unable to send message from
> > > > > > > > > > TaskName-Partition 1 to system kafka
> > > > > > > > > >
> > > > > > > > > > Here is the log from the Kafka broker that was shut down.
> > > > > > > > > >
> > > > > > > > > > http://pastebin.com/afgmLyNF
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Roger
> > > > > > > > > >
> > > > > > > > > > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Roger, could you paste the full log from the Samza
> > > > > > > > > > > container? If you can figure out which Kafka broker the
> > > > > > > > > > > message was sent to, it would be helpful if we get the
> > > > > > > > > > > log from the broker as well.
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <
> > > > > > > > > > > roger.hoo...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > >
> > > > > > > > > > > > I need some help figuring out what's going on.
> > > > > > > > > > > >
> > > > > > > > > > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.
> > > > > > > > > > > > All the topics have replication factor of 2.
> > > > > > > > > > > >
> > > > > > > > > > > > I'm bouncing the Kafka broker using SIGTERM (with
> > > > > > > > > > > > controlled.shutdown.enable=true). I see the Samza job
> > > > > > > > > > > > log this message and then hang (does not exit nor does
> > > > > > > > > > > > it make any progress).
> > > > > > > > > > > >
> > > > > > > > > > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR]
> > > > > > > > > > > > task[Partition 2] ssp[kafka,my-topic,2] offset[9129395]
> > > > > > > > > > > > Unable to send message from TaskName-Partition 1 to
> > > > > > > > > > > > system kafka
> > > > > > > > > > > >
> > > > > > > > > > > > The Kafka consumer (Druid Real-Time node) on the other
> > > > > > > > > > > > side then barfs on the message:
> > > > > > > > > > > >
> > > > > > > > > > > > Exception in thread "chief-svc-perf"
> > > > > > > > > > > > kafka.message.InvalidMessageException: Message is
> > > > > > > > > > > > corrupt (stored crc = 1792882425, computed crc =
> > > > > > > > > > > > 3898271689)
> > > > > > > > > > > > at kafka.message.Message.ensureValid(Message.scala:166)
> > > > > > > > > > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:101)
> > > > > > > > > > > > at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> > > > > > > > > > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> > > > > > > > > > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > > > > > > > > > > > at
> > > > > > > > > > > > io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEightFirehoseFactory.java:106)
> > > > > > > > > > > > at
> > > > > > > > > > > > io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:234)
> > > > > > > > > > > >
> > > > > > > > > > > > My questions are:
> > > > > > > > > > > > 1) What is the right way to bounce a Kafka broker?
> > > > > > > > > > > > 2) Is this a bug in Samza that the job hangs after
> > > > > > > > > > > > producer request fails? Has anyone seen this?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Roger
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
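
For anyone landing on this thread later: the "2147483646 attempts left" line quoted above suggests the producer was running with an effectively unlimited retry count, and Roger reported that lz4 and no compression never showed the corruption. Below is a rough sketch of producer settings along those lines, not a confirmed fix. The class name, the broker list and the numeric values are all made up for illustration; only the config keys (bootstrap.servers, compression.type, retries, retry.backoff.ms, metadata.max.age.ms) are standard new-producer settings. In a Samza job the same keys would normally go under systems.<system-name>.producer.* in the job config rather than in code.

```java
import java.util.Properties;

// Hypothetical helper, not part of Samza or Kafka. It only assembles
// settings as plain Properties, so it compiles without the Kafka client jar.
public class ProducerConfigSketch {

    public static Properties build(String bootstrapServers, int maxRetries) {
        Properties props = new Properties();
        // Brokers used for the initial metadata fetch (placeholder value passed in).
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: avoid snappy until the corruption is understood;
        // lz4 was reported in the thread as not reproducing the problem.
        props.setProperty("compression.type", "lz4");
        // Bound the retries so a rejected batch fails the job instead of looping.
        props.setProperty("retries", Integer.toString(maxRetries));
        // Illustrative values only: back off between retries and refresh
        // metadata periodically even when no error forces a refresh.
        props.setProperty("retry.backoff.ms", "500");
        props.setProperty("metadata.max.age.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        // "broker1" and "broker2" are placeholder host names.
        Properties props = build("broker1:9092,broker2:9092", 10);
        props.list(System.out);
    }
}
```

Note that a bounded retry count trades a hung job for a failed one, which is easier to notice and restart; it does nothing about the underlying snappy corruption, and it would not help in Dan's case where every broker came back under a different id at once.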