I can't get initial subscriptions without poll. As far as I can tell,
I won't get updated subscriptions (because a partition was added or
another topic matching the pattern was added) without poll either,
right?
I'll take a look at those jiras.
On Mon, Mar 14, 2016 at 4:56 PM, Jason Gustafson w
The offset API is definitely a gap at the moment. I think there were some
problems with the old consumer's API and we wanted to make sure we didn't
make the same mistakes. Unfortunately, I'm not sure anyone has had the time
to give this the attention it needs. Here are a couple of JIRAs if you want to
h
Sorry, by metadata I also meant the equivalent of the old
OffsetRequest api, which partitionsFor doesn't give you. I understand
why you didn't want to expose the broken "offsets before a certain
time" api, but I don't understand why equivalent functionality for
first or last offset isn't available
Ah, that makes more sense. I have no idea about the limitations of your use
case, but maybe you could expose a different interface to users.
interface RebalanceListener {
    void onPartitionsAssigned(Consumer consumer,
                              Collection partitions);
    void onPartitionsRevoked(Consumer consumer,
                             Collection
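To make the idea of a listener that receives the consumer as an argument concrete, here is a minimal, self-contained sketch. It assumes hypothetical stand-in types (`SeekableConsumer`, `RecordingConsumer`, and string partition names) rather than the real Kafka client classes, and the interface shape is only an illustration of the proposal, not an existing API. The point it tries to show is that such a listener holds no consumer reference, so a zero-arg constructor is enough to rebuild it after a restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class ListenerSketch {
    // Hypothetical stand-in for a consumer; not the Kafka client class.
    interface SeekableConsumer {
        void seekToBeginning(String partition);
    }

    // Hypothetical listener shape: the consumer is handed to every callback.
    interface RebalanceListener {
        void onPartitionsAssigned(SeekableConsumer consumer, Collection<String> partitions);
        void onPartitionsRevoked(SeekableConsumer consumer, Collection<String> partitions);
    }

    // Holds no consumer reference, so a zero-arg constructor is sufficient.
    static class SeekToStartListener implements RebalanceListener {
        public SeekToStartListener() {}

        @Override
        public void onPartitionsAssigned(SeekableConsumer consumer, Collection<String> partitions) {
            for (String p : partitions) {
                consumer.seekToBeginning(p);
            }
        }

        @Override
        public void onPartitionsRevoked(SeekableConsumer consumer, Collection<String> partitions) {
            // Nothing to clean up in this sketch.
        }
    }

    // Records every seek so the listener's behaviour can be inspected.
    static class RecordingConsumer implements SeekableConsumer {
        final List<String> seeks = new ArrayList<>();

        @Override
        public void seekToBeginning(String partition) {
            seeks.add(partition);
        }
    }

    public static List<String> demo() {
        RecordingConsumer consumer = new RecordingConsumer();
        RebalanceListener listener = new SeekToStartListener();
        listener.onPartitionsAssigned(consumer, Arrays.asList("t-0", "t-1"));
        return consumer.seeks;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the current callback signatures, the same listener would instead need the consumer injected through its constructor or a setter, which is the awkward part discussed in this thread.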
Regarding the rebalance listener, in the case of the spark
integration, it is possible a job can fail and be restarted from
checkpoint in a new jvm. That means that you need to be able to
reconstruct objects. Any reasonable rebalance listener can't have a
0-arg constructor, because it needs a ref
Late arrival to this discussion. I'm not really sure I see the problem with
accessing the consumer in the rebalance listener. Before we passed the
consumer instance as a separate argument, but that was only because the
rebalance listener had to be passed by classname before a reference to the
consu
Honestly the fact that everything is hidden inside poll() has been
confusing people since last year, e.g.
https://issues.apache.org/jira/browse/KAFKA-2359
I can try to formulate a KIP for this, but it seems clear that I'm not
the only one giving this feedback, and I may not understand all the
oth
Cody,
We do not have an umbrella JIRA for this, but rather a case-by-case JIRA
ticket / KIP for API changes in consumer.
If you feel strongly about some specific change on the consumer API, please
feel free to create a new KIP with the detailed motivation and proposed
modifications.
Guozhang
On F
Is there a KIP or Jira related to "working on improving these cases
with improved APIs"?
I saw that there was some discussion of it in KIP-41, but that seemed
to have been resolved in favor of keeping everything inside of poll()
On Fri, Mar 11, 2016 at 11:17 AM, Guozhang Wang wrote:
> Cody, M
Cody, Mansi:
All good points! Let me try to answer them one-by-one.
About this specific issue: as I suggested in the JIRA, we can separate, within
the auto.offset.reset config, the case of resetting the offset when a partition
is first initialized for fetching from the case of a fetch offset being out of range.
These
I second the need for having a consumer context passed to rebalance
callback. I have run into issues several times because of that.
About - subscribe vs assign - I have not read through your spark code yet
(will do by eod), so I am not sure what you mean (other than I do agree
that new partitions
Mansi, I'd agree that the fact that everything is tied up in poll
seems like the source of the awkward behavior.
Regarding assign vs subscribe, most people using the spark integration
are just going to want to provide a topic name, not go figure out a
bunch of partitions. They're also going to be
Yeah, so I'd encourage you guys to consider fixing that while the
consumer is still in beta.
As I said, it makes it awkward to serialize or provide a zero-arg
constructor for a consumer rebalance listener, which is necessary in
our case for restarting a consumer job on behalf of a user. It also
o
Guozhang
Sorry for joining the party a little late. I have been thinking about this
whole awkward behavior of having to call poll(0) to actually make the
underlying subscriptions take effect. Is the core reason for this design
the fact that poll is also the actual heartbeat and you want to make th
In order to do anything meaningful with the consumer itself in the rebalance
callback (e.g. commit offsets), you would need to hold on to the consumer
reference; admittedly it sounds a bit awkward, but by design we chose not to
enforce it in the interface itself.
Guozhang
On Wed, Mar 9, 2016 at 3:39 PM,
So what about my comments regarding the consumer rebalance listener
interface not providing access to a consumer? I can probably work around
it, but it seems odd.
On Mar 9, 2016 5:04 PM, "Guozhang Wang" wrote:
> One thing proposed by Jason:
>
> If you want to only reset offset upon initializatio
One thing proposed by Jason:
If you want to only reset the offset upon initialization, and by initialization
you mean "no committed offset", you can do something like the following in the
rebalance callback.
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
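Since the callback body is cut off above, here is a rough sketch of just the selection logic that the "no committed offset" rule implies. It uses plain strings and a map as hypothetical stand-ins for partitions and committed offsets; in a real listener the lookup would presumably go through the consumer's committed-offset call, and the selected partitions would then be passed to a seek. The names `ResetOnInit` and `partitionsToReset` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ResetOnInit {
    // Returns the assigned partitions that have no committed offset.
    // Only these would be reset; partitions with a committed offset are left alone.
    public static List<String> partitionsToReset(Collection<String> assigned,
                                                 Map<String, Long> committed) {
        List<String> needReset = new ArrayList<>();
        for (String partition : assigned) {
            if (committed.get(partition) == null) {
                needReset.add(partition);
            }
        }
        return needReset;
    }

    public static List<String> demo() {
        // Hypothetical state: t-0 has a committed offset, t-1 does not.
        Map<String, Long> committed = new HashMap<>();
        committed.put("t-0", 42L);
        return partitionsToReset(Arrays.asList("t-0", "t-1"), committed);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the check runs on every assignment, a later rebalance does not rewind a partition that already has a committed offset, which is the distinction raised elsewhere in this thread.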
Filed https://issues.apache.org/jira/browse/KAFKA-3370.
On Wed, Mar 9, 2016 at 1:11 PM, Cody Koeninger wrote:
> That sounds like an interesting way of addressing the problem, can
> continue further discussions on the JIRA
>
>
>
> On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang wrote:
> > Cody:
>
That sounds like an interesting way of addressing the problem, can
continue further discussions on the JIRA
On Wed, Mar 9, 2016 at 2:59 PM, Guozhang Wang wrote:
> Cody:
>
> More specifically, you do not need the "listTopics" function if you already
> know your subscribed topics, just use "parti
Cody:
More specifically, you do not need the "listTopics" function if you already
know your subscribed topics; just using "partitionsFor" is sufficient.
About the fix, I'm thinking of adding two more options to
auto.offset.reset, say "earliest-on-start" and "latest-on-start",
which sets t
Yeah, I think I understood what you were saying. What I'm saying is
that if there were a way to just fetch metadata without doing the rest
of the work poll() does, it wouldn't be necessary. I guess I can do
listTopics to get all metadata for all topics and then parse it.
Regarding running a sing
Cody,
What I meant by a special case of `seekToXX` is that, today, when the
function is called with no partition parameters, it will try to execute the
logic on all "assigned" partitions for the consumer. And once that is done,
the subsequent poll() will not throw the exception since it knows thos
Another unfortunate thing about ConsumerRebalanceListener is that in
order to do meaningful work in the callback, you need a reference to
the consumer that called it. But that reference isn't provided to the
callback, which means the listener implementation needs to hold a
reference to the consume
I thought about ConsumerRebalanceListener, but seeking to the
beginning any time there's a rebalance for whatever reason is not
necessarily the same thing as seeking to the beginning before first
starting the consumer.
On Wed, Mar 9, 2016 at 2:24 AM, Kamal C wrote:
> Cody,
>
> Use ConsumerRebalan
Cody,
Use ConsumerRebalanceListener to achieve that,
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }
    @Override
    public void onPartitionsAssigned
That suggestion doesn't work, for pretty much the same reason - at the
time poll is first called, there is no reset policy and no committed
offset, so NoOffsetForPartitionException is thrown
I feel like the underlying problem isn't so much that seekToEnd needs
special case behavior. It's more tha
Hi Cody,
The problem with that code is in `seekToBeginning()` followed by
`subscribe(topic)`.
Since the `subscribe` call is lazily evaluated, by the time `seekToBeginning()`
is called no partition has been assigned yet, and hence it is effectively a
no-op.
Try
consumer.subscribe(topics)
consumer.p
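The ordering issue described above can be illustrated with a toy model. This is not the Kafka client: `ToyConsumer` is a hypothetical class that only mimics the behaviour described in this thread, namely that `subscribe` merely records the topic, the assignment is filled in by `poll`, and a no-argument `seekToBeginning` acts on whatever is assigned at that moment.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LazySubscribeModel {
    // Toy stand-in for a consumer; NOT the Kafka client.
    static class ToyConsumer {
        private final int partitionsPerTopic;
        private final Set<String> subscribed = new LinkedHashSet<>();
        private final Set<String> assignment = new LinkedHashSet<>();
        private final List<String> seeks = new ArrayList<>();

        ToyConsumer(int partitionsPerTopic) {
            this.partitionsPerTopic = partitionsPerTopic;
        }

        // Lazy: only remembers the topic, assigns nothing yet.
        void subscribe(String topic) {
            subscribed.add(topic);
        }

        // The assignment only materialises here.
        void poll() {
            for (String topic : subscribed) {
                for (int i = 0; i < partitionsPerTopic; i++) {
                    assignment.add(topic + "-" + i);
                }
            }
        }

        // Acts on the currently assigned partitions only.
        void seekToBeginning() {
            seeks.addAll(assignment);
        }

        List<String> seeks() {
            return seeks;
        }
    }

    // Seek right after subscribe: nothing is assigned, so nothing is sought.
    public static List<String> seekBeforePoll() {
        ToyConsumer consumer = new ToyConsumer(2);
        consumer.subscribe("events");
        consumer.seekToBeginning();
        return consumer.seeks();
    }

    // Poll first so the assignment exists, then seek.
    public static List<String> seekAfterPoll() {
        ToyConsumer consumer = new ToyConsumer(2);
        consumer.subscribe("events");
        consumer.poll();
        consumer.seekToBeginning();
        return consumer.seeks();
    }

    public static void main(String[] args) {
        System.out.println(seekBeforePoll());
        System.out.println(seekAfterPoll());
    }
}
```

In the model, seeking before the first poll touches no partitions, while seeking after a poll reaches both partitions of the hypothetical `events` topic.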
Using the 0.9 consumer, I would like to start consuming at the
beginning or end, without specifying auto.offset.reset.
This does not seem to be possible:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> conf.getString("kafka.brokers"),
"key.deserializer" -> classOf[St