Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

2015-02-18 Thread Jay Kreps
There have been a couple of rounds of this. Basically, a bunch of the complaints people have about the producer boil down to there being no limit on how long a request will block if the Kafka cluster goes hard down. Some of the discussion was here, I think: https://issues.apache.org/jira/browse/KAFKA-17

2015-02-18 Thread Joel Koshy
Actually, could you clarify this a bit (since I'm not sure which thread you are referring to) - specifically, how would this tie in with the current timeout we have for the producer (for example)? On Tue, Feb 17, 2015 at 02:55:44PM -0800, Jay Kreps wrote: > Yeah there was a separate thread on addi

2015-02-17 Thread Jay Kreps
Yeah there was a separate thread on adding a client-side timeout to requests. We should have this in the new java clients, it just isn't there yet. When we do this the flush() call will implicitly have the same timeout as the requests (since they will complete or fail by then). I think this makes f
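
A toy sketch of Jay's point that a client-side request timeout would bound flush() as well. Everything here is hypothetical (`TimeoutBoundedProducer`, its methods, and the `requestTimeoutMs` parameter are illustrations, not the Kafka client API): the simulated cluster never answers, so each send is failed by the request timeout, and because flush() counts a failed record as completed, it returns within roughly that timeout. Requires Java 9+ for `CompletableFuture.orTimeout`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical toy, not the Kafka client. The "cluster" never answers, so the only
// thing that completes a send is the client-side request timeout.
class TimeoutBoundedProducer {
    private final long requestTimeoutMs;
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    TimeoutBoundedProducer(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    synchronized CompletableFuture<Void> send(String record) {
        // orTimeout fails the future with TimeoutException if nothing completes it first.
        CompletableFuture<Void> ack = new CompletableFuture<Void>()
                .orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS);
        inFlight.add(ack);
        return ack;
    }

    // Blocks until every record sent so far has completed; an error counts as completed.
    // Returns how many of those records failed.
    int flush() {
        List<CompletableFuture<Void>> snapshot;
        synchronized (this) { snapshot = new ArrayList<>(inFlight); }
        int failed = 0;
        for (CompletableFuture<Void> ack : snapshot) {
            boolean ok = ack.handle((v, err) -> err == null).join();
            if (!ok) failed++;
        }
        synchronized (this) { inFlight.removeAll(snapshot); }
        return failed;
    }
}
```

With a 100 ms request timeout and a dead cluster, flush() returns after about 100 ms and reports both records as failed instead of hanging.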

2015-02-17 Thread Guozhang Wang
In the Scala clients we have the socket.timeout config, as we are using blocking I/O; when such a timeout is reached, the TimeoutException is thrown from the socket and the client can handle it accordingly. In the Java clients we are switching to non-blocking I/O and hence we will not have the soc

2015-02-17 Thread Jiangjie Qin
I'm thinking the flush call timeout will naturally be the timeout for a produce request, no? Currently it seems we don't have a timeout for client requests; should we have one? -Jiangjie (Becket) Qin On 2/16/15, 8:19 PM, "Jay Kreps" wrote: >Yes, I think we all agree it would be good to add a c

2015-02-16 Thread Jay Kreps
Yes, I think we all agree it would be good to add a client-side request timeout. That would effectively imply a flush timeout as well since any requests that couldn't complete in that time would be errors and hence completed in the definition we gave. -Jay On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh

2015-02-16 Thread Bhavesh Mistry
Hi All, Thanks, Jay and all, for addressing my concern. I am fine with just having a flush() method as long as it covers failure modes and resiliency. E.g., we had a situation where all of the Kafka cluster's brokers were reachable, but upon adding a new Kafka node and the admin migrating "leader to new brokers", that new bro

2015-02-12 Thread Joel Koshy
Yes that is a counter-example. I'm okay either way on whether we should have just flush() or have a timeout. Bhavesh, does Jay's explanation a few replies prior address your concern? If so, shall we consider this closed? On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote: > Yeah we could do

2015-02-10 Thread Jay Kreps
Yeah, we could do that. I guess I just feel like it adds confusion, because then you have to think about which timeout you want when you likely don't want a timeout at all. I guess the pattern I was thinking of was fflush or the Java equivalent, which don't have timeouts: http://docs.oracle.com/jav

2015-02-10 Thread Joel Koshy
I think tryFlush with a timeout sounds good to me. This is really more for consistency than anything else. I cannot think of any standard blocking calls off the top of my head that don't have a timed variant, e.g., Thread.join, Object.wait, Future.get. Either that, or they provide an entirely non-bl

2015-02-10 Thread Joel Koshy
> silly. But in the absence of flush there is no way to say that. As you say, > you only pay that penalty on one of the get() calls, but if the linger.ms > is high (say 60 seconds) that will be a huge penalty. That makes sense - thanks for clarifying. On Mon, Feb 09, 2015 at 08:11:46PM -0800, Jay
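
To make the linger.ms penalty concrete, here is a toy batcher in which a batch is only sent once linger.ms has elapsed. `LingeringBatcher` is hypothetical and much simpler than Kafka's real accumulator; "sending" a batch here just marks each record acknowledged. Calling get() on the first record's future would wait out the whole linger period, whereas flush() sends the buffered batch right away and then blocks only until it completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical toy: records wait up to lingerMs before their batch is "sent".
class LingeringBatcher implements AutoCloseable {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final long lingerMs;
    private List<CompletableFuture<Void>> batch = new ArrayList<>();
    private ScheduledFuture<?> lingerTimer;

    LingeringBatcher(long lingerMs) { this.lingerMs = lingerMs; }

    synchronized CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        if (batch.isEmpty()) {
            // The first record of a new batch starts the linger timer.
            lingerTimer = timer.schedule(this::sendBatch, lingerMs, TimeUnit.MILLISECONDS);
        }
        batch.add(ack);
        return ack;
    }

    // "Send" the current batch; in this toy that simply acknowledges every record.
    private void sendBatch() {
        List<CompletableFuture<Void>> toSend;
        synchronized (this) {
            if (lingerTimer != null) {
                lingerTimer.cancel(false);
                lingerTimer = null;
            }
            toSend = batch;
            batch = new ArrayList<>();
        }
        toSend.forEach(ack -> ack.complete(null));
    }

    // Make buffered records sendable now instead of waiting out linger.ms,
    // then block until they have completed.
    void flush() {
        List<CompletableFuture<Void>> waitFor;
        synchronized (this) { waitFor = new ArrayList<>(batch); }
        sendBatch();
        waitFor.forEach(CompletableFuture::join);
    }

    @Override
    public void close() { timer.shutdownNow(); }
}
```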

2015-02-10 Thread Joe Stein
Jay, The .flush() call seems like it would be the best way if you wanted to do a clean shutdown of the new producer? So, you could in your code "stop all incoming requests && producer.flush() && System.exit(value)" and know pretty much you won't drop anything on the floor. This can be done with
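
A minimal sketch of the shutdown order Joe describes (stop intake, flush, then exit), assuming a toy producer whose sends are acknowledged asynchronously after about 20 ms. `ToyProducer`, `Service`, and their methods are hypothetical. The intake flag is flipped under the same lock that guards `handle()`, so no request can slip in after flush() has taken its snapshot. Requires Java 9+ for `CompletableFuture.delayedExecutor`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical toy producer: each send is acknowledged asynchronously after ~20 ms.
class ToyProducer {
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    synchronized CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> ack = CompletableFuture.runAsync(
                () -> { /* pretend the broker wrote the record */ },
                CompletableFuture.delayedExecutor(20, TimeUnit.MILLISECONDS));
        inFlight.add(ack);
        return ack;
    }

    // Blocks until everything sent so far has completed (successfully or not).
    void flush() {
        List<CompletableFuture<Void>> snapshot;
        synchronized (this) { snapshot = new ArrayList<>(inFlight); }
        snapshot.forEach(f -> f.handle((v, err) -> null).join());
    }

    synchronized boolean allDone() {
        return inFlight.stream().allMatch(CompletableFuture::isDone);
    }
}

// Joe's ordering: stop all incoming requests, flush, and only then exit.
class Service {
    private boolean accepting = true; // guarded by this
    private final ToyProducer producer;

    Service(ToyProducer producer) { this.producer = producer; }

    // Returns false if the request was rejected because shutdown has begun.
    synchronized boolean handle(String request) {
        if (!accepting) return false;
        producer.send(request);
        return true;
    }

    void shutdown() {
        synchronized (this) { accepting = false; } // 1. stop all incoming requests
        producer.flush();                          // 2. wait for everything already sent
        // 3. a real application would now call System.exit(status)
    }
}
```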

2015-02-10 Thread Jay Kreps
Hey Bhavesh, If a broker is not available, a new one should be elected to take over, so although the flush might take longer it should still be quick. Even if not, this should result in an error, not a hang. The cases you enumerated are all covered already: if the user wants to retry that is covered

2015-02-09 Thread Bhavesh Mistry
Hi Jay, Imagine you have a flaky network connection to the brokers, and flush() is blocked because "one of the brokers is not available" (basically, how would we address the failure mode where the io thread is not able to drain records or is busy due to pending requests?). Is your flush() method only to flush to in m

2015-02-09 Thread Jay Kreps
Yeah, I second the problem Guozhang flags with giving flush a timeout. In general, failover in Kafka is a bounded thing unless you have brought your Kafka cluster down entirely, so I think depending on that bound implicitly is okay. It is possible to instead make flush() be boolean tryFlush(long ti
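
A sketch of the two shapes being debated: an untimed flush() and a timed `boolean tryFlush(long timeout, TimeUnit unit)`. The exact signature Jay proposed is cut off above, so the `TimeUnit` parameter here is an assumption that follows the timed-variant convention of `Future.get` that Joel mentions. `FlushableBuffer` is a hypothetical stand-in, not the producer. As Guozhang points out, a `false` return only means the caller stopped waiting; it says nothing about the records, which remain pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy, not the Kafka producer API.
class FlushableBuffer {
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // Returns the record's acknowledgement; in this toy the caller completes it.
    synchronized CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        pending.add(ack);
        return ack;
    }

    // Completes once every record sent so far is done; success and failure both count.
    private synchronized CompletableFuture<Void> allDone() {
        CompletableFuture<?>[] settled = pending.stream()
                .map(ack -> ack.handle((v, err) -> null))
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(settled);
    }

    // Untimed variant: blocks until everything sent so far is done.
    void flush() {
        allDone().join();
    }

    // Timed variant: true if everything completed in time, false if the wait gave up.
    boolean tryFlush(long timeout, TimeUnit unit) {
        try {
            allDone().get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("unreachable: failures are mapped to completion", e);
        }
    }
}
```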

2015-02-09 Thread Jay Kreps
Hey Joel, The use case would be for something like mirror maker. You want to do something like the following: while(true) { val recs = consumer.poll(time); for(rec <- recs) producer.send(rec); producer.flush(); consumer.commit(); } If you replace flush() with just calling get() on th
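
Jay's loop transcribed into Java against two hypothetical interfaces (`Source` and `Sink` are placeholders, not the Kafka consumer or producer API). The ordering is the point: commit() runs only after flush() has returned, so committed positions never get ahead of records the producer has completed (successfully or with an error).

```java
import java.util.List;

// Hypothetical stand-in for a consumer.
interface Source {
    List<String> poll(long timeoutMs);
    void commit();
}

// Hypothetical stand-in for a producer.
interface Sink {
    void send(String record);
    void flush(); // blocks until every record sent so far has completed
}

final class MirrorLoop {
    // One iteration of the mirror-maker loop: poll, send, flush, then commit.
    static void runOnce(Source consumer, Sink producer, long pollTimeoutMs) {
        List<String> recs = consumer.poll(pollTimeoutMs);
        for (String rec : recs) {
            producer.send(rec);
        }
        producer.flush();  // wait for the whole batch, not one get() per record
        consumer.commit(); // safe: nothing committed is still in flight
    }
}
```

A real mirror maker would call runOnce in a `while (true)` loop, as in Jay's original.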

2015-02-09 Thread Joel Koshy
- WRT the motivation: "if you set linger.ms > 0 to encourage batching of messages, which is likely a good idea for this kind of use case, then the second for loop will block for a ms" -> however, in practice this will really only be for the first couple of calls right? Since the subsequent

2015-02-09 Thread Guozhang Wang
The proposal looks good to me, will need some time to review the implementation RB later. Bhavesh, I am wondering how you will use a flush() with a timeout since such a call does not actually provide any flushing guarantees? As for close(), there is a separate JIRA for this: KAFKA-1660

2015-02-09 Thread Bhavesh Mistry
Hi Jay, How about adding a timeout to each method call, i.e. flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway io thread issue, and the caller thread should not be blocked forever by these methods. Thanks, Bhavesh On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps wrote: > Well actually in t

2015-02-08 Thread Jay Kreps
Well actually in the case of linger.ms = 0 the send is still asynchronous so calling flush() blocks until all the previously sent records have completed. It doesn't speed anything up in that case, though, since they are already available to send. -Jay On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira
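
A sketch of the semantics Jay and Gwen discuss, using a hypothetical `AsyncSender` that models linger.ms = 0: each send is handed off immediately but still completes asynchronously, so flush() is not a no-op while requests are in flight. The helper `flushAsync()` is an invention for this sketch; it snapshots the records sent so far, so sends made after flush begins are neither blocked nor waited for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy modeling linger.ms = 0: nothing is buffered, but every send
// still completes later, when the simulated broker replies.
class AsyncSender {
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    // Returns the record's acknowledgement; in this toy the caller completes it.
    synchronized CompletableFuture<Void> send(String record) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        inFlight.add(ack);
        return ack;
    }

    // Snapshots the records sent so far and returns a future that completes once all
    // of them have completed, successfully or not. Later sends are not included.
    synchronized CompletableFuture<Void> flushAsync() {
        inFlight.removeIf(CompletableFuture::isDone);
        CompletableFuture<?>[] settled = inFlight.stream()
                .map(ack -> ack.handle((v, err) -> null))
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(settled);
    }

    // Blocks until every record sent before this call has completed.
    void flush() {
        flushAsync().join();
    }
}
```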

2015-02-08 Thread Gwen Shapira
Looks good to me. I like the idea of not blocking additional sends but not guaranteeing that flush() will deliver them. I assume that with linger.ms = 0, flush will just be a noop (since the queue will be empty). Is that correct? Gwen On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps wrote: > Follow

[DISCUSS] KIP-8 Add a flush method to the new Java producer

2015-02-08 Thread Jay Kreps
Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API A proposed implementation is here: https://issues.apache.