I added this scenario to KAFKA-955. I'm thinking that this scenario could be a problem for ack=0 in general (even without controlled shutdown). If we do an "uncontrolled" shutdown, it seems that producers for some topics will never learn that there could have been a leader change. Would it make sense to force a metadata refresh for all topics on a broker any time an IOException happens on a socket (e.g. "connection reset")? Currently, it looks like only the topic that experienced the failure will have a metadata refresh issued for it.
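Roughly what I have in mind, as a sketch (the class and method names here are made up for illustration; this is not the actual 0.8 producer code):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch: on an IOException against a broker, mark the cached metadata
    // stale for *every* topic whose leader is that broker, not just the
    // topic whose send happened to fail.
    class LeaderCache {
        private final Map<String, Integer> leaderByTopic =
            new ConcurrentHashMap<String, Integer>();
        private final Set<String> stale =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

        void recordLeader(String topic, int brokerId) {
            leaderByTopic.put(topic, brokerId);
        }

        boolean needsRefresh(String topic) {
            return stale.contains(topic);
        }

        // Called from the send path when the socket to brokerId fails.
        void onBrokerIOException(int brokerId, IOException cause) {
            for (Map.Entry<String, Integer> entry : leaderByTopic.entrySet()) {
                if (entry.getValue() == brokerId) {
                    stale.add(entry.getKey()); // force a metadata request on next send
                }
            }
        }
    }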
Maybe this should be a separate jira issue, now that I think about it.

Jason

On Mon, Jun 24, 2013 at 10:52 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Also, looking back at my logs, I'm wondering if a producer will reuse the same socket to send data to the same broker, for multiple topics (I'm guessing yes). In which case, it looks like I'm seeing this scenario:
>
> 1. producer1 is happily sending messages for topicX and topicY to serverA (serverA is the leader for both topics, with only 1 partition for each topic, for simplicity).
> 2. serverA is restarted, and in the process, serverB becomes the new leader for both topicX and topicY.
> 3. producer1 decides to send a new message for topicX to serverA.
> 3a. This results in an exception ("Connection reset by peer"). producer1's connection to serverA is invalidated.
> 3b. producer1 makes a new metadata request for topicX, and learns that serverB is now the leader for topicX.
> 3c. producer1 resends the message for topicX, to serverB.
> 4. producer1 decides to send a new message for topicY to serverA.
> 4a. producer1 notes that its socket to serverA is invalid, so it creates a new connection to serverA.
> 4b. producer1 successfully sends its message to serverA (without realizing that serverA is no longer the leader for topicY).
> 4c. serverA logs to its console:
>
> 2013-06-23 08:28:46,770 WARN [kafka-request-handler-2] server.KafkaApis - [KafkaApi-508818741] Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741
>
> 5. producer1 continues to send messages for topicY to serverA, and serverA continues to log the same warnings.
> 6. 10 minutes later, producer1 decides to update its metadata for topicY, and learns that serverB is now the leader for topicY.
> 7. The warning messages finally stop in the console for serverA.
>
> I am pretty sure this scenario, or one very close to it, is what I'm seeing in my logs after doing a rolling restart with controlled shutdown.
>
> Does this scenario make sense?
>
> One thing I notice is that in the steady state, every 10 minutes the producer refreshes its metadata for all topics. However, when sending a message to a specific topic fails, only the metadata for that topic is refreshed, even though the implication is that all topics with the same leader might need to be refreshed, especially in response to a "connection reset by peer".
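> For reference, the two producer settings in play here are below; the values shown are the 0.8 defaults as I understand them, just to make the knobs concrete:
>
>     # steady-state metadata refresh (defaults to 10 minutes)
>     topic.metadata.refresh.interval.ms=600000
>     # ack=0: fire-and-forget, no error feedback from the broker
>     request.required.acks=0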
> Jason
>
> On Mon, Jun 24, 2013 at 10:14 PM, Jason Rosenberg <j...@squareup.com> wrote:
>
>> Jun,
>>
>> To be clear, this whole discussion was started because I am clearly seeing "failed due to Leader not local" on the last broker restarted, after all the controlled shutdowns have completed and all brokers have been restarted.
>>
>> This leads me to believe that a client made a metadata request and found out that server A was the leader for its partition, then server A was restarted, and then the client made repeated producer requests to server A without ever encountering a broken socket. Thus, I'm not sure it's correct that the socket is invalidated in that case after a restart.
>>
>> Alternatively, could it be that the client (which sends messages to multiple topics) gets metadata updates for multiple topics, but doesn't attempt to send a message to topicX until after the leader has changed and server A has been restarted? In this case, if it's the first time the producer sends to topicX, does it only then create a new socket?
>>
>> Jason
>>
>> On Mon, Jun 24, 2013 at 10:00 PM, Jun Rao <jun...@gmail.com> wrote:
>>
>>> That should be fine, since the old socket in the producer will no longer be usable after a broker is restarted.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Mon, Jun 24, 2013 at 9:50 PM, Jason Rosenberg <j...@squareup.com> wrote:
>>>
>>> > What about a non-controlled shutdown, and a restart, where the producer never attempts to send anything during the time the broker was down? That could have caused a leader change, but without the producer knowing to refresh its metadata, no?
>>> >
>>> > On Mon, Jun 24, 2013 at 9:05 PM, Jun Rao <jun...@gmail.com> wrote:
>>> >
>>> > > Other than controlled shutdown, the only other case that can cause the leader to change while the underlying broker is alive is when the broker expires its ZK session (likely due to GC), which should be rare. That being said, forwarding in the broker may not be a bad idea. Could you file a jira to track this?
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jun
>>> > >
>>> > > On Mon, Jun 24, 2013 at 2:50 PM, Jason Rosenberg <j...@squareup.com> wrote:
>>> > >
>>> > > > Yeah,
>>> > > >
>>> > > > I see that with ack=0, the producer will be in a bad state any time the leader for its partition has changed while the broker it thinks is the leader is still up. So this is a problem in general, not only for controlled shutdown, but even for the case where you've restarted a server (without controlled shutdown), which in and of itself can force a leader change. If the producer doesn't attempt to send a message during the time the broker was down, it will never get a connection failure, will never get fresh metadata, and will subsequently keep sending messages to the non-leader.
>>> > > >
>>> > > > Thus, I'd say this is a problem with ack=0, regardless of controlled shutdown. Any time there's a leader change, the producer will send messages into the ether. I think this is actually a severe condition that could be considered a bug. How hard would it be to have the receiving broker forward on to the leader, in this case?
>>> > > >
>>> > > > Jason
>>> > > >
>>> > > > On Mon, Jun 24, 2013 at 8:44 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>>> > > >
>>> > > > > I think Jason was suggesting quiescent time as a possibility only if the broker did request forwarding when it is not the leader.
>>> > > > >
>>> > > > > On Monday, June 24, 2013, Jun Rao wrote:
>>> > > > >
>>> > > > > > Jason,
>>> > > > > >
>>> > > > > > The quiescence time that you proposed won't work. The reason is that with ack=0, the producer starts losing data silently from the moment the leader is moved (by controlled shutdown) until the broker is shut down. So, the sooner you can shut down the broker, the better. What we realized is that if you can use a larger batch size, ack=1 can still deliver very good throughput.
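>>> > > > > > Concretely, something along these lines on the producer side (the keys are standard 0.8 producer configs; the values are only illustrative):
>>> > > > > >
>>> > > > > >     # trade ack=0 for ack=1, and batch harder to recover throughput
>>> > > > > >     request.required.acks=1
>>> > > > > >     producer.type=async
>>> > > > > >     batch.num.messages=500
>>> > > > > >     queue.buffering.max.ms=100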
>>> > > > > > Thanks,
>>> > > > > >
>>> > > > > > Jun
>>> > > > > >
>>> > > > > > On Mon, Jun 24, 2013 at 12:22 AM, Jason Rosenberg <j...@squareup.com> wrote:
>>> > > > > >
>>> > > > > > > Yeah, I am using ack=0, so that makes sense. I'll need to rethink that, it would seem. It would be nice, wouldn't it, in this case, for the broker to realize this and just forward the messages to the correct leader. Would that be possible?
>>> > > > > > >
>>> > > > > > > Also, it would be nice to have a second option for controlled shutdown (e.g. controlled.shutdown.quiescence.ms), to allow the broker to wait a prescribed amount of time after the controlled shutdown before actually shutting down the server. Then I could set this value to something a little greater than the producer's 'topic.metadata.refresh.interval.ms'. This would help with hitless rolling restarts too. Currently, every producer gets a very loud "Connection Reset" with a tall stack trace each time I restart a broker. It would be nicer to have the producers still be able to produce until the metadata refresh interval expires, then get the word that the leader has moved due to the controlled shutdown, and then start producing to the new leader, all before the shutting-down server actually shuts down. Does that seem feasible? A sketch of the config I'm imagining is below.
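>>> > > > > > > In server.properties, something like this (controlled.shutdown.enable exists in 0.8; the quiescence key is purely hypothetical and does not exist today):
>>> > > > > > >
>>> > > > > > >     controlled.shutdown.enable=true
>>> > > > > > >     # hypothetical: linger a bit longer than the producers'
>>> > > > > > >     # 10 minute topic.metadata.refresh.interval.ms
>>> > > > > > >     controlled.shutdown.quiescence.ms=660000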
>>> > > > > > > Jason
>>> > > > > > >
>>> > > > > > > On Sun, Jun 23, 2013 at 8:23 PM, Jun Rao <jun...@gmail.com> wrote:
>>> > > > > > >
>>> > > > > > > > Jason,
>>> > > > > > > >
>>> > > > > > > > Are you using ack = 0 in the producer? This mode doesn't work well with controlled shutdown (this is explained in the FAQ at https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#).
>>> > > > > > > >
>>> > > > > > > > Thanks,
>>> > > > > > > >
>>> > > > > > > > Jun
>>> > > > > > > >
>>> > > > > > > > On Sun, Jun 23, 2013 at 1:45 AM, Jason Rosenberg <j...@squareup.com> wrote:
>>> > > > > > > >
>>> > > > > > > > > I'm working on having seamless rolling restarts for my kafka servers, running 0.8. I have it set up so that each server is restarted sequentially. Each server takes itself out of the load balancer (e.g. it sets a status that the lb will recognize, and then waits more than long enough for the lb to stop sending metadata requests to that server). Then I initiate the shutdown (with controlled.shutdown.enable=true). This seems to work well; however, I occasionally see warnings like this in the log from the server, after restart:
>>> > > > > > > > >
>>> > > > > > > > > 2013-06-23 08:28:46,770 WARN [kafka-request-handler-2] server.KafkaApis - [KafkaApi-508818741] Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741
>>> > > > > > > > >
>>> > > > > > > > > This WARN seems to repeat persistently, until the producer client initiates a new metadata request (e.g. every 10 minutes, by default). However, the producer doesn't log any errors/exceptions while the server is logging this WARN.
>>> > > > > > > > >
>>> > > > > > > > > What's happening here? Is the message silently being forwarded on to the correct leader for the partition? Is the message dropped? Are these WARNs particularly useful?
>>> > > > > > > > >
>>> > > > > > > > > Thanks,
>>> > > > > > > > >
>>> > > > > > > > > Jason
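>>> > > > > > > > > P.S. For reference, my producer setup is essentially the following (a sketch, not my actual code; the broker names are placeholders):
>>> > > > > > > > >
>>> > > > > > > > >     import java.util.Properties;
>>> > > > > > > > >     import kafka.javaapi.producer.Producer;
>>> > > > > > > > >     import kafka.producer.KeyedMessage;
>>> > > > > > > > >     import kafka.producer.ProducerConfig;
>>> > > > > > > > >
>>> > > > > > > > >     public class SendExample {
>>> > > > > > > > >         public static void main(String[] args) {
>>> > > > > > > > >             Properties props = new Properties();
>>> > > > > > > > >             props.put("metadata.broker.list", "serverA:9092,serverB:9092");
>>> > > > > > > > >             props.put("serializer.class", "kafka.serializer.StringEncoder");
>>> > > > > > > > >             props.put("request.required.acks", "0"); // fire-and-forget
>>> > > > > > > > >             Producer<String, String> producer =
>>> > > > > > > > >                 new Producer<String, String>(new ProducerConfig(props));
>>> > > > > > > > >             // With ack=0, send() returns with no broker feedback, so a
>>> > > > > > > > >             // "Leader not local" rejection is invisible to the client.
>>> > > > > > > > >             producer.send(new KeyedMessage<String, String>("mytopic", "a message"));
>>> > > > > > > > >             producer.close();
>>> > > > > > > > >         }
>>> > > > > > > > >     }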