Yeah, if you want to file a JIRA and post a patch for a new option, it's possible others would want it. Maybe something like pre.initialize.topics=x,y,z and pre.initialize.timeout=x.
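Neither pre.initialize.topics nor pre.initialize.timeout exists in the 0.8.2 client; they are only proposed names. As a rough sketch of what such an option might do under the hood, using the existing partitionsFor() call (the retry interval, exception type, and helper name are arbitrary choices, not part of any Kafka API):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public final class PreInitialize {
        // Approximates the proposed pre.initialize.topics / pre.initialize.timeout
        // semantics: fetch metadata for each listed topic up front, giving up once
        // the overall deadline passes. The config names themselves are hypothetical.
        public static void topics(KafkaProducer<?, ?> producer, String topicsCsv, long timeoutMs)
                throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            for (String topic : topicsCsv.split(",")) {
                while (true) {
                    try {
                        producer.partitionsFor(topic.trim()); // each attempt bounded by metadata.fetch.timeout.ms
                        break;
                    } catch (Exception e) {
                        if (System.currentTimeMillis() >= deadline) {
                            throw new RuntimeException("Timed out initializing metadata for " + topic, e);
                        }
                        TimeUnit.MILLISECONDS.sleep(100); // arbitrary retry interval
                    }
                }
            }
        }
    }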
The metadata fetch timeout is a bug... that behavior is inherited from Object.wait, which defines zero to mean infinite, but I think that is not very intuitive. If you file a ticket on that we could just fix it. I think being able to set 0 is actually useful for the case you are trying for.

WRT stopping the producer, I think what you are saying is that you want calling close() on the producer to immediately fail all outstanding requests with some exception, right?

-Jay

On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy <ppea...@gmail.com> wrote:
>
> Hi Jay,
>
> I have implemented a wrapper around the producer to behave like I want it to. Where it diverges from the current 0.8.2 producer is that it accepts three new inputs:
> - A list of expected topics
> - A timeout value to init metadata for those topics during producer creation
> - An option to blow up if we fail to init topic metadata within some amount of time
>
> I also needed to set metadata.fetch.timeout.ms=1, as 0 means it will block forever, and kick off a thread to do the topic metadata init in the background.
>
> On the send side, things do fail fast now. The only current hiccup I am hitting (not completely done re-working my tests, though) is that messages accepted by the producer after the server has been stopped never return a status if the producer is stopped; I think this is a bug.
>
> Are you sure you wouldn't want any of this behavior in the client by default, which would give out-of-the-box choices on blocking behavior? Happy to share code or send a PR.
>
> Thanks,
> Paul
>
> On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > Hey Paul,
> >
> > I agree we should document this better.
> >
> > We allow and encourage using partitions to semantically distribute data. So unfortunately we can't just arbitrarily assign a partition (say 0), as that would actually give incorrect answers for any consumer that made use of the partitioning. It is true that the user can change the partitioning, but we can't ignore the partitioning they have set.
> >
> > I get the use case you have--you basically want a hard guarantee that send() will never block (so presumably you have also set it to drop data if the buffer fills up). As I said, the blocking only occurs on the first request for a given topic, and you can avoid it by pre-initializing the topic metadata.
> >
> > I think the option you describe is actually possible now. Basically you can initialize the metadata for topics you care about using that partitionsFor() call. If you set the property metadata.fetch.timeout.ms=0 then any send calls prior to the completion of metadata initialization will fail immediately rather than block.
> >
> > -Jay
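A minimal sketch of the recipe described above, against the 0.8.2 producer API. Because of the Object.wait behavior discussed earlier (0 currently means wait forever), it uses a very small timeout such as 1 ms rather than 0, as Paul did. The broker address and topic name are placeholders, and whether a given failure surfaces as an exception from send() or through the callback may vary:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class FailFastProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 0 is meant to be "fail immediately" but currently means "wait forever",
            // so use a very small value until that bug is fixed.
            props.put("metadata.fetch.timeout.ms", "1");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

            // Pre-initialize metadata for the topics we expect to use so the first
            // send() does not block waiting for it.
            try {
                producer.partitionsFor("my-topic"); // placeholder topic
            } catch (Exception e) {
                // Kafka unreachable at startup; sends fail fast until metadata arrives.
            }

            try {
                producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
                        new Callback() {
                            public void onCompletion(RecordMetadata metadata, Exception exception) {
                                if (exception != null) {
                                    exception.printStackTrace(); // drop, retry, or alert as appropriate
                                }
                            }
                        });
            } catch (Exception e) {
                // Some failures may be thrown from send() itself rather than delivered
                // to the callback, depending on where they occur.
            }
            producer.close();
        }
    }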
> >
> > On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy <ppea...@gmail.com> wrote:
> > >
> > > Hi Jay,
> > >
> > > Many thanks for the info. All that makes sense, but from an API standpoint, when something is labelled async and returns a Future, this will be misconstrued and developers will place async sends in critical client-facing request/response pathways of code that should never block. If the app comes up with a bad config, it will hang all incoming connections.
> > >
> > > Obviously, there is a spectrum of use cases with regard to message loss, and the defaults cannot cater to all of them. I like that the defaults tend towards best-effort guarantees, but I am not sure that justifies the inconsistency in the API.
> > >
> > > 1) It sounds like the client is already structured to handle changes in partitions on the fly. I am sure I am oversimplifying, but in the case where no metadata is available, my naive approach would be to assume some number of partitions and then, when metadata arrives, treat it as a partition change event. If there are more unknowns than just partition count, that probably won't work.
> > > 2) Pretty much makes sense, especially now that I see people on this discussion list wanting a million topics (good luck).
> > > 3) I agree client creation shouldn't fail, but any sends should probably fast fail, or the choice you are making should be explicit on the call.
> > >
> > > I'm still thinking about how I am going to make the client behave as I'd like. I think I need a background process kicked off on startup to prime the topics I am interested in. Until that process completes, any sends through the producer will need to fast fail instead of hang. This would still leave a window for blocking if you send to a topic your app wasn't aware it would send to, but now we're getting into corner corner cases.
> > >
> > > Would having something like that as a baked-in option be accepted into the Kafka clients mainline?
> > >
> > > A quick win might be to clarify the documentation so that it is clear that this API will block in cases XYZ (maybe this is mentioned somewhere and I missed it).
> > >
> > > Thanks,
> > > Paul
> > >
> > > On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > Hey Paul,
> > > >
> > > > Here are the constraints:
> > > > 1. We wanted the storage of messages to be in their compact binary form so we could bound memory usage. This implies partitioning prior to enqueue. And as you note, partitioning requires having metadata (even stale metadata) about topics.
> > > > 2. We wanted to avoid prefetching metadata for all topics since there may be quite a lot of topics.
> > > > 3. We wanted to make metadata fetching lazy so that it would be possible to create a client without having an active network connection. This tends to be important when services are brought up in development or test environments, where it is annoying to have to control the dependency graph when starting things.
> > > >
> > > > This blocking isn't too bad as it only occurs on the first request for each topic. Our feeling was that many things tend to get set up on a first request (DB connections are established, caches populated, etc.), so this was not unreasonable.
> > > >
> > > > If you want to pre-initialize the metadata to avoid blocking on the first request, you can do so by fetching the metadata using the producer.partitionsFor(topic) API at start-up.
> > > >
> > > > -Jay
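A sketch of the startup-priming idea Paul describes above, assuming the expected topic names are known in advance and that rejecting sends until priming completes is acceptable; the wrapper class, thread name, and exception type are illustrative, not part of the Kafka API:

    import java.util.List;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class PrimedProducer {
        private final KafkaProducer<String, String> producer;
        private final AtomicBoolean ready = new AtomicBoolean(false);

        public PrimedProducer(final KafkaProducer<String, String> producer,
                              final List<String> expectedTopics) {
            this.producer = producer;
            // Prime metadata for the expected topics in the background; until this
            // completes, send() below fails fast instead of blocking.
            new Thread(new Runnable() {
                public void run() {
                    for (String topic : expectedTopics) {
                        try {
                            producer.partitionsFor(topic);
                        } catch (Exception e) {
                            // Kafka not reachable yet; could retry with backoff here.
                        }
                    }
                    ready.set(true);
                }
            }, "producer-metadata-primer").start();
        }

        public Future<RecordMetadata> send(ProducerRecord<String, String> record) {
            if (!ready.get()) {
                throw new IllegalStateException("Producer metadata not initialized yet");
            }
            return producer.send(record);
        }
    }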
> > > >
> > > > On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy <ppea...@gmail.com> wrote:
> > > > >
> > > > > Hello,
> > > > >
> > > > > Playing around with the 0.8.2-beta producer client. One of my test cases is to ensure producers can deal with Kafka being down when the producer is created. My tests failed miserably because of the default blocking in the producer with regard to metadata.fetch.timeout.ms. The first thing the new producer's send() does is call waitOnMetadata, which is blocking.
> > > > >
> > > > > I can handle this case by loading topic metadata on init, setting metadata.fetch.timeout.ms to a very low value, and either throwing away messages or creating my own internal queue to buffer.
> > > > >
> > > > > I'm surprised the metadata sync isn't done async. If it fails, return that in the future/callback. This way the API could actually be considered safely async, and the producer buffer could try to hold on to things until block.on.buffer.full kicks in to either drop messages or block. You'd probably need a partition callback since numPartitions wouldn't be available.
> > > > >
> > > > > The implication is that people's apps will work fine if the first messages are sent while the Kafka server is up; however, if Kafka is down and they restart their app, the new producer will block all sends and blow things up if they haven't written their app to be aware of this edge case.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Paul
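For the buffer-full behavior mentioned above, a small sketch of a producer configured to drop rather than block when its in-memory buffer fills, assuming the 0.8.2 block.on.buffer.full setting and its BufferExhaustedException; the broker address and topic are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.BufferExhaustedException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NonBlockingBufferExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Throw instead of blocking when the record accumulator is full.
            props.put("block.on.buffer.full", "false");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                producer.send(new ProducerRecord<String, String>("my-topic", "payload")); // placeholder topic
            } catch (BufferExhaustedException e) {
                // Buffer full and configured not to block: drop the message, spill it to
                // a local queue, or surface an error to the caller instead of hanging.
            } finally {
                producer.close();
            }
        }
    }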