Thanks for the details, Jungtaek!

I tend to agree with Colin that using the AdminClient seems to be the
better choice.

You can get all topics via `listTopics()` (and you can refresh this
information at regular intervals) and match any pattern against the list
of available topics in the driver.
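
For illustration, a minimal sketch of that idea (broker address and topic
pattern are placeholders; error handling omitted):

    import java.util.Properties;
    import java.util.Set;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class TopicPatternRefresh {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            Pattern subscription = Pattern.compile("events-.*"); // placeholder

            try (AdminClient admin = AdminClient.create(props)) {
                // Fetch all topic names; run this again on a regular interval
                // to pick up newly created or deleted topics.
                Set<String> allTopics = admin.listTopics().names().get();
                Set<String> matched = allTopics.stream()
                        .filter(t -> subscription.matcher(t).matches())
                        .collect(Collectors.toSet());
                System.out.println("Topics matching pattern: " + matched);
            }
        }
    }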

As you use `assignment()` and store offsets in the Spark checkpoint, it
seems that consumer group management is not a good fit for this use
case.


Thoughts?



-Matthias

On 8/12/19 8:22 AM, Colin McCabe wrote:
> Hi,
> 
> If there’s no need to consume records in the Spark driver, then the Consumer 
> is probably the wrong thing to use. Instead, Spark should use AdminClient to 
> find out what partitions exist and where, manage their offsets, and so on. 
> There are some KIPs under discussion now that would add the necessary APIs 
> for managing offsets.
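> 
> As an illustration of the discovery side with today's AdminClient (a
> fragment only; topic names are placeholders, and it assumes an existing
> AdminClient instance 'admin'):
> 
>     // Sketch: enumerate partitions of known topics via AdminClient.
>     Map<String, TopicDescription> descriptions =
>         admin.describeTopics(Arrays.asList("events-1", "events-2")).all().get();
>     for (TopicDescription d : descriptions.values()) {
>         for (TopicPartitionInfo p : d.partitions()) {
>             System.out.println(d.name() + "-" + p.partition()
>                 + " leader=" + p.leader());
>         }
>     }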
> 
> Best,
> Colin
> 
> On Mon, Aug 12, 2019, at 07:39, Jungtaek Lim wrote:
>> My feeling is that I didn't explain the use case for Spark properly and
>> hence failed to explain the needs. Sorry about this.
>>
>> Spark uses a single KafkaConsumer instance in the driver, which is the
>> sole member registered in its consumer group. It is used in the planning
>> phase of each micro-batch to calculate the overall topicpartitions with
>> their offset ranges for the batch, and to split and assign
>> (topicpartition, fromOffset, untilOffset) to each input partition. After
>> planning is done and tasks are distributed to executors, a consumer per
>> input partition is initialized on some executor (assigned to a single
>> topicpartition) and pulls the actual records. (Consumer pooling is
>> applied, for sure.) As the planning phase only determines the overall
>> topicpartitions and offset ranges to process, Spark is never interested
>> in pulling records on the driver side.
>>
>> Spark mainly leverages poll(0) to get the latest assigned partitions and
>> adopt the changes or validate its expectation. That's not the only use
>> case for poll(0), though. Spark also seeks the offset per topicpartition
>> to the earliest, the latest, or a specific offset (either provided by
>> the end user or the last committed offset) so that Spark can obtain the
>> actual offset or validate the provided one. According to the javadoc (if
>> I understand correctly), to get the offset immediately it seems to be
>> required to call `poll` or `position`.
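>>
>> To make this concrete, the driver-side usage looks roughly like the
>> following (a simplified sketch, not the exact Spark code; the pattern is
>> a placeholder):
>>
>>     // Simplified sketch of the driver-side pattern described above.
>>     // 'props' holds bootstrap servers, group id, and deserializer config.
>>     KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
>>     consumer.subscribe(Pattern.compile("events-.*"));
>>     consumer.poll(0);  // the hack: used only to update assignment metadata
>>     Set<TopicPartition> partitions = consumer.assignment();
>>     consumer.seekToEnd(partitions);  // or seekToBeginning / seek(tp, offset)
>>     for (TopicPartition tp : partitions) {
>>         long untilOffset = consumer.position(tp);  // resolve the actual offset
>>     }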
>>
>> The way Spark interacts with Kafka in this planning phase in the driver
>> is synchronous, as the phase should finish ASAP so the next phase can
>> run. Registering a ConsumerRebalanceListener and tracking the changes
>> would require some asynchronous handling, which sounds like unnecessary
>> complexity. Spark may be OK with synchronous calls with a timeout
>> (that's what the methods in KafkaConsumer have been providing; they're
>> not asynchronous, at least for callers), but dealing with asynchrony is
>> another level of concern. I can see the benefit where a thread runs
>> continuously and the consumer is busy with something all the time,
>> relying on the listener to hear the news about reassignment.
>> Unfortunately that's not our case.
>>
>> Unit tests in Spark have similar needs: it looks like Kafka's test code
>> also leverages `updateAssignmentMetadataIfNeeded` and `poll(0)` in many
>> places where a blocking (+timeout) call is preferred, so I can see
>> similar needs there as well.
>>
>> On Mon, Aug 12, 2019 at 4:46 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Guys,
>>>
>>> Please see the actual implementation; I'm pretty sure it explains the
>>> situation well:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
>>>
>>> To answer one question/assumption which popped up from all of you: Spark
>>> uses not only KafkaConsumer#subscribe but pattern subscribe +
>>> KafkaConsumer#assign as well.
>>> Please see here:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Mon, Aug 12, 2019 at 6:38 AM Satish Duggana <satish.dugg...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jungtaek,
>>>> Thanks for the KIP. I have a couple of questions here.
>>>> Is Spark not using Kafka's consumer group management across multiple
>>>> consumers?
>>>>
>>>> Is Spark using KafkaConsumer#subscribe(Pattern pattern,
>>>> ConsumerRebalanceListener listener) only to get all the topics for a
>>>> pattern based subscription and Spark manually assigns those
>>>> topic-partitions across consumers on workers?
>>>>
>>>> Thanks,
>>>> Satish.
>>>>
>>>> On Mon, Aug 12, 2019 at 4:17 AM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> I am not sure if I fully understand yet.
>>>>>
>>>>> The fact that Spark does not store offsets in Kafka but as part of its
>>>>> own checkpoint mechanism seems to be orthogonal. Maybe I am missing
>>>>> something here.
>>>>>
>>>>> As you are using subscribe(), you use the Kafka consumer group
>>>>> mechanism, which takes care of the assignment of partitions to clients
>>>>> within the group. Therefore, I am not sure what you mean by:
>>>>>
>>>>>> which Spark needs to know to coordinate multiple consumers to pull
>>>>>> correctly.
>>>>>
>>>>> Multiple thoughts that may help:
>>>>>
>>>>> - if Spark needs more control over the partition assignment, you can
>>>>> provide a custom `ConsumerPartitionAssignor` (via the consumer
>>>>> configuration)
>>>>>
>>>>> - you may also want to register a `ConsumerRebalanceListener` via
>>>>> `subscribe()` to get informed when the group rebalances (see the
>>>>> sketch below)
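>>>>>
>>>>> For illustration, a rough sketch of both ideas (class and pattern
>>>>> names are placeholders):
>>>>>
>>>>>     // Sketch: plug in a custom assignor via consumer config
>>>>>     // (MyCustomAssignor is a placeholder for your own implementation).
>>>>>     props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
>>>>>               MyCustomAssignor.class.getName());
>>>>>
>>>>>     // Sketch: get informed about rebalances via a listener
>>>>>     consumer.subscribe(Pattern.compile("events-.*"),
>>>>>         new ConsumerRebalanceListener() {
>>>>>             @Override
>>>>>             public void onPartitionsRevoked(Collection<TopicPartition> tps) {
>>>>>                 // snapshot offsets/state for partitions being taken away
>>>>>             }
>>>>>             @Override
>>>>>             public void onPartitionsAssigned(Collection<TopicPartition> tps) {
>>>>>                 // react to the new assignment
>>>>>             }
>>>>>         });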
>>>>>
>>>>> As you pointed out, with pattern subscription the metadata can change
>>>>> if topics are added/deleted. However, each metadata change will trigger
>>>>> a rebalance, and thus you would get corresponding calls to your
>>>>> rebalance listener to learn about it and react accordingly.
>>>>>
>>>>> Maybe you can explain why neither of these approaches works and what
>>>>> gap the new API would close?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 8/11/19 5:11 AM, Jungtaek Lim wrote:
>>>>>> Let me elaborate on my explanation a bit more. Here we talk about
>>>>>> Apache Spark, but this will apply to anything that wants to control
>>>>>> the offsets of Kafka consumers.
>>>>>>
>>>>>> Spark manages the committed offsets and the offsets which should be
>>>>>> polled next, as well as the topics and partitions. This is required as
>>>>>> Spark itself has its own general checkpoint mechanism and Kafka is
>>>>>> just one of its sources/sinks (though it's considered a very important
>>>>>> one).
>>>>>>
>>>>>> To pull records from Kafka, Spark tells Kafka which topics and
>>>>>> partitions it wants to subscribe to (and then does seek and poll). But
>>>>>> Spark can also provide "patterns" of topics, and the subscription can
>>>>>> change on the Kafka side (topics added/dropped, partitions added),
>>>>>> which Spark needs to know about to coordinate multiple consumers to
>>>>>> pull correctly.
>>>>>>
>>>>>> It looks like assignment() doesn't update the assignment information
>>>>>> in the consumer; it just returns the known one. There's only one known
>>>>>> approach to updating it: calling `poll`. But Spark is not interested
>>>>>> in the returned records, hence the need for the `poll(0)` hack, and
>>>>>> Kafka has deprecated that API. This KIP proposes to support this as an
>>>>>> official approach.
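>>>>>>
>>>>>> As a sketch only, the proposed usage could look roughly like this (the
>>>>>> method name here just mirrors the one Kafka's own test code uses; see
>>>>>> the KIP page for the actual proposed signature):
>>>>>>
>>>>>>     // Hypothetical shape of the proposed API: refresh assignment
>>>>>>     // metadata without fetching any records.
>>>>>>     consumer.updateAssignmentMetadataIfNeeded(Duration.ofSeconds(30));
>>>>>>     Set<TopicPartition> assignment = consumer.assignment();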
>>>>>>
>>>>>>
>>>>>> On Sun, Aug 11, 2019 at 8:18 PM Jungtaek Lim <kabh...@gmail.com>
>>>> wrote:
>>>>>>
>>>>>>> Sorry, I didn't recognize you were also asking it here. I'm in favor
>>>>>>> of describing it in this discussion thread so the discussion itself
>>>>>>> can go forward. So I'm copying my answer here:
>>>>>>>
>>>>>>> We have a use case in which we don't just rely on everything the
>>>>>>> Kafka consumer provides. We want to know the current assignment of
>>>>>>> this consumer, and to get the latest assignment, we call the
>>>>>>> `poll(0)` hack.
>>>>>>>
>>>>>>> That said, we don't want to pull any records here, and if I'm not
>>>>>>> missing anything, there's no other way to accomplish this. Please
>>>>>>> guide me if I'm missing something.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Aug 10, 2019 at 2:11 AM Matthias J. Sax <
>>>> matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the KIP.
>>>>>>>>
>>>>>>>> Can you elaborate a little bit more on the use case for this
>>>>>>>> feature? Why would a consumer need to update its metadata explicitly?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 8/8/19 8:46 PM, Jungtaek Lim wrote:
>>>>>>>>> Hi devs,
>>>>>>>>>
>>>>>>>>> I'd like to initiate a discussion around KIP-505, exposing a new
>>>>>>>>> public method to only update assignment metadata in the consumer.
>>>>>>>>>
>>>>>>>>> `poll(0)` has been misused: according to the Kafka doc it doesn't
>>>>>>>>> guarantee that it won't pull any records, and the new method
>>>>>>>>> `poll(Duration)` doesn't have the same semantics, so I would like
>>>>>>>>> to propose a new public API which only does the desired behavior.
>>>>>>>>>
>>>>>>>>> KIP page: https://cwiki.apache.org/confluence/x/z5NiBw
>>>>>>>>>
>>>>>>>>> Please feel free to suggest any improvements to the proposal, as
>>>>>>>>> I'm new to the Kafka community and may not be aware of preferences
>>>>>>>>> (like TimeoutException vs boolean, etc.) in the Kafka project.
>>>>>>>>>
>>>>>>>>> Thanks in advance!
>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Name : Jungtaek Lim
>>>>>>> Blog : http://medium.com/@heartsavior
>>>>>>> Twitter : http://twitter.com/heartsavior
>>>>>>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> -- 
>> Name : Jungtaek Lim
>> Blog : http://medium.com/@heartsavior
>> Twitter : http://twitter.com/heartsavior
>> LinkedIn : http://www.linkedin.com/in/heartsavior
>>
> 
