Yeah, so I'd encourage you guys to consider fixing that while the consumer is still in beta.
As I said, it makes it awkward to serialize or provide a zero-arg constructor for a consumer rebalance listener, which is necessary in our case for restarting a consumer job on behalf of a user. It also opens the door to strange behavior: what if the consumer that the listener is holding is not the one that sent it a rebalance event?

On Wed, Mar 9, 2016 at 10:35 PM, Guozhang Wang <wangg...@gmail.com> wrote:

In order to do anything meaningful with the consumer itself in the rebalance callback (e.g. commit offsets), you would need to hold on to the consumer reference. Admittedly it sounds a bit awkward, but by design we chose not to enforce it in the interface itself.

Guozhang

On Wed, Mar 9, 2016 at 3:39 PM, Cody Koeninger <c...@koeninger.org> wrote:

So what about my comments regarding the consumer rebalance listener interface not providing access to a consumer? I can probably work around it, but it seems odd.

On Mar 9, 2016 5:04 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

One thing proposed by Jason:

If you only want to reset the offset upon initialization, and by initialization you mean "no committed offset", you can do something like the following in the rebalance callback:

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            // No committed offset: this group has never committed for the partition
            if (consumer.committed(partition) == null)
                consumer.seekToBeginning(partition);
        }
    }

Guozhang

On Wed, Mar 9, 2016 at 2:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Filed https://issues.apache.org/jira/browse/KAFKA-3370.
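Here is a dependency-free toy model in Java that contrasts a listener that seeks on every assignment with Jason's conditional version. It is not the Kafka client API. Partitions are plain strings and offsets are longs. A missing map entry stands in for `consumer.committed(partition) == null`. The names `StartPositionModel`, `alwaysRewind`, and `rewindIfUncommitted` are hypothetical. The model only sketches the decision each callback would make on a later rebalance, after some offsets have been committed.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT the Kafka client API: partitions are strings, offsets are longs.
public class StartPositionModel {

    // Unconditional listener: every assignment rewinds every partition to its beginning offset.
    static Map<String, Long> alwaysRewind(List<String> assigned, Map<String, Long> beginning) {
        Map<String, Long> positions = new LinkedHashMap<>();
        for (String p : assigned) {
            positions.put(p, beginning.get(p));
        }
        return positions;
    }

    // Jason's listener: rewind only partitions with no committed offset;
    // every other partition resumes from its committed offset.
    static Map<String, Long> rewindIfUncommitted(List<String> assigned,
                                                 Map<String, Long> committed,
                                                 Map<String, Long> beginning) {
        Map<String, Long> positions = new LinkedHashMap<>();
        for (String p : assigned) {
            Long c = committed.get(p); // null stands in for committed(partition) == null
            positions.put(p, c == null ? beginning.get(p) : c);
        }
        return positions;
    }

    public static void main(String[] args) {
        List<String> assigned = Arrays.asList("testtwo-0", "testtwo-1");
        Map<String, Long> beginning = new HashMap<>();
        beginning.put("testtwo-0", 0L);
        beginning.put("testtwo-1", 0L);

        // A later rebalance: the group has already committed offset 100 on testtwo-1 only.
        Map<String, Long> committed = new HashMap<>();
        committed.put("testtwo-1", 100L);

        System.out.println(alwaysRewind(assigned, beginning));                   // {testtwo-0=0, testtwo-1=0}
        System.out.println(rewindIfUncommitted(assigned, committed, beginning)); // {testtwo-0=0, testtwo-1=100}
    }
}
```

In this model the unconditional version rewinds testtwo-1, even though the group had already reached offset 100. That is Cody's point: seeking on every rebalance is not the same as seeking once at startup. The conditional version rewinds only the partition that was never committed.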
On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger <c...@koeninger.org> wrote:

That sounds like an interesting way of addressing the problem. We can continue the discussion on the JIRA.

On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody:

More specifically, you do not need the "listTopics" function if you already know your subscribed topics; just using "partitionsFor" is sufficient.

About the fix, I'm thinking of adding two more options to auto.offset.reset, namely "earliest-on-start" and "latest-on-start", which set the reset position ONLY at startup. The reason is that seekToXX was not actually designed for such initialization but for calling during the lifetime of the consumer, and we had better provide the right solution for that.

I can file the JIRA right away and start further discussion there. But let me know if you have any other ideas.

Guozhang

On Wed, Mar 9, 2016 at 12:25 PM, Cody Koeninger <c...@koeninger.org> wrote:

Yeah, I think I understood what you were saying. What I'm saying is that if there were a way to just fetch metadata without doing the rest of the work poll() does, it wouldn't be necessary. I guess I can call listTopics to get metadata for all topics and then parse it.

Regarding running a single instance: that is the case in the scenario I'm talking about.
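Guozhang's proposed "earliest-on-start" / "latest-on-start" semantics can be sketched as a small decision function. This is a hypothetical Java model of a proposal, not a released Kafka setting. `ResetPolicyModel`, its enums, and the `startingUp` flag are illustrative names. An `IllegalStateException` stands in for the hard error the real consumer would raise (such as NoOffsetForPartitionException). "earliest", "latest", and "none" correspond to the existing auto.offset.reset values.

```java
// Hypothetical model of the reset options proposed in this thread.
// EARLIEST_ON_START / LATEST_ON_START are proposals, not real config values.
public class ResetPolicyModel {

    enum Policy { EARLIEST, LATEST, NONE, EARLIEST_ON_START, LATEST_ON_START }

    enum Action { SEEK_EARLIEST, SEEK_LATEST }

    // Decide what to do when a partition has no valid position.
    // startingUp is true only for the consumer's initial position lookup.
    static Action resolve(Policy policy, boolean startingUp) {
        switch (policy) {
            case EARLIEST:
                return Action.SEEK_EARLIEST;          // resets silently at any time
            case LATEST:
                return Action.SEEK_LATEST;            // resets silently at any time
            case EARLIEST_ON_START:
                if (startingUp) return Action.SEEK_EARLIEST;
                break;
            case LATEST_ON_START:
                if (startingUp) return Action.SEEK_LATEST;
                break;
            case NONE:
                break;
        }
        // Stand-in for the "hard error" Cody wants at any time other than startup.
        throw new IllegalStateException(
            "No reset policy applies: policy=" + policy + ", startingUp=" + startingUp);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Policy.EARLIEST_ON_START, true)); // SEEK_EARLIEST
        try {
            resolve(Policy.EARLIEST_ON_START, false);
        } catch (IllegalStateException e) {
            System.out.println("hard error: " + e.getMessage());
        }
    }
}
```

The design point is that the startup case and the mid-stream case become separate inputs. A single policy can then be lenient once at startup and strict afterwards, which neither "earliest" nor "none" can express alone.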
On Wed, Mar 9, 2016 at 2:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Cody,

What I meant by a special case of `seekToXX` is this: today, when the function is called with no partition parameters, it executes the logic on all "assigned" partitions of the consumer. Once that is done, the subsequent poll() will not throw the exception, since it knows those partitions need their offsets reset.

However, in your case there are no assigned partitions yet, so `seekToXX` does not take effect on any partition. The assignment is wrapped in the poll() call, as you mentioned. One way to solve this is to let `seekToXX()` with no parameters do the coordination and get the assigned partitions if there are any subscribed topics, so that the subsequent poll() knows those partitions need their offsets reset. Does that make sense?

For now, another way I can think of is to get the partition info beforehand and call `seekToBeginning` on all partitions. But that only works if the consumer knows it is going to get all the partitions assigned to itself (i.e. you are only running a single instance).

Guozhang

On Wed, Mar 9, 2016 at 6:22 AM, Cody Koeninger <c...@koeninger.org> wrote:

Another unfortunate thing about ConsumerRebalanceListener is that in order to do meaningful work in the callback, you need a reference to the consumer that called it.
But that reference isn't provided to the callback, which means the listener implementation needs to hold a reference to the consumer. This seems to make it unnecessarily awkward to serialize the listener or provide a zero-arg constructor for it.

On Wed, Mar 9, 2016 at 7:28 AM, Cody Koeninger <c...@koeninger.org> wrote:

I thought about ConsumerRebalanceListener, but seeking to the beginning every time there's a rebalance, for whatever reason, is not necessarily the same thing as seeking to the beginning before first starting the consumer.

On Wed, Mar 9, 2016 at 2:24 AM, Kamal C <kamaltar...@gmail.com> wrote:

Cody,

Use ConsumerRebalanceListener to achieve that:

    ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
        }
    };

    consumer.subscribe(topics, listener);

On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

That suggestion doesn't work, for pretty much the same reason. At the time poll is first called, there is no reset policy and no committed offset, so NoOffsetForPartitionException is thrown.

I feel like the underlying problem isn't so much that seekToEnd needs special-case behavior. It's more that topic metadata fetches, consumer position fetches, and message fetches are all lumped together under a single poll() call, with no way to do them individually if necessary.

What does "work" in this situation is to just catch the exception (which leaves the consumer in a state where topics are assigned) and then seek. But that is not exactly an elegant interface.

    import org.apache.kafka.clients.consumer.NoOffsetForPartitionException

    consumer.subscribe(topics)
    try {
      consumer.poll(0)
    } catch {
      // Expected: the first poll assigns partitions, then finds no position to start from
      case _: NoOffsetForPartitionException =>
    }
    consumer.seekToBeginning()
    consumer.poll(0)

On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Cody,

The problem with that code is calling `seekToBeginning()` right after `subscribe(topic)`.

Since the `subscribe` call is lazily evaluated, no partition has been assigned yet by the time `seekToBeginning()` is called, so it is effectively a no-op.
Try

    consumer.subscribe(topics)
    consumer.poll(0) // get assigned partitions
    consumer.seekToBeginning()
    consumer.poll(0)

to see if that works.

I think this is a valid issue that can be fixed in the new consumer: when seekToEnd/seekToBeginning is called with no parameters before any assignment has been made, do the coordination behind the scenes. That would, however, change the behavior of these functions, since they would no longer always be lazily evaluated.

Guozhang

On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

Using the 0.9 consumer, I would like to start consuming at the beginning or end without specifying auto.offset.reset.
This does not seem to be possible:

    import scala.collection.JavaConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.serialization.StringDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "none"
    ).asJava
    val topics = conf.getString("kafka.topics").split(",").toList.asJava
    val consumer = new KafkaConsumer[String, String](kafkaParams)
    consumer.subscribe(topics)
    consumer.seekToBeginning()
    consumer.poll(0)

Results in:

    Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtwo-4
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
        at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
        at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)

I'm assuming this is because subscriptions.assignedPartitions isn't populated yet at the time seekToBeginning() is called. But polling in order to get TopicPartitions assigned results in an error, which creates a chicken-and-egg situation.

I don't want to set auto.offset.reset, because I want a hard error if the offsets are out of range at any other time during consumption.
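The chicken-and-egg problem above can be reproduced with a toy single-threaded model, which also shows why the catch-then-seek workaround gets past it. This is an illustrative Java sketch under stated assumptions, not the 0.9 client. `LazyAssignmentModel` and `NoOffsetException` are hypothetical names. Modeling a "reset to beginning" as position 0 is a simplification. The model encodes the behavior Guozhang describes:

- `subscribe` is lazy.
- A no-argument seek only marks partitions that are already assigned.
- `poll()` first performs the assignment and then resolves positions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the behavior described in this thread; NOT the Kafka client.
public class LazyAssignmentModel {

    static class NoOffsetException extends RuntimeException {
        NoOffsetException(String partition) {
            super("Undefined offset with no reset policy for partition: " + partition);
        }
    }

    private final List<String> partitionsForGroup;                // what the group would be assigned
    private final Set<String> assigned = new LinkedHashSet<>();
    private final Set<String> pendingReset = new HashSet<>();    // marked "seek to beginning"
    private final Map<String, Long> positions = new HashMap<>();
    private boolean subscribed = false;

    LazyAssignmentModel(List<String> partitionsForGroup) {
        this.partitionsForGroup = partitionsForGroup;
    }

    // Lazy: records the subscription only; no assignment happens here.
    void subscribe() {
        subscribed = true;
    }

    // No-arg seek touches only partitions already assigned,
    // so before the first poll it is effectively a no-op.
    void seekToBeginning() {
        pendingReset.addAll(assigned);
    }

    // Assignment first, then position resolution (auto.offset.reset=none, nothing committed).
    void poll() {
        if (subscribed && assigned.isEmpty()) {
            assigned.addAll(partitionsForGroup);
        }
        for (String p : assigned) {
            if (pendingReset.remove(p)) {
                positions.put(p, 0L);                 // beginning offset modeled as 0
            } else if (!positions.containsKey(p)) {
                throw new NoOffsetException(p);       // assignment is kept even though poll fails
            }
        }
    }

    Map<String, Long> positions() {
        return positions;
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList("testtwo-0", "testtwo-4");

        // Original order: the seek happens before any assignment exists.
        LazyAssignmentModel naive = new LazyAssignmentModel(parts);
        naive.subscribe();
        naive.seekToBeginning();
        try {
            naive.poll();
        } catch (NoOffsetException e) {
            System.out.println("naive: " + e.getMessage());
        }

        // Workaround: poll once to get assigned, swallow the expected error, then seek.
        LazyAssignmentModel workaround = new LazyAssignmentModel(parts);
        workaround.subscribe();
        try {
            workaround.poll();
        } catch (NoOffsetException expected) {
            // partitions are now assigned even though poll failed
        }
        workaround.seekToBeginning();
        workaround.poll();
        System.out.println("workaround: " + workaround.positions());
    }
}
```

In this model, the poll/seek/poll sequence without a catch fails on the first poll for the same reason, which matches Cody's report. Only swallowing that first exception leaves the partitions assigned for the later seek.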