Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-15 Thread Cody Koeninger
I can't get initial subscriptions without poll. As far as I can tell, I won't get updated subscriptions (because a partition was added or another topic matching the pattern was added) without poll either, right? I'll take a look at those JIRAs.

On Mon, Mar 14, 2016 at 4:56 PM, Jason Gustafson w…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-14 Thread Jason Gustafson
The offset API is definitely a gap at the moment. I think there were some problems with the old consumer's API, and we wanted to make sure we didn't make the same mistakes. Unfortunately, I'm not sure anyone has had the time to give this the attention it needs. Here are a couple of JIRAs if you want to h…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-14 Thread Cody Koeninger
Sorry, by metadata I also meant the equivalent of the old OffsetRequest API, which partitionsFor doesn't give you. I understand why you didn't want to expose the broken "offsets before a certain time" API, but I don't understand why equivalent functionality for the first or last offset isn't available…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-14 Thread Jason Gustafson
Ah, that makes more sense. I have no idea about the limitations of your use case, but maybe you could expose a different interface to users:

    interface RebalanceListener {
        void onPartitionsAssigned(Consumer consumer, Collection partitions);
        void onPartitionsRevoked(Consumer consumer, Collection…
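A minimal sketch of how a two-argument interface like this could sit on top of a one-argument callback. It uses hypothetical stand-in types (`StubConsumer`, `PartitionsOnlyListener`, `StartFromBeginning`, `ListenerAdapter`, and plain string partition names such as `"events-0"`) rather than the real Kafka classes. The user's listener keeps no consumer field, so it can keep a zero-arg constructor and be re-created by class name; the framework, which owns the consumer, closes over it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a consumer; this is NOT the Kafka Consumer API.
// Partitions are plain strings such as "events-0" for brevity.
class StubConsumer {
    final List<String> seekedToBeginning = new ArrayList<>();

    void seekToBeginning(Collection<String> partitions) {
        seekedToBeginning.addAll(partitions);
    }
}

// Callback shape that only receives partitions, as ConsumerRebalanceListener does.
interface PartitionsOnlyListener {
    void onPartitionsAssigned(Collection<String> partitions);
    void onPartitionsRevoked(Collection<String> partitions);
}

// Callback shape suggested in the message: the consumer is an argument.
interface RebalanceListener {
    void onPartitionsAssigned(StubConsumer consumer, Collection<String> partitions);
    void onPartitionsRevoked(StubConsumer consumer, Collection<String> partitions);
}

// User code: no consumer field, so a zero-arg constructor is enough and the
// listener can be re-created from its class name after a restart.
class StartFromBeginning implements RebalanceListener {
    public StartFromBeginning() {}

    @Override
    public void onPartitionsAssigned(StubConsumer consumer, Collection<String> partitions) {
        consumer.seekToBeginning(partitions);
    }

    @Override
    public void onPartitionsRevoked(StubConsumer consumer, Collection<String> partitions) {
        // nothing to do in this example
    }
}

// Framework code: it owns the consumer, so it can adapt the two-argument
// listener to the one-argument shape by closing over that consumer.
final class ListenerAdapter {
    private ListenerAdapter() {}

    static PartitionsOnlyListener bind(StubConsumer consumer, RebalanceListener user) {
        return new PartitionsOnlyListener() {
            @Override
            public void onPartitionsAssigned(Collection<String> partitions) {
                user.onPartitionsAssigned(consumer, partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<String> partitions) {
                user.onPartitionsRevoked(consumer, partitions);
            }
        };
    }
}
```

The same adapter works for any user listener class, so only the class name needs to survive a restart from checkpoint.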

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-14 Thread Cody Koeninger
Regarding the rebalance listener: in the case of the Spark integration, it is possible for a job to fail and be restarted from a checkpoint in a new JVM. That means you need to be able to reconstruct objects. Any reasonable rebalance listener can't have a 0-arg constructor, because it needs a ref…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-14 Thread Jason Gustafson
Late arrival to this discussion. I'm not really sure I see the problem with accessing the consumer in the rebalance listener. Previously, we passed the consumer instance as a separate argument, but that was only because the rebalance listener had to be passed by class name before a reference to the consu…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-14 Thread Cody Koeninger
Honestly, the fact that everything is hidden inside poll() has been confusing people since last year, e.g. https://issues.apache.org/jira/browse/KAFKA-2359. I can try to formulate a KIP for this, but it seems clear that I'm not the only one giving this feedback, and I may not understand all the oth…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-13 Thread Guozhang Wang
Cody, we do not have an umbrella JIRA for this; rather, API changes in the consumer are handled case by case through individual JIRA tickets/KIPs. If you feel strongly about a specific change to the consumer API, please feel free to create a new KIP with the detailed motivation and proposed modifications.

Guozhang

On F…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-11 Thread Cody Koeninger
Is there a KIP or JIRA related to "working on improving these cases with improved APIs"? I saw that there was some discussion of it in KIP-41, but that seemed to have been resolved in favor of keeping everything inside of poll().

On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang wrote:
> Cody, M…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-11 Thread Guozhang Wang
Cody, Mansi: All good points! Let me try to answer them one by one. About this specific issue, as I suggested in the JIRA, we can separate two cases in the auto.offset.reset config: resetting the offset when a partition is first initialized for fetching, and resetting it when the fetch offset is out of range. These…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-10 Thread Mansi Shah
I second the need for having a consumer context passed to the rebalance callback. I have run into issues several times because of that. About subscribe vs. assign: I have not read through your Spark code yet (will do by EOD), so I am not sure what you mean (other than that I do agree that new partitions…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-10 Thread Cody Koeninger
Mansi, I'd agree that the fact that everything is tied up in poll() seems like the source of the awkward behavior. Regarding assign vs. subscribe, most people using the Spark integration are just going to want to provide a topic name, not go and figure out a bunch of partitions. They're also going to be…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-10 Thread Cody Koeninger
Yeah, so I'd encourage you guys to consider fixing that while the consumer is still in beta. As I said, it makes it awkward to serialize or provide a zero-arg constructor for a consumer rebalance listener, which is necessary in our case for restarting a consumer job on behalf of a user. It also o…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-10 Thread Mansi Shah
Guozhang, sorry for joining the party a little late. I have been thinking about this whole awkward behavior of having to call poll(0) to actually make the underlying subscriptions take effect. Is the core reason for this design that poll is also the actual heartbeat, and you want to make th…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Guozhang Wang
In order to do anything meaningful with the consumer itself in the rebalance callback (e.g. commit offsets), you would need to hold on to the consumer reference; admittedly that sounds a bit awkward, but by design we chose not to enforce it in the interface itself.

Guozhang

On Wed, Mar 9, 2016 at 3:39 PM, …

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Cody Koeninger
So what about my comments regarding the consumer rebalance listener interface not providing access to a consumer? I can probably work around it, but it seems odd.

On Mar 9, 2016 5:04 PM, "Guozhang Wang" wrote:
> One thing proposed by Jason:
>
> If you want to only reset offset upon initializatio…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Guozhang Wang
One thing proposed by Jason:

If you only want to reset the offset upon initialization, and by initialization you mean "no committed offset", you can do something like the following in the rebalance callback:

    @Override
    public void onPartitionsAssigned(Collection partitions) {
    …
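The snippet is cut off in the archive. A minimal sketch of the idea it describes, assuming a hypothetical `StubOffsets` stand-in for the consumer and string partition names: a partition is rewound only when `committed(...)` reports no committed offset, which the 0.9 consumer signals by returning `null`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in, not the Kafka API. It mimics the 0.9
// consumer's committed(partition) returning null when nothing was committed.
class StubOffsets {
    final Map<String, Long> committed = new HashMap<>();
    final Map<String, Long> earliest = new HashMap<>();
    final Map<String, Long> position = new HashMap<>();

    Long committed(String partition) {
        return committed.get(partition);
    }

    void seekToBeginning(String partition) {
        position.put(partition, earliest.getOrDefault(partition, 0L));
    }
}

class ResetOnlyIfUncommitted {
    private final StubOffsets consumer;

    ResetOnlyIfUncommitted(StubOffsets consumer) {
        this.consumer = consumer;
    }

    // Only partitions with no committed offset are moved to the beginning;
    // the others are left alone, so they resume from their committed offset.
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            if (consumer.committed(p) == null) {
                consumer.seekToBeginning(p);
            }
        }
    }
}
```

Because the check is on the group's committed offset rather than on local state, a partition that moves between group members after a rebalance is not rewound again.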

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Guozhang Wang
Filed https://issues.apache.org/jira/browse/KAFKA-3370.

On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger wrote:
> That sounds like an interesting way of addressing the problem, can
> continue further discussions on the JIRA
>
> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang wrote:
> > Cody:
> …

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Cody Koeninger
That sounds like an interesting way of addressing the problem; we can continue further discussions on the JIRA.

On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang wrote:
> Cody:
>
> More specifically, you do not need the "listTopics" function if you already
> know your subscribed topics, just use "parti…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Guozhang Wang
Cody: More specifically, you do not need the "listTopics" function if you already know your subscribed topics; just using "partitionsFor" is sufficient. About the fix, I'm thinking of adding two more options to auto.offset.reset, namely "earliest-on-start" and "latest-on-start", which sets t…
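A hypothetical model of the proposal, under an assumption the message does not spell out because it is cut off: the "-on-start" variants reset only when a partition starts with no committed offset, and behave like `none` (no reset, so the consumer throws) when a fetch offset is out of range. `earliest`, `latest` and `none` are the values the 0.9 consumer accepts; `EARLIEST_ON_START` and `LATEST_ON_START` are only names for the idea in this message, and `ResetPolicy`/`ResetCause` are illustrative types.

```java
import java.util.OptionalLong;

// Why a reset is being considered for a partition.
enum ResetCause { NO_COMMITTED_OFFSET, OUT_OF_RANGE }

// EARLIEST/LATEST/NONE mirror the real config values; the *_ON_START values
// model the proposal under the assumption stated above.
enum ResetPolicy {
    EARLIEST, LATEST, NONE, EARLIEST_ON_START, LATEST_ON_START;

    // Offset to reset to, or empty if the consumer should throw instead.
    OptionalLong resolve(ResetCause cause, long earliest, long latest) {
        switch (this) {
            case EARLIEST:
                return OptionalLong.of(earliest);
            case LATEST:
                return OptionalLong.of(latest);
            case EARLIEST_ON_START:
                return cause == ResetCause.NO_COMMITTED_OFFSET
                        ? OptionalLong.of(earliest) : OptionalLong.empty();
            case LATEST_ON_START:
                return cause == ResetCause.NO_COMMITTED_OFFSET
                        ? OptionalLong.of(latest) : OptionalLong.empty();
            default: // NONE
                return OptionalLong.empty();
        }
    }
}
```

Splitting the cause out like this is what lets "start at the beginning" be expressed without also silently rewinding when data has been deleted from under the consumer.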

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Cody Koeninger
Yeah, I think I understood what you were saying. What I'm saying is that if there were a way to just fetch metadata without doing the rest of the work poll() does, it wouldn't be necessary. I guess I can do listTopics to get all metadata for all topics and then parse it. Regarding running a sing…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Guozhang Wang
Cody, what I meant by a special case of `seekToXX` is this: today, when the function is called with no partition parameters, it will try to execute the logic on all "assigned" partitions for the consumer. Once that is done, the subsequent poll() will not throw the exception, since it knows thos…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Cody Koeninger
Another unfortunate thing about ConsumerRebalanceListener is that in order to do meaningful work in the callback, you need a reference to the consumer that called it. But that reference isn't provided to the callback, which means the listener implementation needs to hold a reference to the consume…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Cody Koeninger
I thought about ConsumerRebalanceListener, but seeking to the beginning any time there's a rebalance for whatever reason is not necessarily the same thing as seeking to the beginning before first starting the consumer.

On Wed, Mar 9, 2016 at 2:24 AM, Kamal C wrote:
> Cody,
>
> Use ConsumerRebalan…
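One way to approximate "seek before first start" rather than "seek on every rebalance" is to remember which partitions this process has already been assigned. A minimal sketch with hypothetical names (`SeekOnFirstAssignment`, string partition names, and a `seeks` list standing in for calls to `consumer.seekToBeginning(...)`):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: rewind a partition only the first time this process
// is assigned it, rather than on every rebalance.
class SeekOnFirstAssignment {
    private final Set<String> seen = new HashSet<>();
    // Stands in for calls to consumer.seekToBeginning(...).
    final List<String> seeks = new ArrayList<>();

    void onPartitionsAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            // Set.add returns false if p was already present.
            if (seen.add(p)) {
                seeks.add(p);
            }
        }
    }

    void onPartitionsRevoked(Collection<String> partitions) {
        // Deliberately keep `seen`: if a partition comes back after a
        // rebalance, it should resume from its committed offset, not rewind.
    }
}
```

Design caveat: the `seen` set lives in one process, so in a group with several members each member would rewind a partition the first time it receives it, even if another member already processed it. Checking for a committed offset instead, as proposed elsewhere in the thread, avoids that.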

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Kamal C
Cody,

Use ConsumerRebalanceListener to achieve that:

    ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection partitions) {
        }

        @Override
        public void onPartitionsAssigned…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-08 Thread Cody Koeninger
That suggestion doesn't work, for pretty much the same reason: at the time poll is first called, there is no reset policy and no committed offset, so NoOffsetForPartitionException is thrown.

I feel like the underlying problem isn't so much that seekToEnd needs special-case behavior. It's more tha…

Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-08 Thread Guozhang Wang
Hi Cody,

The problem with that code is calling `seekToBeginning()` followed by `subscribe(topic)`. Since the `subscribe` call is lazily evaluated, by the time `seekToBeginning()` is called no partition is assigned yet, and hence it is effectively a no-op. Try

    consumer.subscribe(topics)
    consumer.p…
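A toy model of the ordering problem, not Kafka code: `subscribe()` only records topics, partitions are assigned inside `poll()`, and the no-argument `seekToBeginning()` (described elsewhere in the thread as acting on all currently assigned partitions) only marks what is assigned at that moment. It deliberately leaves out offset resolution and the exception path. `LazyConsumer` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (hypothetical, not Kafka code) of lazy subscription.
class LazyConsumer {
    private final Map<String, Integer> partitionsPerTopic; // what the "cluster" has
    private final List<String> subscribed = new ArrayList<>();
    private final Set<String> assigned = new LinkedHashSet<>();
    private final Set<String> markedForBeginning = new HashSet<>();

    LazyConsumer(Map<String, Integer> partitionsPerTopic) {
        this.partitionsPerTopic = partitionsPerTopic;
    }

    // Only records the topics; nothing is assigned yet.
    void subscribe(Collection<String> topics) {
        subscribed.addAll(topics);
    }

    // No-argument form: acts on whatever is assigned at this moment.
    void seekToBeginning() {
        markedForBeginning.addAll(assigned);
    }

    // Assignment happens here, as with the 0.9 consumer's poll().
    void poll() {
        for (String topic : subscribed) {
            int n = partitionsPerTopic.getOrDefault(topic, 0);
            for (int i = 0; i < n; i++) {
                assigned.add(topic + "-" + i);
            }
        }
    }

    boolean startsAtBeginning(String partition) {
        return markedForBeginning.contains(partition);
    }
}
```

With `subscribe`, `seekToBeginning`, `poll` the seek marks nothing; with `subscribe`, `poll`, `seekToBeginning` it covers every assigned partition.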

seekToBeginning doesn't work without auto.offset.reset

2016-03-08 Thread Cody Koeninger
Using the 0.9 consumer, I would like to start consuming at the beginning or end without specifying auto.offset.reset. This does not seem to be possible:

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[St…
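The failure described in this thread comes down to how a starting position is chosen for each partition. A simplified, hypothetical model, not the consumer's actual code path: an explicit seek wins, then the committed offset, then the reset policy; with `none` and nothing else to go on, it throws. `MissingOffsetException` is a stand-in for Kafka's `NoOffsetForPartitionException`, and `StartPosition` is an illustrative name.

```java
// Stand-in for Kafka's NoOffsetForPartitionException (hypothetical name).
class MissingOffsetException extends RuntimeException {
    MissingOffsetException(String partition) {
        super("No offset for " + partition + " and no reset policy");
    }
}

// Simplified, hypothetical model of choosing where a partition starts.
final class StartPosition {
    private StartPosition() {}

    static long resolve(String partition,
                        Long explicitSeek,   // set by seek/seekToBeginning, or null
                        Long committed,      // group's committed offset, or null
                        String resetPolicy,  // "earliest", "latest" or "none"
                        long earliest,
                        long latest) {
        if (explicitSeek != null) {
            return explicitSeek;
        }
        if (committed != null) {
            return committed;
        }
        switch (resetPolicy) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                // "none": no seek, no committed offset, no policy to fall back on
                throw new MissingOffsetException(partition);
        }
    }
}
```

In this model, a fresh group with `none` only avoids the exception if the explicit seek is registered for the partition before its position is resolved, which is exactly the ordering the thread is wrestling with.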