Re: KIP-41: KafkaConsumer Max Records

2016-01-13 Thread Jason Gustafson
If there are no more concerns, I'll open the vote for this KIP later today. Thanks, Jason On Tue, Jan 12, 2016 at 5:07 PM, Jason Gustafson wrote: > Hey Aarti, > > Sorry for the late response. Your help with memory management would be > great! Since this does seem to be one of the major outstand

Re: KIP-41: KafkaConsumer Max Records

2016-01-12 Thread Jason Gustafson
Hey Aarti, Sorry for the late response. Your help with memory management would be great! Since this does seem to be one of the major outstanding issues for the consumer, I was going to start looking at it in detail. I found that Jay previously filed KAFKA-2063, which basically proposes to add a ma

Re: KIP-41: KafkaConsumer Max Records

2016-01-08 Thread Aarti Gupta
Hi Jason, +1 on the idea of adding max.poll.bytes as an optional configuration (default -1, meaning the setting does not come into play). The pre-fetching optimization (prefetch again only for those partitions with no retained data) seems slightly better (same as what we have in produ
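
A minimal sketch of the -1 sentinel described above. The max.poll.bytes key is hypothetical here: it was proposed in this thread but is not part of the KIP as adopted.

    import java.util.Properties;

    Properties props = new Properties();
    // Hypothetical config key proposed above; -1 means no byte cap is applied.
    props.put("max.poll.bytes", "-1");
    long maxPollBytes = Long.parseLong(props.getProperty("max.poll.bytes"));
    boolean byteCapEnabled = maxPollBytes >= 0;   // false: the setting does not come into play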

Re: KIP-41: KafkaConsumer Max Records

2016-01-08 Thread Jason Gustafson
Thanks Jens for all of your work as well! Unless there are any more concerns, perhaps we can open the vote early next week. As a quick summary for newcomers to this thread, the problem we're trying to solve in this KIP is how to give users more predictable control over the message processing loop.

Re: KIP-41: KafkaConsumer Max Records

2016-01-08 Thread Jason Gustafson
Hi Aarti, Thanks for the feedback. I think the concern about memory overhead is valid. As Guozhang mentioned, the problem already exists in the current consumer, so this probably deserves consideration outside of this KIP. That said, it's a good question whether our prefetching strategy makes it m

Re: KIP-41: KafkaConsumer Max Records

2016-01-08 Thread Jens Rantil
Hi, I just publicly wanted to thank Jason for the work he's done with the KIP and say that I've been in touch with him privately back and forth to work out some of its details. Thanks! Since it feels like I initiated this KIP a bit, I also want to say that I'm happy with it and that its proposa

Re: KIP-41: KafkaConsumer Max Records

2016-01-07 Thread Guozhang Wang
I think it is a general issue to bound the memory footprint on the Java consumer, no matter whether we do the prefetching in this KIP, since even today we do not have any way to manage memory usage on the consumer. Today on the producer side we bound the memory usage, and we may need to do the same on
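
For reference, the producer-side bound Guozhang mentions is the buffer.memory setting; a sketch with illustrative values:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Bounds the producer's record buffer: send() blocks once this fills up.
    // The consumer had no equivalent bound at the time of this thread.
    props.put("buffer.memory", "33554432");   // 32 MB
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);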

Re: KIP-41: KafkaConsumer Max Records

2016-01-07 Thread Aarti Gupta
@Jason, (apologies, got your name wrong the first time round) On Thu, Jan 7, 2016 at 5:15 PM, Aarti Gupta wrote: > Hi Json, > > I am concerned about how many records can be prefetched into consumer > memory. > Currently we control the maximum number of bytes per topic and partition > by setting

Re: KIP-41: KafkaConsumer Max Records

2016-01-07 Thread Aarti Gupta
Hi Json, I am concerned about how many records can be prefetched into consumer memory. Currently we control the maximum number of bytes per topic and partition by setting fetch.message.max.bytes, so the total fetched can reach #partitions * fetch.message.max.bytes. However, partitions can be
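
A back-of-the-envelope illustration of the concern, with made-up numbers:

    // Worst case: every assigned partition has a full fetch buffered at once.
    int assignedPartitions = 200;
    long fetchMessageMaxBytes = 1024 * 1024;                       // 1 MB per partition
    long worstCaseBytes = assignedPartitions * fetchMessageMaxBytes;
    System.out.println(worstCaseBytes / (1024 * 1024) + " MB");    // prints "200 MB"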

Re: KIP-41: KafkaConsumer Max Records

2016-01-07 Thread Guozhang Wang
Thanks Jason. I think it is a good feature to add, +1. As suggested in KIP-32, we'd better keep the end state of the KIP wiki with finalized implementation details rather than leaving a list of options. I agree that for both fairness and pre-fetching the simpler approach would be sufficient for mos

Re: KIP-41: KafkaConsumer Max Records

2016-01-06 Thread Gwen Shapira
I like the fair-consumption approach you chose - "pull as many records as possible from each partition in a similar round-robin fashion"; it is very intuitive and close enough to fair. Overall, I'm +1 on the KIP. But you'll need a formal vote :) On Wed, Jan 6, 2016 at 6:05 PM, Jason Gustafson wr
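
A rough sketch of the round-robin draining Gwen quotes, assuming poll() serves records from an in-memory buffer per partition (names and structure here are illustrative, not the actual implementation):

    import java.util.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // Visit partitions in rotating order, taking as many buffered records as
    // possible from each until the cap is reached, so no partition dominates.
    static <K, V> List<ConsumerRecord<K, V>> drain(
            Deque<TopicPartition> order,
            Map<TopicPartition, Deque<ConsumerRecord<K, V>>> buffered,
            int maxPollRecords) {
        List<ConsumerRecord<K, V>> result = new ArrayList<>();
        for (int i = 0; i < order.size() && result.size() < maxPollRecords; i++) {
            TopicPartition tp = order.pollFirst();
            order.addLast(tp);   // rotate so the next poll() starts elsewhere
            Deque<ConsumerRecord<K, V>> queue = buffered.get(tp);
            while (queue != null && !queue.isEmpty() && result.size() < maxPollRecords)
                result.add(queue.pollFirst());
        }
        return result;
    }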

Re: KIP-41: KafkaConsumer Max Records

2016-01-06 Thread Jason Gustafson
Thanks for the suggestion, Ismael. I updated the KIP. -Jason On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma wrote: > Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion: > in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2 > examples (2 partitions with

Re: KIP-41: KafkaConsumer Max Records

2016-01-06 Thread Ismael Juma
Thanks Jason. I read the KIP and it makes sense to me. A minor suggestion: in the "Ensuring Fair Consumption" section, there are 3 paragraphs with 2 examples (2 partitions with 100 max.poll.records and 3 partitions with 30 max.poll.records). I think you could simplify this by using one of the examp

Re: KIP-41: KafkaConsumer Max Records

2016-01-05 Thread Jason Gustafson
I've updated the KIP with some implementation details. I also added more discussion on the heartbeat() alternative. The short answer for why we rejected this API is that it doesn't seem to work well with offset commits. This would tend to make correct usage complicated and difficult to explain. Add

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Jason Gustafson
Hi Cliff, I think we're all agreed that the current contract of poll() should be kept. The consumer wouldn't wait for max messages to become available in this proposal; it would only ensure that it never returns more than max messages. -Jason On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne wrote: >
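
Concretely, a poll loop under the proposal keeps its current shape; max.poll.records only caps the batch. A sketch, where the topic name and process() handler are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.*;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("max.poll.records", "100");   // a cap, not a wait-for target

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("my-topic"));
        while (true) {
            // Returns as soon as records are available (or the timeout expires);
            // it never waits for 100 records to accumulate.
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                process(record);   // placeholder per-record handler
            consumer.commitSync();
        }
    }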

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Cliff Rhyne
Instead of a heartbeat, I'd prefer poll() to return whatever messages the client has. Either (a) I don't care if I get fewer than my max message limit, or (b) I do care and will set a larger timeout. Case B is less common than A and is fairly easy to handle in the application's code. On Mon, Jan 4,

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Gwen Shapira
1. Agree that TCP window style scaling will be cool. I'll try to think of a good excuse to use it ;) 2. I'm very concerned about the challenges of getting the timeouts, heartbeats and max messages right. Another option could be to expose a "heartbeat" API to consumers. If my app is still processing
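
A sketch of how that heartbeat alternative might have looked; consumer.heartbeat() is hypothetical, and the KIP ultimately rejected this approach:

    for (ConsumerRecord<String, String> record : records) {
        process(record);        // placeholder slow handler
        consumer.heartbeat();   // hypothetical: signal liveness mid-batch
        // The objection recorded in the KIP: a mid-batch heartbeat claims
        // liveness without saying anything about offset-commit progress,
        // which makes correct usage hard to explain.
    }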

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Jason Gustafson
Hey Gwen, I was thinking along the lines of TCP window scaling in order to dynamically find a good consumption rate. Basically you'd start off consuming say 100 records and you'd let it increase until the consumption took longer than half the session timeout (for example). You /might/ be able to a
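
A crude sketch of that dynamic-scaling idea (illustrative only; the KIP settled on a static max.poll.records config, and no per-poll setter exists):

    int batchSize = 100;              // start small, as in TCP slow start
    long sessionTimeoutMs = 30_000;

    while (true) {
        long start = System.currentTimeMillis();
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
            process(record);          // placeholder handler
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed > sessionTimeoutMs / 2)
            batchSize = Math.max(1, batchSize / 2);       // back off when slow
        else
            batchSize = Math.min(batchSize * 2, 10_000);  // ramp up while fast
        // consumer.setMaxRecords(batchSize);   // hypothetical; no such API
    }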

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Cliff Rhyne
My team's main use-case benefits from flexibility between consumers, which both options handle well. If the poll timeout triggers before the full batch size, I'd still want to get whatever is available. On Mon, Jan 4, 2016 at 12:49 PM, Gwen Shapira wrote: > I can't speak to all use-cases, but f

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Gwen Shapira
I can't speak to all use-cases, but for the database one, I think pause-resume will be necessary in any case, and therefore dynamic batch sizes are not needed. Databases are really unpredictable regarding response times - load and locking can affect this. I'm not sure there's a good way to know you a
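
A sketch of the pause-resume pattern for a slow downstream database (pause and resume are real consumer methods, though their exact signatures have varied across versions; databaseIsBackedUp() is a placeholder health check):

    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;

    Set<TopicPartition> assigned = consumer.assignment();
    // Stop fetching but keep calling poll() so the consumer stays in the group
    // (at the time of this thread, poll() is what drove heartbeats).
    consumer.pause(assigned.toArray(new TopicPartition[0]));
    while (databaseIsBackedUp())
        consumer.poll(100);           // returns no records while paused
    consumer.resume(assigned.toArray(new TopicPartition[0]));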

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Jason Gustafson
Gwen and Ismael, I agree the configuration option is probably the way to go, but I was wondering whether there would be cases where it made sense to let the consumer dynamically set max messages to adjust for downstream slowness. For example, if the consumer is writing consumed records to another

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Gwen Shapira
The wiki you pointed to is no longer maintained and fell out of sync with the code and protocol. You may want to refer to: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil wrote: > Hi guys, > > I realized I never thanke

Re: KIP-41: KafkaConsumer Max Records

2016-01-04 Thread Jens Rantil
Hi guys, I realized I never thanked y'all for your input - thanks! Jason: I apologize for assuming your stance on the issue! Feels like we all agreed on the solution. +1 Follow-up: Jason made a point about defining prefetch and fairness behaviour in the KIP. I am now working on putting that down i

Re: KIP-41: KafkaConsumer Max Records

2015-12-22 Thread Ismael Juma
On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira wrote: > Given the background, it sounds like you'll generally want each call to > poll() to return the same number of events (which is the number you planned > on having enough memory / time for). It also sounds like tuning the number > of events wil

Re: KIP-41: KafkaConsumer Max Records

2015-12-22 Thread Gwen Shapira
Given the background, it sounds like you'll generally want each call to poll() to return the same number of events (which is the number you planned on having enough memory / time for). It also sounds like tuning the number of events will be closely tied to tuning the session timeout. That is - if I

Re: KIP-41: KafkaConsumer Max Records

2015-12-22 Thread Jason Gustafson
For a little more background, the problem we've found in some use cases is that it's difficult to control the processing time when handling batches of records. If processing takes longer than the consumer's session timeout, then the member is kicked out of the group, which can cause excessive rebal
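
To make the tuning relationship concrete, a rough bound with illustrative numbers:

    long sessionTimeoutMs = 30_000;   // session.timeout.ms
    long perRecordMs = 50;            // measured worst-case handling time per record
    // Leave ~50% headroom so one slow batch can't push the gap between poll()
    // calls past the session timeout and trigger a rebalance.
    long safeMaxPollRecords = (sessionTimeoutMs / 2) / perRecordMs;   // = 300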

KIP-41: KafkaConsumer Max Records

2015-12-22 Thread Jens Rantil
Hi Kafkaians, Jason Gustafson and I have been working on a KIP to be able to set the maximum number of messages that a poll() call will return in the new Consumer API. You can find the KIP here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records It contains a fu