tryFlush with a timeout sounds good to me. This is really more
for consistency than anything else. I cannot think of any standard
blocking calls off the top of my head that don't have a timed variant,
e.g., Thread.join, Object.wait, Future.get. Either that, or they
provide an entirely non-blocking mode (e.g., socketChannel.connect
followed by finishConnect).
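
To make the comparison concrete, here is a minimal sketch of what a
tryFlush-style call could look like if it were built on the timed
Future.get(long, TimeUnit) variant. Everything in it is an assumption for
illustration: the class TimedFlushSketch and its tryFlush helper are
hypothetical and are not part of the Kafka producer API, and pending sends
are modelled as plain java.util.concurrent futures rather than real producer
requests.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedFlushSketch {

    // Hypothetical helper (not a Kafka API): waits until every pending future
    // has completed, successfully or with an error, within one overall
    // deadline. Returns true if all completed in time, false on timeout.
    public static boolean tryFlush(List<? extends Future<?>> pending,
                                   long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (Future<?> f : pending) {
            // Each wait only gets whatever is left of the shared budget.
            long remaining = Math.max(0L, deadline - System.nanoTime());
            try {
                f.get(remaining, TimeUnit.NANOSECONDS);
            } catch (ExecutionException | CancellationException e) {
                // A failed or cancelled send still counts as completed.
            } catch (TimeoutException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("acked");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("record too large"));

        // Stands in for a send that never gets a response.
        CompletableFuture<String> stuck = new CompletableFuture<>();

        System.out.println("all completed: "
                + tryFlush(Arrays.asList(ok, failed), 100, TimeUnit.MILLISECONDS));
        System.out.println("with a stuck send: "
                + tryFlush(Arrays.asList(ok, stuck), 100, TimeUnit.MILLISECONDS));
    }
}
```

Note that a false return only tells the caller that something is still
outstanding, which is the concern Jay and Guozhang raise below: the caller
still has to decide what to do about it.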

Thanks,

Joel

On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> Jay,
> 
> The .flush() call seems like it would be the best way if you wanted to do a
> clean shutdown of the new producer?
> 
> So, you could in your code "stop all incoming requests && producer.flush()
> && system.exit(value)" and know pretty much you won't drop anything on the
> floor.
> 
> This can be done with the callbacks and futures (sure) but .flush() seems
> to be the right place to block, and it is only a few lines of code, no?
> 
> ~ Joestein
> 
> On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> 
> > Hey Bhavesh,
> >
> > If a broker is not available a new one should be elected to take over, so
> > although the flush might take longer it should still be quick. Even if not,
> > this should result in an error, not a hang.
> >
> > The cases you enumerated are all covered already: if the user wants to
> > retry, that is covered by the retry setting in the client, and any error
> > counts as completion of the request. The postcondition of flush isn't that
> > all sends complete successfully, just that they complete. So if you try to
> > send a message that is too big, then when flush returns, calling .get() on
> > the future should not block and should produce the error.
> >
> > Basically the argument I am making is that the only reason you want to call
> > flush() is to guarantee all the sends complete, so if it doesn't guarantee
> > that, it will be somewhat confusing. This does mean blocking, but if you
> > don't want to block on the send then you wouldn't call flush().
> >
> > This has no impact on the block.on.buffer.full setting. That impacts what
> > happens when send() can't append to the buffer because it is full. flush()
> > means any message previously sent (i.e. for which send() call has returned)
> > needs to have its request completed. Hope that makes sense.
> >
> > -Jay
> >
> > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> >
> > > Hi Jay,
> > >
> > > Imagine you have a flaky network connection to the brokers, and flush()
> > > blocks because one of the brokers is not available. Basically, how would
> > > we address the failure mode where the io thread is not able to drain
> > > records, or is busy due to pending requests? Is the flush() method only
> > > meant to flush to the in-memory queue, or to flush to the broker over
> > > the network?
> > >
> > > A timeout helps here by pushing the caller to decide what to do, e.g.
> > > re-enqueue the records, drop the entire batch, or handle the case where
> > > one of the messages is too big and crosses the max.message.size limit.
> > >
> > > Also, the Javadoc for the API says "The method will block until all
> > > previously sent records have completed sending (either successfully or
> > > with an error)". Does this bypass the rules set by block.on.buffer.full
> > > or batch.size when under load?
> > >
> > > That was my intention, and I am sorry I mixed up the close() method here
> > > without knowing that this is only for bulk send.
> > >
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > Yeah I second the problem Guozhang flags with giving flush a timeout.
> > > > In general failover in Kafka is a bounded thing unless you have brought
> > > > your Kafka cluster down entirely so I think depending on that bound
> > > > implicitly is okay.
> > > >
> > > > It is possible to make flush() be instead
> > > >   boolean tryFlush(long timeout, TimeUnit unit);
> > > >
> > > > But I am somewhat skeptical that people will use this correctly. I.e.,
> > > > consider the mirror maker code snippet I gave above: how would one
> > > > actually recover in this case other than retrying (which the client
> > > > already does automatically)? After all, if you are okay losing data
> > > > then you don't need to bother calling flush at all; you can just let
> > > > the messages be sent asynchronously.
> > > >
> > > > I think close() is actually different because you may well want to
> > > > shut down immediately and just throw away unsent events.
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > The proposal looks good to me, will need some time to review the
> > > > > implementation RB later.
> > > > >
> > > > > Bhavesh, I am wondering how you will use a flush() with a timeout
> > > > > since such a call does not actually provide any flushing guarantees?
> > > > >
> > > > > As for close(), there is a separate JIRA for this:
> > > > >
> > > > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > >
> > > > > > Hi Jay,
> > > > > >
> > > > > > How about adding a timeout to each of these method calls, i.e.
> > > > > > flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a
> > > > > > runaway io thread issue, and the caller thread should not be
> > > > > > blocked forever by these methods.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > Well actually in the case of linger.ms = 0 the send is still
> > > > > > > asynchronous so calling flush() blocks until all the previously
> > > > > > > sent records have completed. It doesn't speed anything up in that
> > > > > > > case, though, since they are already available to send.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > > > > >
> > > > > > > > Looks good to me.
> > > > > > > >
> > > > > > > > I like the idea of not blocking additional sends but not
> > > > > > > > guaranteeing that flush() will deliver them.
> > > > > > > >
> > > > > > > > I assume that with linger.ms = 0, flush will just be a no-op
> > > > > > > > (since the queue will be empty). Is that correct?
> > > > > > > >
> > > > > > > > Gwen
> > > > > > > >
> > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Following up on our previous thread on making batch send a
> > > > > > > > > little easier, here is a concrete proposal to add a flush()
> > > > > > > > > method to the producer:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > > > > > >
> > > > > > > > > A proposed implementation is here:
> > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > > > > > >
> > > > > > > > > Thoughts?
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
