Hi Guys,

Please see the actual implementation; I'm pretty sure it explains the
situation well:
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala

To answer one question/assumption which popped up from several of you:
Spark not only uses KafkaConsumer#subscribe but also pattern subscribe and
KafkaConsumer#assign.
Please see here:
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
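
For context, here is roughly what those strategies boil down to at the
Kafka consumer API level (a simplified Java sketch, not Spark's actual
Scala code; the topic/partition names and `kafkaParams` are made up):

    import java.util.Arrays;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaParams);

    // a given consumer uses exactly ONE of the following, per strategy:

    // SubscribeStrategy: consumer group management with a fixed topic list
    consumer.subscribe(Arrays.asList("topicA", "topicB"));

    // SubscribePatternStrategy: group management with a topic pattern
    consumer.subscribe(Pattern.compile("topic.*"));

    // AssignStrategy: no group management; Spark assigns partitions itself
    consumer.assign(Arrays.asList(new TopicPartition("topicA", 0)));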

BR,
G


On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.dugg...@gmail.com>
wrote:

> Hi Jungtaek,
> Thanks for the KIP. I have a couple of questions here.
> Isn't Spark using Kafka's consumer group management across multiple
> consumers?
>
> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
> ConsumerRebalanceListener listener) only to get all the topics for a
> pattern-based subscription, and does Spark manually assign those
> topic-partitions across consumers on workers?
>
> Thanks,
> Satish.
>
> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I am not sure if I fully understand yet.
> >
> > The fact that Spark does not store offsets in Kafka but as part of its
> > own checkpoint mechanism seems to be orthogonal. Maybe I am missing
> > something here.
> >
> > As you are using subscribe(), you use the Kafka consumer group
> > mechanism, which takes care of the assignment of partitions to clients
> > within the group. Therefore, I am not sure what you mean by:
> >
> > >> which Spark needs to
> > >> know to coordinate multiple consumers to pull correctly.
> >
> > Multiple thoughts that may help:
> >
> >  - if Spark needs more control over the partition assignment, you can
> > provide a custom `ConsumerPartitionAssignor` (via the consumer
> > configuration)
> >
> >  - you may also want to register `ConsumerRebalanceListener` via
> > `subscribe()` to get informed when the group rebalances
> >
> > As you pointed out, with pattern subscription the metadata can change
> > if topics are added/deleted. However, each metadata change will trigger
> > a rebalance, and thus you would get corresponding calls to your
> > rebalance listener to learn about it and react accordingly.
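> >
> > As a minimal sketch of the listener approach (illustrative only;
> > imports needed: java.util.Collection, java.util.regex.Pattern,
> > org.apache.kafka.clients.consumer.ConsumerRebalanceListener,
> > org.apache.kafka.common.TopicPartition):
> >
> >   consumer.subscribe(Pattern.compile("topic.*"),
> >       new ConsumerRebalanceListener() {
> >         @Override
> >         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >           // called after each rebalance with the newly assigned partitions
> >           System.out.println("assigned: " + partitions);
> >         }
> >         @Override
> >         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> >           // called before a rebalance takes partitions away
> >           System.out.println("revoked: " + partitions);
> >         }
> >       });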
> >
> > Maybe you can explain why neither of these approaches works and what
> > gap the new API would close?
> >
> >
> > -Matthias
> >
> > On 8/11/19 5:11 AM, Jungtaek Lim wrote:
> > > Let me elaborate on my explanation a bit more. Here we talk about
> > > Apache Spark, but this applies to anything which wants to control the
> > > offsets of Kafka consumers.
> > >
> > > Spark manages the committed offsets and the offsets which should be
> > > polled next, as well as topics and partitions. This is required as
> > > Spark has its own general checkpoint mechanism and Kafka is just one
> > > of its sources/sinks (though a very important one).
> > >
> > > To pull records from Kafka, Spark tells Kafka which topics and
> > > partitions it wants to subscribe to (and then does seek and poll).
> > > But Spark can also provide "patterns" of topics, and the subscription
> > > can change on the Kafka side (topics added/dropped, partitions
> > > added), which Spark needs to know about to coordinate multiple
> > > consumers to pull correctly.
> > >
> > > It looks like assignment() doesn't update the assignment information
> > > in the consumer; it just returns the already-known one. There's only
> > > one known approach to updating it, calling `poll`, but Spark is not
> > > interested in the returned records, hence the hack `poll(0)`, and
> > > Kafka deprecated that API. This KIP proposes to support this as an
> > > official approach.
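> > >
> > > A sketch of the hack, for clarity (an illustrative pattern, not
> > > Spark's actual code):
> > >
> > >   consumer.subscribe(Pattern.compile("topic.*"));
> > >
> > >   // assignment() only returns what the consumer already knows;
> > >   // right after subscribe() it is an empty set
> > >   Set<TopicPartition> before = consumer.assignment();
> > >
> > >   // the hack: poll only to trigger a metadata update / rebalance,
> > >   // hoping no records come back (which is not guaranteed);
> > >   // poll(long) is deprecated since Kafka 2.0
> > >   consumer.poll(0);
> > >   Set<TopicPartition> after = consumer.assignment();  // now populated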
> > >
> > >
> > > On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com> wrote:
> > >
> > >> Sorry, I didn't realize you were also asking it here as well. I'm in
> > >> favor of describing it in this discussion thread so the discussion
> > >> itself can go forward, so I'm copying my answer here:
> > >>
> > >> We have some use cases where we don't just rely on everything the
> > >> Kafka consumer provides. We want to know the current assignment on
> > >> this consumer, and to get the latest assignment, we call the hack
> > >> `poll(0)`.
> > >>
> > >> That said, we don't want to pull any records here, and if I'm not
> > >> mistaken, there's no other way to accomplish this. Please guide me
> > >> if I'm missing something.
> > >>
> > >> Thanks,
> > >> Jungtaek Lim (HeartSaVioR)
> > >>
> > >>
> > >>
> > >> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <matth...@confluent.io>
> > >> wrote:
> > >>
> > >>> Thanks for the KIP.
> > >>>
> > >>> Can you elaborate a little bit more on the use case for this feature?
> > >>> Why would a consumer need to update its metadata explicitly?
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
> > >>>> Hi devs,
> > >>>>
> > >>>> I'd like to initiate discussion around KIP-505, exposing a new
> > >>>> public method to only update assignment metadata in the consumer.
> > >>>>
> > >>>> `poll(0)` has been misused: according to the Kafka docs it doesn't
> > >>>> guarantee that it won't pull any records, and the new method
> > >>>> `poll(Duration)` doesn't have the same semantics, so I would like
> > >>>> to propose a new public API which only does the desired behavior.
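> > >>>>
> > >>>> To illustrate the gap (the method in the last line is only an
> > >>>> illustrative name; the actual proposal is on the KIP page below):
> > >>>>
> > >>>>   consumer.poll(0);              // deprecated: may fetch records and
> > >>>>                                  // may block beyond the timeout
> > >>>>   consumer.poll(Duration.ZERO);  // returns immediately, so it does
> > >>>>                                  // not reliably update the assignment
> > >>>>
> > >>>>   // desired: update assignment metadata only, never fetch records
> > >>>>   consumer.updateAssignmentMetadataIfNeeded(Duration.ofSeconds(5));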
> > >>>>
> > >>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
> > >>>>
> > >>>> Please feel free to suggest any improvements on the proposal, as
> > >>>> I'm new to the Kafka community and may not be aware of preferences
> > >>>> (like TimeoutException vs boolean, etc.) on the Kafka project.
> > >>>>
> > >>>> Thanks in advance!
> > >>>> Jungtaek Lim (HeartSaVioR)
> > >>>>
> > >>>
> > >>>
> > >>
> > >> --
> > >> Name : Jungtaek Lim
> > >> Blog : http://medium.com/@heartsavior
> > >> Twitter : http://twitter.com/heartsavior
> > >> LinkedIn : http://www.linkedin.com/in/heartsavior
> > >>
> > >
> > >
> >
> >
>
