Hi Brian, I think I buy the complexity and extra end-to-end latency argument :) I'm fine with deferring the asynchronous topic fetching to future work and keeping the current KIP's scope as-is. In that case, could we consider just one minor implementation detail (since it does not affect public APIs we probably do not even need to list it; just thinking out loud here):
In your proposal, when we request metadata for a topic whose metadata is unknown, we directly set that topic name as a singleton in the request. For the scenario that KAFKA-8904 describes, I'm wondering whether the producer#send calls for thousands of new topics are triggered sequentially by a single thread or by concurrent threads. If it's the latter, and we expect that in such scenarios multiple topics may be requested within a very short time, then we could probably do something like this internally in a synchronized manner: 1) put the topic name into a list of "unknown topics", then 2) exhaust the list and put all topics from that list into the request; if the list is empty, it means it has been emptied by another thread, so we skip sending a new request and just wait for the returned metadata refresh. In most cases the list would just be a singleton containing the topic that thread has just enqueued, but under extreme scenarios it could help batch a few topic names (of course, I'm thinking about very extreme cases here, assuming that's what we've seen in KAFKA-8904). Since these two steps are very lightweight, doing them in a synchronized block would not hurt concurrency too much. Guozhang On Tue, Jan 21, 2020 at 9:39 AM Brian Byrne <bby...@confluent.io> wrote: > Hi Guozhang, > > Your understanding of the rationale is accurate, and what you suggest is > completely plausible, however I have a slightly different take on the > situation. > > When the KIP was originally drafted, making KafkaProducer#send asynchronous > was one element of the changes (this is a little more general than (a), but > has similar implications). As you're aware, doing so would allow new topics > to aggregate since the producer could continue to push new records, whereas > today the producer thread is blocked waiting for resolution. > > However, there were concerns about changing client behavior unexpectedly in > this manner, and the change isn't as trivial as one would hope.
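The two-step synchronized batching described above could be sketched roughly as follows (illustrative names and structure only, not the actual producer internals):

```python
import threading

class UnknownTopicBatcher:
    """Sketch of the two-step scheme: 1) enqueue the unknown topic under a
    lock; 2) when about to send, drain the whole list under the same lock.
    A thread that drains an empty list knows another thread already claimed
    the batch and can simply wait for the resulting metadata refresh."""

    def __init__(self):
        self._lock = threading.Lock()
        self._unknown = []

    def enqueue(self, topic):
        # step 1: record the topic as "unknown"
        with self._lock:
            self._unknown.append(topic)

    def drain(self):
        # step 2: exhaust the list; an empty result means another thread
        # already took the batch, so the caller skips sending a request
        with self._lock:
            batch, self._unknown = self._unknown, []
        return batch
```

In the common case `drain()` returns just the single topic the calling thread enqueued; under a burst of concurrent sends to new topics it batches several names into one request.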
For > example, we'd have to introduce an intermediate queue of records for topics > without metadata, and have that play well with the buffer pool which > ensures the memory limit isn't exceeded. A side effect is that a producer > could hit 'memory full' conditions more easily, which could have unintended > consequences if, say, the model was set up such that different producer > threads produced to a disjoint set of topics. Where one producer thread was > blocked waiting for new metadata, it could now push enough data to block > all producer threads due to memory limits, so we'd need to be careful here. > > For case (a) described, another concern would be adding a new > source of latency (possibly seconds) for new topics. Not a huge issue, but > it is new behavior to existing clients and adds to the complexity of > verifying no major regressions. > > It also wouldn't resolve all cases we're interested in. One behavior we're > witnessing is the following: a producer produces to a very large number of > topics (several thousand), however the period of consecutive records for a > topic can often be beyond the current hard-coded expiry of 5 minutes. > Therefore, when the producer does submit a request for this topic after 5 > minutes, it has to request all of the topic metadata again. While batching > new topics in the start-up case would definitely help, here it would likely > be lost effort. > > Being able to increase the metadata eviction period for the above case would > improve things, but there's no perfect value when consecutive produce times > can be modelled as a probability distribution. Rather, the better solution > could be what we discussed earlier, where we'd lower the eviction timeout, > but make the cost of fetching uncached topic metadata much smaller. > > The changes in the KIP were meant to improve the general case without > affecting external client behavior, and then the plan was to fix the > asynchronous send in the next iteration, if necessary.
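The 'memory full' hazard described above can be modeled with a toy shared memory budget (purely hypothetical; the real producer's buffer pool blocks rather than failing and is governed by buffer.memory):

```python
class ToyBufferPool:
    """Toy model of a shared producer memory budget. Records parked for a
    topic awaiting metadata still consume the shared budget, so one stalled
    topic can starve every producer thread. Illustrative only."""

    def __init__(self, total_bytes):
        self.free = total_bytes

    def try_allocate(self, size):
        # the real pool blocks until memory frees up; here we just fail
        if self.free >= size:
            self.free -= size
            return True
        return False

    def release(self, size):
        self.free += size

pool = ToyBufferPool(100)
assert pool.try_allocate(80)      # queued for a topic awaiting metadata
assert not pool.try_allocate(30)  # unrelated topic is now blocked too
pool.release(80)                  # metadata arrives, records drain
assert pool.try_allocate(30)
```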
Point (b) is along > the lines of the latest revision: only send requests for uncached topics, > if they exist; otherwise request the full working set. Piggy-backing was > originally included, but removed because its utility was in doubt. > > So to summarize, you're correct in that asynchronous topic fetching would > be a big improvement, and should be an item of future work. However, what > this KIP proposes should be the safest/easiest set of changes to resolve > the current pain points. Please let me know if you agree/disagree with this > assessment. > > Thanks, > Brian > > > On Mon, Jan 20, 2020 at 10:52 AM Guozhang Wang <wangg...@gmail.com> wrote: > > > Hello Brian, > > > > I looked at the new proposal again, and I'd like to reason about its > > rationale from the listed motivations in your wiki: > > > > 1) more RPCs: we may send metadata requests more frequently than > > appropriate. This is especially the case during producer start-up, where > > the more topics it needs to send to, the more metadata requests it needs > to > > send. This is the original reported issue in KAFKA-8904. > > > > 2) large RPCs: we include all topics in the work set when sending a > > metadata request. But I think our conjecture (as Colin has pointed out) > is > > that this alone is fine most of the time, assuming e.g. you are sending > > such a large RPC only once every 10 minutes. It is only because of 1), > > where you are sending large RPCs too frequently, that this becomes an > > issue. > > > > 3) we want to have a configurable eviction period rather than a hard-coded > > value. I consider it a semi-orthogonal motivation compared with 1) / 2), > > but we wanted to piggy-back this fix along with the KIP. > > > > So from there, 1) and 2) do not contradict each other since our belief > > is that large RPCs are usually okay as long as they are not large-and-frequent > > RPCs, and we actually prefer large-infrequent RPC > smaller-frequent RPC > > > > large-and-frequent RPC (of course).
> > > > The current proposal basically tries to untangle 2) from 1), i.e. for > the > > scenario of KAFKA-8904 it would result in smaller-frequent RPCs during > > startup rather than large-and-frequent RPCs. But I'm wondering why we don't > > just do even better and make it large-infrequent RPCs? More specifically, > > just like Lucas suggested in the ticket: > > > > a. when there's a new topic with unknown metadata enqueued, instead of > > requesting metadata immediately, just delay it for a short period (no more > > than seconds) hoping that more unknown topics would be requested in the > > period; during this period we would not know which partition a record would > > go to, of course, so we buffer it in a different manner. > > > > b. when we are about to send a metadata request, if there are unknown > > topic(s) -- consider them "urgent topics" -- just send them without other > > topics; otherwise, send the work set in the request. If we want to go even > > fancier, we can still piggy-back some non-urgent topics along with urgent > > ones, but it is more complicated to reason about the trade-off, so a simpler > > approach is fine too. > > > > c. fixing 3) with a new config, which is relatively orthogonal to a) and > > b). > > > > > > > > Guozhang > > > > > > > > > > On Tue, Jan 14, 2020 at 10:39 AM Brian Byrne <bby...@confluent.io> > wrote: > > > > > Hello all, > > > > > > After further offline discussion, I've removed any efforts to control > > > metadata RPC sizes. There are now only two items proposed in this KIP: > > > > > > (1) When encountering a new topic, only issue a metadata request for > that > > > particular topic. For all other cases, continue as it does today with a > > > full working set refresh. > > > > > > (2) Introduce client configuration flag "metadata.eviction.period.ms" > > to
I've reset the default back to the > > current > > > (hard-coded) value of 5 minutes since we can identify cases where > > changing > > > it would cause surprises. > > > > > > The votes have been cleared. My apologies for continually interrupting > > and > > > making changes to the KIP, but hopefully this is an agreeable minimum > > > solution to move forward. > > > > > > Thanks, > > > Brian > > > > > > On Mon, Jan 6, 2020 at 5:23 PM Colin McCabe <cmcc...@apache.org> > wrote: > > > > > > > On Mon, Jan 6, 2020, at 14:40, Brian Byrne wrote: > > > > > So the performance of a metadata RPC that occurs every once every > 10 > > > > > seconds should not be measured strictly in CPU cost, but rather the > > > > effect > > > > > on the 95-99%. The larger the request is, the more opportunity > there > > is > > > > to > > > > > put a burst stress on the producer and broker, and the larger the > > > > response > > > > > payload to push through the control plane socket. Maybe that's not > at > > > 5k > > > > > topics, but there are groups that are 10k+ topics and pushing > > further. > > > > > > > > KAFKA-7019 made reading the metadata lock-free. There is no a priori > > > > reason to prefer lots of small requests to a few big requests (within > > > > reason!) In fact, it's quite the opposite: when we make lots of > small > > > > requests, it uses more network bandwidth than when we make a few big > > > ones. > > > > There are a few reasons for this: the request and response headers > > have a > > > > fixed overhead, one big array takes less space when serialized than > > > several > > > > small ones, etc. There is also TCP and IP overhead, etc. > > > > > > > > The broker can only push a few tens of thousands of metadata > requests a > > > > second, due to the overhead of message processing. This is why > almost > > > all > > > > of the admin commands support batching. So if you need to create > 1,000 > > > > topics, you make one request, not 1,000 requests, for example. 
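Colin's fixed-overhead argument above can be made concrete with back-of-the-envelope arithmetic (the byte counts below are made-up illustrative figures, not actual Kafka protocol sizes):

```python
HEADER_BYTES = 50      # assumed fixed per-request/response overhead
PER_TOPIC_BYTES = 40   # assumed per-topic payload

def total_bytes(num_requests, topics_per_request):
    # each request pays the fixed header once, plus per-topic payload
    return num_requests * (HEADER_BYTES + topics_per_request * PER_TOPIC_BYTES)

one_big = total_bytes(1, 1000)     # one batched request for 1,000 topics
many_small = total_bytes(1000, 1)  # 1,000 singleton requests
# the singleton approach pays the header 999 extra times
assert many_small - one_big == 999 * HEADER_BYTES
```

Whatever the real constants are, the shape of the result is the same: many small requests pay the per-request overhead repeatedly, which is why the admin APIs batch.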
> > > > > > > > It's definitely reasonable to limit the number of topics made per > > > metadata > > > > request. But the reason for this is not improving performance, but > > > > preventing certain bad corner cases that happen when RPCs get too > big. > > > For > > > > example, one problem that can happen when a metadata response gets > too > > > big > > > > is that the client could time out before it finishes reading the > > > response. > > > > Or if the response got way too big, it could even exceed the maximum > > > > response size. > > > > > > > > So I think the limit should be pretty high here. We might also > > consider > > > > putting the limit in terms of number of partitions rather than number > > of > > > > topics, since that's what really matters here (this is harder to > > > implement, > > > > I realize...) If I had to put a rough number on it, I'd say we don't > > > want > > > > more than like 50 MB of response data. This is vaguely in line with > > how > > > we > > > > do fetch responses as well (although I think the limit there is > > higher). > > > > > > > > We should also keep in mind that anyone with a wildcard subscription > is > > > > making full metadata requests, which will return back information > about > > > > every topic in the system. > > > > > > > > > > > > > > There's definitely weight to the metadata RPCs. Looking at a > previous > > > > > local, non-loaded test I ran, I calculate about 2 microseconds per > > > > > partition latency to the producer. At 10,000 topics with 100 > > partitions > > > > > each, that's a full 2-second bubble in the best case. I can rerun a > > > more > > > > > targeted performance test, but I feel that's missing the point. > > > > > > > > > > > > > If the metadata is fetched in the background, there should be no > impact > > > on > > > > producer latency, right? > > > > > > > > It would be good to talk more about the importance of background > > metadata > > > > fetching in the KIP. 
The fact that we don't do this is actually a > big > > > > problem with the current implementation. As I understand it, when > the > > > > metadata gets too old, we slam on the brakes and wait for a metadata > > > fetch > > > > to complete, rather than starting the metadata fetch BEFORE we need > it. > > > > It's just bad scheduling. > > > > > > > > best, > > > > Colin > > > > > > > > > > > > > > Brian > > > > > > > > > > On Mon, Jan 6, 2020 at 1:31 PM Colin McCabe <cmcc...@apache.org> > > > wrote: > > > > > > > > > > > On Mon, Jan 6, 2020, at 13:07, Brian Byrne wrote: > > > > > > > Hi Colin, > > > > > > > > > > > > > > Thanks again for the feedback! > > > > > > > > > > > > > > On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe < > cmcc...@apache.org > > > > > > > wrote: > > > > > > > > > > > > > > > Metadata requests don't (always) go to the controller, right? > > We > > > > > > should > > > > > > > > fix the wording in this section. > > > > > > > > > > > > > > > > > > > > > > You're correct, s/controller/broker(s)/. > > > > > > > > > > > > > > I feel like "Proposed Changes" should come before "Public > > > Interfaces" > > > > > > > > here. The new configuration won't make sense to the reader > > until > > > > he > > > > > > or she > > > > > > > > has read the "changes" section. Also, it's not clear from > the > > > name > > > > > > that > > > > > > > > "metadata evict" refers to a span of time. What do you think > > > > about " > > > > > > > > metadata.eviction.period.ms" as a configuration name? > > > > > > > > > > > > > > > > > > > > > > Sure, makes sense. Updated order and config name. > > > > > > > > > > > > > > > > > > > > > > Where is the "10 seconds" coming from here? The current > > default > > > > for > > > > > > > > metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that > we > > > > want to > > > > > > > > refresh every 5 minutes. Definitely not every 10 seconds. 
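The "start the metadata fetch BEFORE we need it" scheduling Colin describes above could look something like this (a hypothetical helper, not the client's actual scheduler; the half-max-age point is one choice suggested elsewhere in this thread):

```python
def next_fetch_delay_ms(last_update_ms, now_ms, max_age_ms):
    """Schedule the next metadata fetch halfway through the max-age window,
    so fresh metadata is usually ready before the old copy goes stale,
    instead of 'slamming on the brakes' at expiry. Sketch only."""
    prefetch_at = last_update_ms + max_age_ms // 2
    return max(0, prefetch_at - now_ms)

# With the default metadata.max.age.ms of 5 minutes (300,000 ms):
assert next_fetch_delay_ms(0, 0, 300_000) == 150_000  # fetch in 2.5 min
assert next_fetch_delay_ms(0, 200_000, 300_000) == 0  # already overdue
```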
> > > > > > > > > > > > > > > The 10 seconds is another arbitrary value to prevent a large > number > > of > > > > RPCs > > > > > > > if the target batch size were fixed at 20. For example, if > > there's > > > > 5,000 > > > > > > > topics with a 5-minute interval, then instead of issuing an RPC > > > every > > > > > > > 1.2 seconds with a batch size of 20, it would issue an RPC every 10 > > > > seconds > > > > > > > with a batch size of 167. > > > > > > > > > > > > Hmm. This will lead to many more RPCs compared to the current > > > > situation > > > > > > of issuing an RPC every 5 minutes with 5,000 topics, right? See > > > below > > > > for > > > > > > more discussion. > > > > > > > > > > > > Stepping back a little bit, it seems like the big problem you > > > > > > identified > > > > > > > > is the O(N^2) behavior of producing to X, then Y, then Z, etc. > > > etc., > > > > > > where > > > > > > > > each new produce to a fresh topic triggers a metadata request > > > with > > > > all > > > > > > the > > > > > > > > preceding topics included. > > > > > > > > > > > > > > > > Of course we need to send out a metadata request before > > producing > > > > to X, > > > > > > > > then Y, then Z, but that request could just specify X, or just > > > > specify > > > > > > Y, > > > > > > > > etc. etc. In other words, we could decouple the > > routine > > > > > > metadata > > > > > > > > fetch which happens on a 5 minute timer from the need to fetch > > > > > > metadata for > > > > > > > > something specific right now. > > > > > > > > > > > > > > > > I guess my question is, do we really need to allow routine > > > metadata > > > > > > > > fetches to "piggyback" on the emergency metadata fetches? It > > > adds > > > > a > > > > > > lot of > > > > > > > > complexity, and we don't have any benchmarks proving that it's > > > > better.
> > > > > > > > Also, as I understand it, whether we piggyback or not, the > > number > > of > > > > > > > > metadata fetches is the same, right? > > > > > > > > > > > > > > So it's possible to do as you suggest, but I would argue that it'd > be > > > > > > more > > > > > > > complex given how the code is structured, while the current approach > > > > > > > doesn't add any extra complexity. The derived metadata classes effectively > > > > > > > respond to a notification that a metadata RPC is going to be issued, where > they > > > > return > > > > > > > the metadata request structure with topics to refresh, which is > > > > decoupled > > > > > > > from what generated the event (new topic, stale metadata, > periodic > > > > > > refresh > > > > > > > alarm). There is also a strict implementation detail that only > one > > > > > > metadata > > > > > > > request can be outstanding at any time, which lends itself to > > > > consolidating > > > > > > > complexity in the base metadata class and having the derived classes use > > > > > > > the "listen > for next update" model. > > > > > > > > > > > > > > By maintaining an ordered list of topics by their last metadata > > > > refresh > > > > > > > time (0 for new topics), it's a matter of pulling from the front > of > > > > the > > > > > > > list to see which topics should be included in the next update. > > > > Always > > > > > > > include all urgent topics, then include non-urgent topics (i.e. those > > > > > > > that need to be refreshed soon-ish) up to the target batch size. > > > > > > > > > > > > > > The number of metadata fetches could be reduced. Assuming a > target > > > > batch > > > > > > > size of 20, a new topic might also refresh 19 other "refresh > soon" > > > > topics > > > > > > > in the same RPC, as opposed to those 19 being handled in a > > > subsequent > > > > > > RPC.
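The ordered-list selection Brian describes above could be sketched as follows (a simplified model with assumed names; the actual implementation tracks more state):

```python
def next_metadata_batch(last_refresh_ms, now_ms, max_age_ms, target_size=20):
    """Pick topics for the next metadata request: all urgent topics first
    (a last-refresh time of 0 marks a new topic), then 'refresh soon'
    topics in order of staleness, up to the target batch size. Sketch only;
    the half-max-age 'refresh soon' window is an assumption."""
    urgent = [t for t, ts in last_refresh_ms.items() if ts == 0]
    by_staleness = sorted((ts, t) for t, ts in last_refresh_ms.items() if ts > 0)
    batch = list(urgent)
    for ts, topic in by_staleness:
        if len(batch) >= target_size:
            break
        if now_ms - ts >= max_age_ms // 2:  # nearing expiry: refresh soon
            batch.append(topic)
    return batch
```

For example, a new topic plus one nearly stale topic would go out in the same RPC, while a freshly refreshed topic is left out:

```python
last = {"new-topic": 0, "stale": 10_000, "fresh": 290_000}
# at now=300,000 ms with max_age=300,000 ms -> ["new-topic", "stale"]
```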
> > > > > > > Although to counter the above, the batching/piggybacking logic > > isn't > > > > > > > necessarily about reducing the total number of RPCs, but rather > to > > > > > > > distribute the load more evenly over time. For example, if a > large > > > > number > > > > > > > of topics need to be refreshed at approximately the same time > (common > > > > for > > > > > > > start-up cases that hit a large number of topics), the updates > are > > > > more > > > > > > > evenly distributed to avoid a flood. > > > > > > > > > > > > It wouldn't be a flood in the current case, right? It would just > be > > a > > > > > > single metadata request for a lot of topics. > > > > > > > > > > > > Let's compare the two cases. In the current scenario, we have 1 > > > > metadata > > > > > > request every 5 minutes. This request is for 5,000 topics (let's > > > > say). In > > > > > > the new scenario, we have a request every 10 seconds for 167 > topics > > > > each. > > > > > > > > > > > > Which do you think will be more expensive? I think the second > > > scenario > > > > > > certainly will, because of the overhead of 30x as many requests sent > > > > over > > > > > > the wire. Metadata accesses are now lockless, so the big > metadata > > > > request > > > > > > just isn't that much of a problem. I bet if you benchmark it, > > > sending > > > > back > > > > > > metadata for 167 topics won't be that much cheaper than sending > > back > > > > > > metadata for 5k. Certainly not 30x cheaper. There will > eventually > > > be > > > > a > > > > > > point where we need to split metadata requests, but it's > definitely > > > > not at > > > > > > 5,000 topics.
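The two scenarios being compared work out numerically as follows, using the figures quoted earlier in the thread:

```python
TOPICS = 5_000
INTERVAL_S = 300  # 5-minute metadata.max.age.ms

def rpc_period_s(batch_size):
    # with a fixed batch size, each topic still refreshes once per interval,
    # so requests go out every interval * batch / topics seconds
    return INTERVAL_S * batch_size / TOPICS

assert rpc_period_s(TOPICS) == 300       # current: one big RPC per 5 min
assert rpc_period_s(20) == 1.2           # batch of 20: an RPC every 1.2 s
assert round(rpc_period_s(167)) == 10    # batch of 167: an RPC every ~10 s
# 5 min / 10 s = 30x as many requests as the single large RPC
assert INTERVAL_S / round(rpc_period_s(167)) == 30
```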
> > > > > > > > > > > > regards, > > > > > > Colin > > > > > > > > > > > > > > > > > > > > > > > > > > Brian > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Jan 6, 2020, at 10:26, Lucas Bradstreet wrote: > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > > > > > On Thu, Jan 2, 2020 at 11:15 AM Brian Byrne < > > > bby...@confluent.io > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hello all, > > > > > > > > > > > > > > > > > > > > After further discussion and improvements, I'd like to > > > > reinstate > > > > > > the > > > > > > > > voting > > > > > > > > > > process. > > > > > > > > > > > > > > > > > > > > The updated KIP: > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-526 > > > > > > > > > > > > > %3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics > > > > > > > > > > < > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The continued discussion: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E > > > > > > > > > > > > > > > > > > > > I'd be happy to address any further comments/feedback. > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > Brian > > > > > > > > > > > > > > > > > > > > On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang < > > > > wangg...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > With the concluded summary on the other discussion > > thread, > > > > I'm > > > > > > +1 on > > > > > > > > the > > > > > > > > > > > proposal. > > > > > > > > > > > > > > > > > > > > > > Thanks Brian! 
> > > > > > > > > > > > > > > > > > > > > > On Tue, Nov 19, 2019 at 8:00 PM deng ziming < > > > > > > > > dengziming1...@gmail.com> > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For new (uncached) topics, one problem here is that > > we > > > > don't > > > > > > know > > > > > > > > > > which > > > > > > > > > > > > > partition to map a record to in the event that it > > has a > > > > key > > > > > > or > > > > > > > > custom > > > > > > > > > > > > > partitioner, so the RecordAccumulator wouldn't know > > > which > > > > > > > > > > batch/broker > > > > > > > > > > > it > > > > > > > > > > > > > belongs. We'd need an intermediate record queue > that > > > > > > subsequently > > > > > > > > > > moved > > > > > > > > > > > > the > > > > > > > > > > > > > records into RecordAccumulators once metadata > > > resolution > > > > was > > > > > > > > > > complete. > > > > > > > > > > > > For > > > > > > > > > > > > > known topics, we don't currently block at all in > > > > > > waitOnMetadata. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > You are right, I forget this fact, and the > intermediate > > > > record > > > > > > > > queue > > > > > > > > > > will > > > > > > > > > > > > help, but I have some questions > > > > > > > > > > > > > > > > > > > > > > > > if we add an intermediate record queue in > > KafkaProducer, > > > > when > > > > > > > > should we > > > > > > > > > > > > move the records into RecordAccumulators? 
> > > > > > > > > > > > only NetworkClient is aware of the MetadataResponse, > here > > is > > > the > > > > > > hierarchical structure of the related classes: > > > > > > > > > > > > KafkaProducer > > > > > > > > > > > > Accumulator > > > > > > > > > > > > Sender > > > > > > > > > > > > NetworkClient > > > > > > > > > > > > > > > metadataUpdater.handleCompletedMetadataResponse > > > > > > > > > > > > > > > > > > > > > > > > so > > > > > > > > > > > > 1. we should also add a metadataUpdater to > > KafkaProducer? > > > > > > > > > > > > 2. if the topic really does not exist? the > > intermediate > > > > record > > > > > > > > queue > > > > > > > > > > > will > > > > > > > > > > > > become too large? > > > > > > > > > > > > 3. and should we `block` when the intermediate record > > > > queue is > > > > > > too > > > > > > > > > > large? > > > > > > > > > > > > and this will again bring the blocking problem? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne < > > > > > > bby...@confluent.io> > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi Deng, > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the feedback.
> > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Nov 18, 2019 at 6:56 PM deng ziming < > > > > > > > > > > dengziming1...@gmail.com> > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > hi, I reviewed the current code, the > > ProduceMetadata > > > > > > maintains > > > > > > > > an > > > > > > > > > > > > expiry > > > > > > > > > > > > > > threshold for every topic, every time when we > write > > > to > > > > a > > > > > > topic > > > > > > > > we > > > > > > > > > > > will > > > > > > > > > > > > > set > > > > > > > > > > > > > > the expiry time to -1 to indicate it should be > > > updated, > > > > > > this > > > > > > > > does > > > > > > > > > > > work > > > > > > > > > > > > to > > > > > > > > > > > > > > reduce the size of the topic working set, but the > > > > producer > > > > > > will > > > > > > > > > > > > continue > > > > > > > > > > > > > > fetching metadata for these topics in every > > metadata > > > > > > request > > > > > > > > for > > > > > > > > > > the > > > > > > > > > > > > full > > > > > > > > > > > > > > expiry duration. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Indeed, you are correct, I terribly misread the > code > > > > here. > > > > > > > > > > Fortunately > > > > > > > > > > > > this > > > > > > > > > > > > > was only a minor optimization in the KIP that's no > > > longer > > > > > > > > necessary. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > and we can improve the situation by 2 means: > > > > > > > > > > > > > > 1. 
we maintain a refresh threshold for every > > > topic > > > > > > which > > > > > > > > is for > > > > > > > > > > > > > example > > > > > > > > > > > > > > 0.8 * expiry_threshold, and when we send > > > > `MetadataRequest` > > > > > > to > > > > > > > > > > brokers > > > > > > > > > > > > we > > > > > > > > > > > > > > just request unknownLeaderTopics + > > > > unknownPartitionTopics + > > > > > > > > topics > > > > > > > > > > > > > > reach refresh threshold. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Right, this is similar to what I suggested, with a > > > larger > > > > > > window > > > > > > > > on > > > > > > > > > > the > > > > > > > > > > > > > "staleness" that permits for batching to an > > appropriate > > > > size > > > > > > > > (except > > > > > > > > > > if > > > > > > > > > > > > > there's any unknown topics, you'd want to issue the > > > > request > > > > > > > > > > > immediately). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. we don't invoke > KafkaProducer#waitOnMetadata > > > > when we > > > > > > > > call > > > > > > > > > > > > > > KafkaProducer#send because of we just send data > to > > > > > > > > > > RecordAccumulator, > > > > > > > > > > > > and > > > > > > > > > > > > > > before we send data to brokers we will invoke > > > > > > > > > > > > RecordAccumulator#ready(), > > > > > > > > > > > > > so > > > > > > > > > > > > > > we can only invoke waitOnMetadata to block when > > > (number > > > > > > topics > > > > > > > > > > > > > > reach refresh threshold)>(number of all known > > > > topics)*0.2. 
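deng ziming's refresh-threshold idea in point 1 above could be sketched as (a hypothetical helper over assumed per-topic state, not actual client code):

```python
def topics_to_request(topic_state, now_ms, expiry_ms, refresh_fraction=0.8):
    """Pick topics for the next MetadataRequest per the suggestion above:
    topics with unknown leader/partition info, plus any topic past
    refresh_fraction (e.g. 0.8) of its expiry window. Sketch only;
    the state dict layout here is an illustrative assumption."""
    unknown = [t for t, s in topic_state.items() if s["unknown"]]
    nearing_expiry = [
        t for t, s in topic_state.items()
        if not s["unknown"]
        and now_ms - s["last_update_ms"] >= refresh_fraction * expiry_ms
    ]
    return unknown + nearing_expiry
```

With a 5-minute expiry, a topic last updated 250 seconds ago crosses the 0.8 threshold and rides along with any unknown topics, while a recently updated topic is skipped.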
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > For new (uncached) topics, one problem here is that > > we > > > > don't > > > > > > know > > > > > > > > > > which > > > > > > > > > > > > > partition to map a record to in the event that it > > has a > > > > key > > > > > > or > > > > > > > > custom > > > > > > > > > > > > > partitioner, so the RecordAccumulator wouldn't know > > > which > > > > > > > > > > batch/broker > > > > > > > > > > > it > > > > > > > > > > > > > belongs. We'd need an intermediate record queue > that > > > > > > subsequently > > > > > > > > > > moved > > > > > > > > > > > > the > > > > > > > > > > > > > records into RecordAccumulators once metadata > > > resolution > > > > was > > > > > > > > > > complete. > > > > > > > > > > > > For > > > > > > > > > > > > > known topics, we don't currently block at all in > > > > > > waitOnMetadata. > > > > > > > > > > > > > > > > > > > > > > > > > > The last major point of minimizing producer startup > > > > metadata > > > > > > > > RPCs may > > > > > > > > > > > > still > > > > > > > > > > > > > need to be improved, but this would be a large > > > > improvement > > > > > > on the > > > > > > > > > > > current > > > > > > > > > > > > > situation. > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > Brian > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I think the above 2 ways are enough to solve the > > > > current > > > > > > > > problem. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe < > > > > > > > > cmcc...@apache.org> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Nov 18, 2019, at 10:05, Brian Byrne > > wrote: > > > > > > > > > > > > > > > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe > < > > > > > > > > > > cmcc...@apache.org > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Two seconds doesn't seem like a reasonable > > > > amount of > > > > > > > > time to > > > > > > > > > > > > leave > > > > > > > > > > > > > > for > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > metadata fetch. Fetching halfway through > the > > > > > > expiration > > > > > > > > > > period > > > > > > > > > > > > > seems > > > > > > > > > > > > > > > more > > > > > > > > > > > > > > > > > reasonable. It also doesn't require us to > > > > create a > > > > > > new > > > > > > > > > > > > > configuration > > > > > > > > > > > > > > > key, > > > > > > > > > > > > > > > > > which is nice. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Another option is to just do the metadata > > fetch > > > > every > > > > > > > > > > > > > > > metadata.max.age.ms, > > > > > > > > > > > > > > > > > but not expire the topic until we can't > fetch > > > the > > > > > > > > metadata > > > > > > > > > > for > > > > > > > > > > > 2 > > > > > > > > > > > > * > > > > > > > > > > > > > > > > > metadata.max.age.ms. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'd expect two seconds to be reasonable in > the > > > > common > > > > > > case. 
> > > > > > > > > > Keep > > > > > > > > > > > in > > > > > > > > > > > > > > mind > > > > > > > > > > > > > > > > that this doesn't affect correctness, and a > > > control > > > > > > > > operation > > > > > > > > > > > > > returning > > > > > > > > > > > > > > > > cached metadata should be on the order of > > > > milliseconds. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Brian, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks again for the KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I think the issue here is not the common case, > > but > > > > the > > > > > > > > uncommon > > > > > > > > > > > case > > > > > > > > > > > > > > where > > > > > > > > > > > > > > > the metadata fetch takes longer than expected. > > In > > > > that > > > > > > > > case, we > > > > > > > > > > > > don't > > > > > > > > > > > > > > want > > > > > > > > > > > > > > > to be in the position of having our metadata > > expire > > > > > > because > > > > > > > > we > > > > > > > > > > > waited > > > > > > > > > > > > > too > > > > > > > > > > > > > > > long to renew it. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This is one reason why I think that the > metadata > > > > > > expiration > > > > > > > > time > > > > > > > > > > > > should > > > > > > > > > > > > > > be > > > > > > > > > > > > > > > longer than the metadata refresh time. In > fact, > > it > > > > > > might be > > > > > > > > > > worth > > > > > > > > > > > > > having > > > > > > > > > > > > > > > two separate configuration keys for these two > > > > values. 
> > I could imagine a user who is having trouble with metadata expiration
> > wanting to increase the metadata expiration time, but without
> > increasing the metadata refresh period. In a sense, the metadata
> > expiration time is like the ZK session expiration time. You might want
> > to turn it up if the cluster is experiencing load spikes.
>
> > > But to the general point, defining the algorithm would mean
> > > enforcing it to fair accuracy, whereas if the suggestion is that
> > > it'll be performed at a reasonable time, it allows for batching and
> > > other optimizations.
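As an aside, the two-value scheme discussed above (refresh at `metadata.max.age.ms`, but only discard at twice that age) reduces to two simple predicates. This is an illustrative sketch, not Kafka client code; all names here are invented:

```java
// Sketch only: Colin's second option above -- refresh every
// metadata.max.age.ms, but keep the cached entry usable until
// 2 * metadata.max.age.ms has elapsed, so a slow fetch has a full
// refresh period to complete before the entry actually expires.
class MetadataTimingSketch {
    // A refresh is due once the cached metadata is older than maxAgeMs.
    static boolean refreshDue(long nowMs, long lastFetchMs, long maxAgeMs) {
        return nowMs - lastFetchMs >= maxAgeMs;
    }

    // The entry is only discarded once it is twice as old.
    static boolean expired(long nowMs, long lastFetchMs, long maxAgeMs) {
        return nowMs - lastFetchMs >= 2 * maxAgeMs;
    }

    public static void main(String[] args) {
        long maxAge = 300_000L; // 5 minutes, the default metadata.max.age.ms
        System.out.println(refreshDue(300_000L, 0L, maxAge)); // true
        System.out.println(expired(300_000L, 0L, maxAge));    // false
    }
}
```

Splitting the two thresholds is what makes Colin's later point work: a user can widen the expiration window without refreshing less often.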
> > > Perhaps I shouldn't be regarding what's defined in a KIP to be
> > > contractual in these cases, but you could consider a first
> > > implementation to collect topics whose metadata has exceeded
> > > (metadata.max.age.ms / 2), and sending the batch once a constituent
> > > topic's metadata is near the expiry, or a sufficient number of
> > > topics have been collected (10? 100? 1000?).
> >
> > I'm concerned that if we change the metadata caching strategy without
> > discussing it first, it may improve certain workloads but make others
> > worse. We need to be concrete about what the proposed strategy is so
> > that we can really evaluate it.
>
> > > > We should be specific about what happens if the first few metadata
> > > > fetches fail. Do we use exponential backoff to decide when to
> > > > resend?
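Brian's first-implementation idea above — collect topics older than half of `metadata.max.age.ms`, then flush the batch when a collected topic nears expiry or enough have accumulated — could be sketched roughly as follows. Everything here is hypothetical: the class name, the 90% flush point, and the batch-size threshold are placeholders, not values from the KIP:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the batched-refresh idea: topics past half their
// max age are collected, and the batch is drained as one request either
// when the oldest member nears full expiry or when the batch is full.
class RefreshBatcherSketch {
    private final long maxAgeMs;
    private final int maxBatch;
    private final List<String> pending = new ArrayList<>();
    private long oldestFetchMs = Long.MAX_VALUE;

    RefreshBatcherSketch(long maxAgeMs, int maxBatch) {
        this.maxAgeMs = maxAgeMs;
        this.maxBatch = maxBatch;
    }

    // Called per topic while scanning the metadata cache.
    void maybeCollect(String topic, long lastFetchMs, long nowMs) {
        if (nowMs - lastFetchMs >= maxAgeMs / 2) {
            pending.add(topic);
            oldestFetchMs = Math.min(oldestFetchMs, lastFetchMs);
        }
    }

    // Flush when the oldest collected topic reaches 90% of its max age
    // (an arbitrary illustrative threshold) or the batch is full.
    List<String> drainIfReady(long nowMs) {
        boolean nearExpiry = !pending.isEmpty()
                && nowMs - oldestFetchMs >= maxAgeMs * 9 / 10;
        if (nearExpiry || pending.size() >= maxBatch) {
            List<String> out = new ArrayList<>(pending);
            pending.clear();
            oldestFetchMs = Long.MAX_VALUE;
            return out;
        }
        return List.of();
    }

    public static void main(String[] args) {
        RefreshBatcherSketch batcher = new RefreshBatcherSketch(300_000L, 100);
        batcher.maybeCollect("orders", 0L, 160_000L); // age 160s >= 150s
        System.out.println(batcher.drainIfReady(275_000L)); // [orders]
    }
}
```

The point of the sketch is the trade-off Brian names: individual topic refreshes drift slightly from the configured age in exchange for far fewer RPCs.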
> > > > It seems like we really should, for all the usual reasons (reduce
> > > > the load on brokers, ride out temporary service disruptions, etc.)
> > > > Maybe we could have an exponential retry backoff for each broker
> > > > (in other words, we should try to contact a different broker before
> > > > applying the backoff.) I think this already sort of happens with
> > > > the disconnect timeout, but we might need a more general solution.
> > >
> > > I don't plan to change this behavior. Currently it retries after a
> > > fixed value of 'retry.backoff.ms' (defaults to 100 ms). It's
> > > possible that different brokers are attempted, but I haven't dug
> > > into it.
> >
> > I think it's critical to understand what the current behavior is
> > before we try to change it.
> > The difference between retrying the same broker and trying a
> > different one has a large impact on cluster load and latency. For what
> > it's worth, I believe the behavior is the second one, but it has been
> > a while since I checked. Let's figure this out.
>
> > > > Thanks for the clarification. Fully asynchronous is the way to go,
> > > > I agree. I'm having trouble understanding how timeouts are handled
> > > > in the KIP. It seems like if we can't fetch the metadata within
> > > > the designated metadata timeout, the future / callback should
> > > > receive a TimeoutException, right? We do not want the send call to
> > > > be deferred forever if metadata can't be fetched. Eventually it
> > > > should fail if it can't be performed.
> > > >
> > > > I do think this is something that will have to be mentioned in the
> > > > compatibility section.
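For reference, the per-broker exponential backoff Colin floats above (as opposed to the fixed `retry.backoff.ms` delay Brian describes as current behavior) might compute its wait like this sketch. This is purely illustrative and not the Kafka network client's logic:

```java
// Sketch: exponential backoff tracked per broker, so a different broker
// can be attempted before any wait is imposed. Base and cap values are
// illustrative; 100 ms mirrors the retry.backoff.ms default mentioned above.
class BrokerBackoffSketch {
    static long backoffMs(int consecutiveFailures, long baseMs, long capMs) {
        if (consecutiveFailures <= 0) return 0L;
        // Double per consecutive failure to this broker, capped.
        long shifted = baseMs << Math.min(consecutiveFailures - 1, 20);
        return Math.min(shifted, capMs);
    }

    public static void main(String[] args) {
        System.out.println(backoffMs(1, 100, 10_000));  // 100
        System.out.println(backoffMs(4, 100, 10_000));  // 800
        System.out.println(backoffMs(10, 100, 10_000)); // 10000 (capped)
    }
}
```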
> > > > There is some code out there that is probably prepared to handle
> > > > a timeout exception from the send function, which may need to be
> > > > updated to check for a timeout from the future or callback.
> > >
> > > Correct, a timeout exception would be delivered in the future. Sure,
> > > I can add that note to the KIP.
> >
> > Thanks.
> >
> > best,
> > Colin
>
> > > > It seems like this is an existing problem. You may fire off a lot
> > > > of send calls that get blocked because the broker that is the
> > > > leader for a certain partition is not responding. I'm not sure
> > > > that we need to do anything special here.
> > > > On the other hand, we could make the case for a generic "max
> > > > number of outstanding sends" configuration to prevent surprise
> > > > OOMs in the existing cases, plus the new one we're adding. But
> > > > this feels like a bit of a scope expansion.
> > >
> > > Right, this is an existing problem, however the asynchronous send
> > > could cause unexpected behavior. For example, if a client pinned
> > > topics/partitions to individual send threads, then memory couldn't
> > > be exhausted by a single topic since a blocking send would prevent
> > > further records from being buffered on that topic. The compromise
> > > could be that we only ever permit one outstanding record batch for a
> > > topic, which would keep the code simple and wouldn't permit a single
> > > topic to consume all available memory.
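The one-outstanding-batch-per-topic compromise Brian mentions amounts to a per-topic permit. A minimal sketch (hypothetical helper, not producer code) could use a single-permit semaphore per topic:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Sketch of "one outstanding record batch per topic": a topic whose
// metadata is unresolved can buffer at most one batch, so it cannot
// consume all available memory while waiting.
class PerTopicPermitSketch {
    private final Map<String, Semaphore> permits = new ConcurrentHashMap<>();

    // Returns true if the caller may buffer a batch for this topic now.
    boolean tryAcquire(String topic) {
        return permits.computeIfAbsent(topic, t -> new Semaphore(1)).tryAcquire();
    }

    // Called when the in-flight batch for the topic completes.
    void release(String topic) {
        Semaphore s = permits.get(topic);
        if (s != null) s.release();
    }

    public static void main(String[] args) {
        PerTopicPermitSketch permits = new PerTopicPermitSketch();
        System.out.println(permits.tryAcquire("orders")); // true
        System.out.println(permits.tryAcquire("orders")); // false
        permits.release("orders");
        System.out.println(permits.tryAcquire("orders")); // true
    }
}
```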
> > > > They may be connected, but I'm not sure they should be the same.
> > > > Perhaps expiry should be 4x the typical fetch rate, for example.
> > >
> > > That's true. You could also make the case for a faster expiry than
> > > refresh as well. Makes sense to separate this out.
>
> > > > Hmm.... are you sure this is an N^2 problem? If you have T1 and
> > > > T2, you fetch metadata for T1 and T2, right? I guess you could
> > > > argue that we often fetch metadata for partitions we don't care
> > > > about, but that doesn't make it O(N^2). Maybe there's something
> > > > about the implementation that I'm missing.
> > >
> > > My apologies, I left out the context.
> > > One issue the KIP is trying to resolve is the metadata storm that's
> > > caused by producers starting up. In the worst case, where the send
> > > call is only performed from a single thread (i.e. no possible
> > > batching), fetching metadata for 1K topics will generate 1K RPCs,
> > > with payload 1+2+...+1K topics' metadata. Being smart about the
> > > topics being refreshed would still generate 1K RPCs for 1 topic
> > > each, and asynchronous behavior would permit batching (note
> > > steady-state refreshing doesn't require the asynchronous behavior to
> > > batch).
>
> > > > In general, we need to take advantage of batching to do this well
> > > > (one reason why I think we should steer clear of ultra-granular
> > > > timeout tracking).
> > > > It's best to do 1 RPC asking for 10 topics, not 10 RPCs asking
> > > > for a single topic each, even if that means some of the topic
> > > > timeouts are not *exactly* aligned with the configured value.
> > >
> > > Absolutely, agreed.
> > >
> > > Thanks,
> > > Brian
>
> > > > best,
> > > > Colin
>
> > > > > Thanks,
> > > > > Brian
> > > > >
> > > > > On Mon, Nov 11, 2019 at 11:47 AM Colin McCabe <cmcc...@apache.org> wrote:
> > > > > > Hi Brian,
> > > > > >
> > > > > > Thanks for the KIP.
> > > > > >
> > > > > > Starting the metadata fetch before we need the result is
> > > > > > definitely a great idea.
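To put numbers on the "metadata storm" Brian describes above: if topic i's first send triggers a metadata request carrying all i topics seen so far, N new topics cost N RPCs whose combined payload is the triangular number 1 + 2 + ... + N = N(N+1)/2 topic entries — about half a million entries for 1K topics. A quick check (illustrative code, not from the KIP):

```java
// The arithmetic behind the worst case described above: N sequential
// first-sends generate N metadata RPCs whose total payload across all
// requests is 1 + 2 + ... + N = N(N+1)/2 topic entries.
class MetadataStormMath {
    static long totalTopicEntries(int topics) {
        return (long) topics * (topics + 1) / 2;
    }

    public static void main(String[] args) {
        // 1K topics: 1000 RPCs, 500,500 topic entries in aggregate.
        System.out.println(totalTopicEntries(1_000)); // 500500
    }
}
```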
> > > > > > This way, the metadata fetch can be done in parallel with all
> > > > > > the other stuff the producer is doing, rather than forcing the
> > > > > > producer to periodically come to a halt while metadata is
> > > > > > fetched.
> > > > > >
> > > > > > Maybe I missed it, but there seemed to be some details missing
> > > > > > here. When do we start the metadata fetch? For example, if
> > > > > > topic metadata expires every 5 minutes, perhaps we should wait
> > > > > > 4 minutes, then start fetching the new metadata, which we
> > > > > > would expect to arrive by the 5 minute deadline. Or perhaps we
> > > > > > should start the fetch even earlier, around the 2.5 minute
> > > > > > mark. In any case, there should be some discussion about what
> > > > > > the actual policy is.
> > > > > > Given that metadata.max.age.ms is configurable, maybe that
> > > > > > policy needs to be expressed in terms of a percentage of the
> > > > > > refresh period rather than in terms of an absolute delay.
> > > > > >
> > > > > > The KIP correctly points out that the current metadata
> > > > > > fetching policy causes us to "[block] in a function that's
> > > > > > advertised as asynchronous." However, the KIP doesn't seem to
> > > > > > spell out whether we will continue to block if metadata can't
> > > > > > be found, or if this will be abolished. Clearly, starting the
> > > > > > metadata fetch early will reduce blocking in the common case,
> > > > > > but will there still be blocking in the uncommon case where
> > > > > > the early fetch doesn't succeed in time?
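Colin's suggestion that the early-fetch point be a fraction of `metadata.max.age.ms` rather than an absolute delay is a one-liner in sketch form. The 0.8 here is an illustrative fraction (the "wait 4 minutes of 5" example above), not a proposed default:

```java
// Sketch: schedule the early metadata fetch at a configurable fraction
// of metadata.max.age.ms, so the policy scales with the refresh period
// instead of hard-coding an absolute lead time.
class EarlyFetchSketch {
    static long earlyFetchAtMs(long lastUpdateMs, long maxAgeMs, double fraction) {
        return lastUpdateMs + (long) (maxAgeMs * fraction);
    }

    public static void main(String[] args) {
        // With a 5-minute max age, start fetching at the 4-minute mark:
        System.out.println(earlyFetchAtMs(0L, 300_000L, 0.8)); // 240000
    }
}
```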
> > > > > > > To address (2), the producer currently maintains an expiry
> > > > > > > threshold for every topic, which is used to remove a topic
> > > > > > > from the working set at a future time (currently hard-coded
> > > > > > > to 5 minutes; this should be modified to use
> > > > > > > metadata.max.age.ms). While this does work to reduce the
> > > > > > > size of the topic working set, the producer will continue
> > > > > > > fetching metadata for these topics in every metadata request
> > > > > > > for the full expiry duration.
> > > > > > > This logic can be made more intelligent by managing the
> > > > > > > expiry from when the topic was last used, enabling the
> > > > > > > expiry duration to be reduced to improve cases where a large
> > > > > > > number of topics are touched intermittently.
> > > > > >
> > > > > > Can you clarify this part a bit? It seems like we have a
> > > > > > metadata expiration policy now for topics, and we will have
> > > > > > one after this KIP, but they will be somewhat different. But
> > > > > > it's not clear to me what the differences are.
> > > > > >
> > > > > > In general, if load is a problem, we should probably consider
> > > > > > adding some kind of jitter on the client side.
> > > > > > There are definitely cases where people start up a lot of
> > > > > > clients at the same time in parallel and there is a thundering
> > > > > > herd problem with metadata updates. Adding jitter would spread
> > > > > > the load across time rather than creating a spike every 5
> > > > > > minutes in this case.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > > On Fri, Nov 8, 2019, at 08:59, Ismael Juma wrote:
> > > > > > > I think this KIP affects when we block, which is actually
> > > > > > > user-visible behavior. Right?
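The client-side jitter Colin suggests could be sketched as picking each refresh time from a window just below the configured age, so producers started simultaneously drift apart rather than refreshing in lockstep. The 15% window is an assumption for illustration:

```java
import java.util.Random;

// Sketch: jitter the refresh time into [85%, 100%) of metadata.max.age.ms
// so a fleet of clients started together doesn't create a metadata spike
// every refresh period. The window fraction is illustrative only.
class JitterSketch {
    static long jitteredRefreshMs(long maxAgeMs, Random rng) {
        double factor = 0.85 + 0.15 * rng.nextDouble();
        return (long) (maxAgeMs * factor);
    }

    public static void main(String[] args) {
        long next = jitteredRefreshMs(300_000L, new Random());
        System.out.println(next); // somewhere in [255000, 300000)
    }
}
```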
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Fri, Nov 8, 2019, 8:50 AM Brian Byrne <bby...@confluent.io> wrote:
> > > > > > > > Hi Guozhang,
> > > > > > > >
> > > > > > > > Regarding metadata expiry, no access times other than the
> > > > > > > > initial lookup[1] are used for determining when to expire
> > > > > > > > producer metadata. Therefore, frequently used topics'
> > > > > > > > metadata will be aged out and subsequently refreshed (in a
> > > > > > > > blocking manner) every five minutes, and infrequently used
> > > > > > > > topics will be retained for a minimum of five minutes and
> > > > > > > > currently refetched on every metadata update during that
> > > > > > > > time period.
> > > > > > > > The sentence is suggesting that we could reduce the
> > > > > > > > expiry time to improve the latter without affecting
> > > > > > > > (rather, slightly improving) the former.
> > > > > > > >
> > > > > > > > Keep in mind that in most all cases, I wouldn't anticipate
> > > > > > > > much of a difference in producer behavior, and the extra
> > > > > > > > logic can be implemented to have insignificant cost. It's
> > > > > > > > the large/dynamic topic corner cases that we're trying to
> > > > > > > > improve.
> > > > > > > >
> > > > > > > > It'd be convenient if the KIP is no longer necessary.
> > > > > > > > You're right in that there are no public API changes and
> > > > > > > > the behavioral changes are entirely internal.
> > > > > > > > I'd be happy to continue the discussion around the KIP,
> > > > > > > > but unless otherwise objected, it can be retired.
> > > > > > > >
> > > > > > > > [1] Not entirely accurate; it's actually the first time
> > > > > > > > the client calculates whether to retain the topic in its
> > > > > > > > metadata.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Brian
> > > > > > > >
> > > > > > > > On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > Hello Brian,
> > > > > > > > >
> > > > > > > > > Could you elaborate a bit more on this sentence: "This
> > > > > > > > > logic can be made more intelligent by managing the
> > > > > > > > > expiry from when the topic was last used, enabling the
> > > > > > > > > expiry duration to be reduced to improve cases where a
> > > > > > > > > large number of topics are touched intermittently." Not
> > > > > > > > > sure I fully understand the proposal.
> > > > > > > > >
> > > > > > > > > Also, since this KIP did not make any public API changes
> > > > > > > > > and the behavioral changes are not considered a public
> > > > > > > > > API contract (i.e. how we maintain the topic metadata in
> > > > > > > > > the producer cache is never committed to users), I
> > > > > > > > > wonder if we still need a KIP for the proposed change
> > > > > > > > > any more?
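The "expire from when the topic was last used" idea Guozhang asks about can be made concrete with a small sketch: record a last-used timestamp per topic on each send, and drop a topic from the working set only once it has gone unused for the expiry duration. Names here are invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of last-use-based expiry: frequently used topics stay in the
// working set indefinitely (and are refreshed on schedule), while idle
// topics fall out after expiryMs, shrinking subsequent metadata requests.
class LastUseExpirySketch {
    private final Map<String, Long> lastUsedMs = new HashMap<>();
    private final long expiryMs;

    LastUseExpirySketch(long expiryMs) { this.expiryMs = expiryMs; }

    // Called on each send to the topic.
    void recordUse(String topic, long nowMs) { lastUsedMs.put(topic, nowMs); }

    // Drop topics that have gone unused for the expiry duration.
    void expireIdle(long nowMs) {
        lastUsedMs.values().removeIf(t -> nowMs - t >= expiryMs);
    }

    int workingSetSize() { return lastUsedMs.size(); }

    public static void main(String[] args) {
        LastUseExpirySketch set = new LastUseExpirySketch(300_000L);
        set.recordUse("hot", 0L);
        set.recordUse("cold", 0L);
        set.recordUse("hot", 200_000L); // "hot" is touched again
        set.expireIdle(300_000L);
        System.out.println(set.workingSetSize()); // 1: only "hot" survives
    }
}
```

The contrast with the current behavior Brian describes: today the clock starts at the initial lookup, so even a busy topic is aged out and blockingly refetched every five minutes, while an idle one is still included in every metadata request until its fixed expiry.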
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne <bby...@confluent.io> wrote:
> > > > > > > > > > Hello all,
> > > > > > > > > >
> > > > > > > > > > I'd like to propose a vote for a producer change to
> > > > > > > > > > improve producer behavior when dealing with a large
> > > > > > > > > > number of topics, in part by reducing the amount of
> > > > > > > > > > metadata fetching performed.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The full KIP is provided here: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > And the discussion thread: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > Brian > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > > > > > > > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 
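P.S. The enqueue-then-drain scheme described at the top of this message could be sketched roughly as below. This is purely illustrative, not actual KafkaProducer internals; `UnknownTopicBatcher` and its method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of batching metadata requests for unknown topics:
// each producer thread enqueues its unknown topic, then tries to drain
// the list; an empty drain means another thread already picked the topic
// up, so the caller just waits for that in-flight metadata refresh.
public class UnknownTopicBatcher {
    private final List<String> pendingUnknownTopics = new ArrayList<>();

    // Step 1: record the topic whose metadata is unknown.
    public synchronized void enqueue(String topic) {
        pendingUnknownTopics.add(topic);
    }

    // Step 2: exhaust the list. A non-empty result is the batch this
    // thread should put into a single metadata request; an empty result
    // means another thread already sent the request, so skip sending.
    public synchronized List<String> drain() {
        List<String> batch = new ArrayList<>(pendingUnknownTopics);
        pendingUnknownTopics.clear();
        return batch;
    }

    public static void main(String[] args) {
        UnknownTopicBatcher batcher = new UnknownTopicBatcher();
        batcher.enqueue("topic-a");
        batcher.enqueue("topic-b");      // e.g. enqueued by another thread
        System.out.println(batcher.drain()); // [topic-a, topic-b] -> one request
        System.out.println(batcher.drain()); // [] -> skip, wait for refresh
    }
}
```

In the common case the drained batch is a singleton (the topic the thread just enqueued), but in a KAFKA-8904-style burst several concurrent sends would coalesce into one metadata request.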