Hi Jungtaek,
Thanks for the KIP. I have a couple of questions here.
Isn't Spark using Kafka's consumer group management across multiple
consumers?

Is Spark using KafkaConsumer#subscribe(Pattern pattern,
ConsumerRebalanceListener listener) only to discover the topics matching
a pattern-based subscription, and then manually assigning those
topic-partitions across consumers on the workers?
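The two models in question could be sketched roughly as follows (broker
address, group id, and topic pattern are placeholders; this assumes the
standard kafka-clients API):

```java
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SubscriptionModels {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "spark-kafka-source");      // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Model 1: broker-side group management. The group coordinator
            // assigns the matching topic-partitions across group members,
            // and the listener is notified on every rebalance.
            consumer.subscribe(Pattern.compile("topic-.*"),
                    new ConsumerRebalanceListener() {
                        @Override
                        public void onPartitionsRevoked(Collection<TopicPartition> parts) { }
                        @Override
                        public void onPartitionsAssigned(Collection<TopicPartition> parts) { }
                    });

            // Model 2: no group management. The application resolves topics
            // itself and pins explicit partitions to each consumer, e.g.:
            // consumer.assign(Collections.singletonList(new TopicPartition("topic-a", 0)));
        }
    }
}
```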

Thanks,
Satish.

On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> I am not sure if I fully understand yet.
>
> The fact that Spark stores offsets not in Kafka but as part of its own
> checkpoint mechanism seems to be orthogonal. Maybe I am missing
> something here.
>
> As you are using subscribe(), you use the Kafka consumer group
> mechanism, which takes care of assigning partitions to the clients
> within the group. Therefore, I am not sure what you mean by:
>
> > which Spark needs to
> >> know to coordinate multiple consumers to pull correctly.
>
> Multiple thoughts that may help:
>
>  - if Spark needs more control about the partition assignment, you can
> provide a custom `ConsumerPartitionAssignor` (via the consumer
> configuration)
>
>  - you may also want to register `ConsumerRebalanceListener` via
> `subscribe()` to get informed when the group rebalances
>
> As you pointed out, with pattern subscription the metadata can change
> if topics are added/deleted. However, each metadata change will trigger
> a rebalance, and thus you would get corresponding calls to your
> rebalance listener to learn about it and react accordingly.
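>
> As a rough sketch, wiring both hooks might look like this (the assignor
> class name and topic pattern are placeholders; this assumes the
> standard kafka-clients API):
>
> ```java
> import java.util.Collection;
> import java.util.Properties;
> import java.util.regex.Pattern;
>
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
>
> Properties props = new Properties();
> // ... bootstrap.servers, group.id, deserializers ...
> // hypothetical custom assignor class, registered via the consumer config
> props.put("partition.assignment.strategy", MyAssignor.class.getName());
>
> KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
> consumer.subscribe(Pattern.compile("topic-.*"),
>         new ConsumerRebalanceListener() {
>             @Override
>             public void onPartitionsRevoked(Collection<TopicPartition> parts) {
>                 // invoked before partitions are taken away in a rebalance
>             }
>             @Override
>             public void onPartitionsAssigned(Collection<TopicPartition> parts) {
>                 // invoked once the new assignment is finalized
>             }
>         });
> ```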
>
> Maybe you can explain why neither of these approaches works and what
> gap the new API would close?
>
>
> -Matthias
>
> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > Let me elaborate a bit more. Here we talk about Apache Spark, but
> > this applies to anything that wants to control the offsets of Kafka
> > consumers.
> >
> > Spark manages the committed offsets and the offsets to be polled
> > next, as well as the topics and partitions. This is required because
> > Spark has its own general checkpoint mechanism, and Kafka is just one
> > of its sources/sinks (though a very important one).
> >
> > To pull records from Kafka, Spark tells Kafka which topics and
> > partitions it wants to subscribe to (and then seeks and polls). But
> > Spark can also provide "patterns" of topics, and the subscription can
> > change on the Kafka side (topics added/dropped, partitions added),
> > which Spark needs to know about to coordinate multiple consumers to
> > pull correctly.
> >
> > It looks like assignment() doesn't update the assignment information
> > in the consumer; it just returns the known one. The only known way to
> > refresh it is calling `poll`, but Spark is not interested in the
> > returned records, hence the `poll(0)` hack, and Kafka deprecated that
> > API. This KIP proposes to support this as an official approach.
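> >
> > The hack in question can be sketched as (assuming the standard
> > kafka-clients API; the consumer is already subscribed via a pattern):
> >
> > ```java
> > // Force the consumer to join the group and refresh its assignment
> > // metadata without wanting any records back. poll(long) is deprecated
> > // since Kafka 2.0 and does not guarantee that no records are returned.
> > consumer.poll(0);
> > Set<TopicPartition> assignment = consumer.assignment(); // now refreshed
> > ```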
> >
> >
> > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com> wrote:
> >
> >> Sorry, I didn't realize you were also asking here. I'm in favor of
> >> describing it in this discussion thread so the discussion itself can
> >> go forward, so I'm copying my answer here:
> >>
> >> We have some use cases where we don't just rely on everything the
> >> Kafka consumer provides. We want to know the current assignment of
> >> this consumer, and to get the latest assignment, we called the
> >> `poll(0)` hack.
> >>
> >> That said, we don't want to pull any records here, and if I'm not
> >> missing something, there's no way to accomplish this. Please guide me
> >> if I'm missing something.
> >>
> >> Thanks,
> >> Jungtaek Lim (HeartSaVioR)
> >>
> >>
> >>
> >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Thanks for the KIP.
> >>>
> >>> Can you elaborate a little bit more on the use case for this feature?
> >>> Why would a consumer need to update its metadata explicitly?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> >>>> Hi devs,
> >>>>
> >>>> I'd like to initiate a discussion around KIP-505, exposing a new
> >>>> public method to only update assignment metadata in the consumer.
> >>>>
> >>>> `poll(0)` has been misused for this, as according to the Kafka docs
> >>>> it doesn't guarantee that it won't pull any records, and the new
> >>>> method `poll(Duration)` doesn't have the same semantics, so I'd like
> >>>> to propose a new public API which only does the desired behavior.
> >>>>
> >>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> >>>>
> >>>> Please feel free to suggest any improvements on the proposal, as I'm
> >>>> new to the Kafka community and may not be aware of its preferences
> >>>> (like TimeoutException vs boolean, etc.).
> >>>>
> >>>> Thanks in advance!
> >>>> Jungtaek Lim (HeartSaVioR)
> >>>>
> >>>
> >>>
> >>
> >> --
> >> Name : Jungtaek Lim
> >> Blog : http://medium.com/@heartsavior
> >> Twitter : http://twitter.com/heartsavior
> >> LinkedIn : http://www.linkedin.com/in/heartsavior
> >>
> >
> >
>
>
