Hi Paul,

I have faced an issue similar to yours. Our use case was a bit different: we needed to aggregate events and publish them to the same partition of the same topic. Occasionally I ran into blocked application threads (not because of metadata, but because of the synchronized block on each batch).

When you use the new producer in async mode, the application thread is responsible for:
1) compressing the message (if compression is on),
2) obtaining the lock for the batch of the partition that the message hashes to,
3) enqueuing the message, and/or
4) allocating a new batch if the current batch is full.

Sometimes the application thread also has to wait in waitOnMetadata (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java). So the application thread does much more work than with the old producer, which simply added the message to a synchronized blocking queue.

The workarounds I have used are to run more producer instances, to use one producer instance per partition, or to use a global AsyncKafkaProducer as described in the ticket below. None of these solves the root of the problem, but the application thread then only enqueues the message into a blocking queue, and worker threads pay the cost of doing the Kafka work described above in the background.
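To make the blocking-queue workaround above concrete, here is a minimal sketch using only the JDK. It is not the AsyncKafkaProducer from the ticket: QueueingSender, trySend and droppedCount are hypothetical names, and the Consumer passed to the constructor is a stand-in for whatever potentially blocking call you make, such as the producer's send(). The assumption is that dropping (and counting) messages when the queue is full is acceptable; if it is not, the caller has to decide what to do when trySend returns false.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * Hypothetical wrapper (not part of Kafka): application threads only enqueue,
 * and a single background worker drains the queue and performs the
 * potentially blocking send.
 */
final class QueueingSender<T> implements AutoCloseable {
    private final BlockingQueue<T> queue;
    private final Consumer<T> blockingSend; // stand-in for e.g. producer.send(record)
    private final Thread worker;
    private final AtomicLong dropped = new AtomicLong();
    private volatile boolean running = true;

    QueueingSender(int capacity, Consumer<T> blockingSend) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.blockingSend = blockingSend;
        this.worker = new Thread(this::drain, "queueing-sender-worker");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /** Never blocks: returns false and counts a drop when the queue is full. */
    boolean trySend(T message) {
        boolean accepted = queue.offer(message);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    long droppedCount() {
        return dropped.get();
    }

    /** Worker loop: keeps draining until close() was called and the queue is empty. */
    private void drain() {
        try {
            while (running || !queue.isEmpty()) {
                T next = queue.poll(100, TimeUnit.MILLISECONDS);
                if (next != null) {
                    // Any blocking (metadata wait, batch lock) happens on this thread.
                    blockingSend.accept(next);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops accepting work for the worker and waits until queued messages are sent. */
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With a real producer you would pass something like `record -> producer.send(record)` as the consumer. The cost has only moved: the worker thread still blocks whenever send() blocks, and a single worker serializes all sends, so this trades throughput and possible drops for a non-blocking application thread.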
For more info, please refer to the following JIRA issue:
https://issues.apache.org/jira/browse/KAFKA-1710

I hope this helps!

Thanks,
Bhavesh

On Mon, Dec 29, 2014 at 1:26 PM, Paul Pearcy <ppea...@gmail.com> wrote:

> FYI, here is the ticket I opened for this improvement:
> https://issues.apache.org/jira/browse/KAFKA-1835
>
> Feel free to add feedback on if it meets your use case and if not how
> things could.
>
> This should make the blocking behavior explicit as long as you know all
> your topics up front. Ideally a separate queue would be nice, but as Jay
> explained to me ends up getting complex because messages are stored in
> compact partitioned binary form currently, which requires metadata info.
>
> Thanks,
> Paul
>
>
> On Mon, Dec 29, 2014 at 2:48 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > I don't think a separate queue will be a very simple solution to
> > implement.
> >
> > Could you describe your use case a little bit more. It does seem to me
> > that as long as the metadata fetch happens only once and the blocking
> > has a tight time bound this should be okay in any use case I can
> > imagine. And, of course, by default the client blocks anyway whenever
> > you exhaust the memory buffer space. But it sounds like you feel it
> > isn't. Maybe you could describe the scenario a bit?
> >
> > I think one thing we could do is what was discussed in another thread,
> > namely add an option like
> > preinitialize.metadata=true/false
> > which would default to false. When true this would cause the producer to
> > just initialize metadata for all topics when it is created. Note that
> > this then brings back the opposite problem--doing remote communication
> > during initialization which tends to bite a lot of people. But since
> > this would be an option that would default to false perhaps it would be
> > less likely to come as a surprise.
> >
> > -Jay
> >
> > On Mon, Dec 29, 2014 at 8:38 AM, Steven Wu <stevenz...@gmail.com> wrote:
> >
> > > +1. it should be truly async in all cases.
> > >
> > > I understand some challenges that Jay listed in the other thread. But
> > > we need a solution nonetheless. e.g. can we maintain a separate
> > > list/queue/buffer for pending messages without metadata.
> > >
> > > On Tue, Dec 23, 2014 at 12:57 PM, John Boardman
> > > <boardmanjo...@gmail.com> wrote:
> > >
> > > > I was just fighting this same situation. I never expected the new
> > > > producer send() method to block as it returns a Future and accepts
> > > > a Callback. However, when I tried my unit test, just replacing the
> > > > old producer with the new, I immediately started getting timeouts
> > > > waiting for metadata. I struggled with this until I went into the
> > > > source code and found the wait() that waits for the metadata.
> > > >
> > > > At that point I realized that this new "async" producer would have
> > > > to be executed on its own thread, unlike the old producer, which
> > > > complicates my code unnecessarily. I totally agree with Paul that
> > > > the contract of send() is being completely violated with internal
> > > > code that can block.
> > > >
> > > > I did try fetching the metadata first, but that only worked for a
> > > > few calls before the producer decided it was time to update the
> > > > metadata again.
> > > >
> > > > Again, I agree with Paul that this API should be fixed so that it
> > > > is truly asynchronous in all cases. Otherwise, it cannot be used on
> > > > the main thread of an application as it will block and fail.