Yeah, if you want to file a JIRA and post a patch for a new option, it's
possible others would want it. Maybe something like
  pre.initialize.topics=x,y,z
  pre.initialize.timeout=x
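
To make the idea concrete, here is a rough sketch of what those two options might do at startup. Everything in it is an assumption for illustration: TopicPreInitializer is a hypothetical helper, and partitionLookup only stands in for a call such as producer.partitionsFor(topic); it is not how the client does or would implement this.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper; not part of the Kafka client.
final class TopicPreInitializer {

    // Looks up every topic in parallel and waits at most timeoutMs overall.
    // Returns the partition counts of the topics that answered in time.
    static Map<String, Integer> preInitialize(List<String> topics,
                                              long timeoutMs,
                                              Function<String, Integer> partitionLookup) {
        Map<String, Integer> ready = new ConcurrentHashMap<>();
        if (topics.isEmpty()) {
            return new HashMap<>();
        }
        // Daemon threads so a hung lookup cannot keep the JVM alive.
        ExecutorService pool = Executors.newFixedThreadPool(topics.size(), r -> {
            Thread t = new Thread(r, "topic-pre-init");
            t.setDaemon(true);
            return t;
        });
        for (String topic : topics) {
            pool.execute(() -> {
                try {
                    ready.put(topic, partitionLookup.apply(topic));
                } catch (RuntimeException e) {
                    // Lookup failed: leave the topic out of the result.
                }
            });
        }
        pool.shutdown(); // accept no new work, let submitted lookups run
        try {
            pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        pool.shutdownNow(); // interrupt lookups that are still running
        return new HashMap<>(ready); // snapshot; late results are ignored
    }
}
```

A caller could compare the returned keys against the configured topic list and decide whether to fail startup or carry on with the topics that are ready.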

The metadata fetch timeout behavior is a bug...that behavior is inherited
from Object.wait, which defines zero to mean infinite, but I think that is
not very intuitive. If you file a ticket on that we could just fix it. I
think being able to set 0 is actually useful for the case you are trying for.
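
For illustration, a minimal sketch of the difference, assuming a simplified stand-in for the metadata wait (MetadataLatch is a hypothetical class, not the client's code). Java's Object.wait(0) blocks until notified, so a wrapper that wants 0 to mean "don't block" has to check the remaining time itself and never pass 0 through:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for "wait until topic metadata is available".
final class MetadataLatch {
    private final Object lock = new Object();
    private boolean ready = false;

    void markReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    // Returns true if metadata became ready within timeoutMs.
    // A timeout of 0 only checks the current state and never blocks.
    // Remaining time is rounded down to whole milliseconds, so this can
    // give up just under 1 ms early.
    boolean await(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        synchronized (lock) {
            while (!ready) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) {
                    return false; // never call wait(0): it would block until notified
                }
                try {
                    lock.wait(remainingMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt
                    return false;
                }
            }
            return true;
        }
    }
}
```

With this shape, await(0) returns false right away until markReady() has been called, which is the fail-fast behavior a send path would want.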

WRT stopping the producer, I think what you are saying is that you want
it to be the case that calling close() on the producer immediately fails
all outstanding requests with some exception, right?
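
Just to pin down the semantics being asked about, a minimal sketch, assuming a toy sender that tracks its unacknowledged futures. FailFastSender is hypothetical, nothing is actually sent anywhere, and the exception type is an arbitrary choice:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy sender; it only tracks futures and sends nothing.
final class FailFastSender {
    private final Set<CompletableFuture<String>> pending = new HashSet<>();
    private boolean closed = false;

    // Registers a request; the future completes when something acknowledges it.
    synchronized CompletableFuture<String> send(String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (closed) {
            future.completeExceptionally(new IllegalStateException("sender is closed"));
            return future;
        }
        pending.add(future);
        // Drop the future from the pending set once it completes either way.
        future.whenComplete((result, error) -> forget(future));
        return future;
    }

    private synchronized void forget(CompletableFuture<String> future) {
        pending.remove(future);
    }

    // Fails every outstanding request immediately instead of leaving it hanging.
    synchronized void close() {
        closed = true;
        // Copy first: completing a future removes it from the pending set.
        List<CompletableFuture<String>> outstanding = new ArrayList<>(pending);
        for (CompletableFuture<String> future : outstanding) {
            future.completeExceptionally(
                new IllegalStateException("closed before acknowledgement"));
        }
    }
}
```

Both methods are synchronized so a send() racing with close() either gets registered and then failed, or is rejected outright; no future is left without a status.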

-Jay

On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy <ppea...@gmail.com> wrote:
>
> Hi Jay,
>   I have implemented a wrapper around the producer to behave like I want it
> to. Where it diverges from current 0.8.2 producer is that it accepts three
> new inputs:
> - A list of expected topics
> - A timeout value to init meta for those topics during producer creation
> - An option to blow up if we fail to init topic meta within some amount of
> time
>
> I also needed to set metadata.fetch.timeout.ms=1 (as 0 means it will block
> forever) and kick off a thread to do the topic metadata init in the
> background.
>
> On the send side, things do fail fast now. The only hiccup I am hitting
> (not completely done re-working my tests, though) is that messages
> accepted by the producer after the server has been stopped never return a
> status if the producer is stopped. I think this is a bug.
>
> Are you sure you wouldn't want any of this behavior in client by default
> which would give out of the box choices to be made on blocking behavior?
> Happy to share code or send a PR.
>
> Thanks,
> Paul
>
> On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Paul,
> >
> > I agree we should document this better.
> >
> > We allow and encourage using partitions to semantically distribute data,
> > so unfortunately we can't just arbitrarily assign a partition (say 0) as
> > that would actually give incorrect answers for any consumer that made use
> > of the partitioning. It is true that the user can change the
> > partitioning, but we can't ignore the partitioning they have set.
> >
> > I get the use case you have--you basically want a hard guarantee that
> > send() will never block (so presumably you have also set it to drop data
> > if the buffer fills up). As I said, the blocking only occurs on the first
> > request for a given topic and you can avoid it by pre-initializing the
> > topic metadata.
> >
> > I think the option you describe is actually possible now. Basically you
> > can initialize the metadata for topics you care about using that
> > partitionsFor() call. If you set the property
> > metadata.fetch.timeout.ms=0 then any send calls prior to the completion
> > of metadata initialization will fail immediately rather than block.
> >
> > -Jay
> >
> >
> > On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy <ppea...@gmail.com> wrote:
> > >
> > > Hi Jay,
> > >   Many thanks for the info. All that makes sense, but from an API
> > > standpoint, when something is labelled async and returns a Future, this
> > > will be misconstrued and developers will place async sends in critical
> > > client-facing request/response pathways of code that should never
> > > block. If the app comes up with a bad config, it will hang all incoming
> > > connections.
> > >
> > > Obviously, there is a spectrum of use cases with regard to message
> > > loss and the defaults cannot cater to all use cases. I like that the
> > > defaults tend towards best effort guarantees, but I am not sure it
> > > justifies the inconsistency in the API.
> > >
> > > 1) It sounds like the client is already structured to handle changes in
> > > partitions on the fly. I am sure I am oversimplifying, but in the case
> > > where no metadata is available, my naive approach would be to assume
> > > some number of partitions and then, when there is metadata, treat it as
> > > a partition change event. If there are more unknowns than just
> > > partition count, this probably won't work.
> > > 2) Pretty much makes sense, especially now that I see people on this
> > > discussion list wanting a million topics (good luck)
> > > 3) I agree client creation shouldn't fail, but any sends should
> > > probably fast fail or make the choice you are making explicit on the
> > > call.
> > >
> > > I'm still thinking about how I am going to make the client behave as
> > > I'd like. I think I need a background process kicked off on startup to
> > > prime the topics I am interested in. Until that process completes, any
> > > sends through the producer will need to fast fail instead of hang. This
> > > would still leave a window for blocking if you send to a topic your app
> > > wasn't aware it would send to, but now we're getting into corner cases.
> > >
> > > Would having something like that as a baked-in option be accepted into
> > > Kafka clients mainline?
> > >
> > > A quick win might be to clarify the documentation so that it is clear
> > > that this API will block in cases XYZ (maybe this is mentioned
> > > somewhere and I missed it).
> > >
> > > Thanks,
> > > Paul
> > >
> > >
> > > On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > Hey Paul,
> > > >
> > > > Here are the constraints:
> > > > 1. We wanted the storage of messages to be in their compact binary
> > > > form so we could bound memory usage. This implies partitioning prior
> > > > to enqueue. And as you note, partitioning requires having metadata
> > > > (even stale metadata) about topics.
> > > > 2. We wanted to avoid prefetching metadata for all topics since there
> > > > may be quite a lot of topics.
> > > > 3. We wanted to make metadata fetching lazy so that it would be
> > > > possible to create a client without having an active network
> > > > connection. This tends to be important when services are brought up
> > > > in development or test environments where it is annoying to have to
> > > > control the dependency graph when starting things.
> > > >
> > > > This blocking isn't too bad as it only occurs on the first request
> > > > for each topic. Our feeling was that many things tend to get set up
> > > > on a first request (DB connections are established, caches populated,
> > > > etc.) so this was not unreasonable.
> > > >
> > > > If you want to pre-initialize the metadata to avoid blocking on the
> > > > first request you can do so by fetching the metadata using the
> > > > producer.partitionsFor(topic) api at start-up.
> > > >
> > > > -Jay
> > > >
> > > > On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy <ppea...@gmail.com> wrote:
> > > > >
> > > > > Hello,
> > > > >
> > > > > >   Playing around with the 0.8.2-beta producer client. One of my
> > > > > > test cases is to ensure producers can deal with Kafka being down
> > > > > > when the producer is created. My tests failed miserably because
> > > > > > of the default blocking in the producer with regard to
> > > > > > metadata.fetch.timeout.ms. The first line of the new producer is
> > > > > > waitOnMetadata, which is blocking.
> > > > >
> > > > > > I can handle this case by loading topic metadata on init, setting
> > > > > > metadata.fetch.timeout.ms to a very low value, and either
> > > > > > throwing away messages or creating my own internal queue to
> > > > > > buffer.
> > > > >
> > > > > > I’m surprised the metadata sync isn’t done async. If it fails,
> > > > > > return that in the future/callback. This way the API could
> > > > > > actually be considered safely async and the producer buffer could
> > > > > > try to hold on to things until block.on.buffer.full kicks in to
> > > > > > either drop messages or block. You’d probably need a partition
> > > > > > callback since numPartitions wouldn’t be available.
> > > > >
> > > > > > The implication is that people's apps will work fine if the first
> > > > > > messages are sent while the Kafka server is up. However, if Kafka
> > > > > > is down and they restart their app, the new producer will block
> > > > > > all sends and blow things up if they haven't written their app to
> > > > > > be aware of this edge case.
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Paul
> > > > >
> > > >
> > >
> >
>
