I am still not sure how Samza's MessageChooser actually works and how
this would align with KafkaConsumer fetch requests.


Maybe I can give some background (conceptually); @Colin, please correct
me if I say anything wrong:


When a fetch request is sent, all assigned topic partitions of the
consumer are ordered in a list, and the broker returns data starting
with the first partition in the list, returning as many messages as
possible (until either the partition end-offset or the fetch size is
reached). If the end-offset is reached but not the fetch size, the next
partition in the list is considered. This repeats until the fetch size
is reached. If a partition in the list has no data available, it's skipped.

When data is returned to the consumer, the consumer moves all partitions
for which data was returned to the end of the list. Thus, in the next
fetch request, data from other partitions is returned (this avoids
starving partitions). Note that partitions that do not return data
(even if they are at the head of the list) stay at the head of the list.

(Note that this partition list is actually also maintained broker side
to allow for incremental fetch requests.)
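
To make the rotation concrete, here is a minimal sketch in Java. It is
only a toy model of my description above, not the actual KafkaConsumer
or broker code: the class FetchOrderSketch, its methods, and counting
the fetch size in messages instead of bytes are all assumptions made for
illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the fetch ordering described above (hypothetical names). */
class FetchOrderSketch {
    // Partitions in the order in which the next fetch considers them.
    final List<String> order = new ArrayList<>();
    // Number of messages currently available per partition.
    final Map<String, Integer> available = new LinkedHashMap<>();

    void assign(String partition, int messages) {
        order.add(partition);
        available.put(partition, messages);
    }

    /** Simulates one fetch; returns partition -> number of messages returned. */
    Map<String, Integer> fetch(int fetchSize) {
        Map<String, Integer> returned = new LinkedHashMap<>();
        int remaining = fetchSize;
        for (String partition : order) {
            if (remaining == 0) {
                break; // fetch size reached
            }
            int n = Math.min(available.get(partition), remaining);
            if (n == 0) {
                continue; // no data available: skipped, keeps its position
            }
            returned.put(partition, n);
            available.put(partition, available.get(partition) - n);
            remaining -= n;
        }
        // Partitions that returned data move to the end of the list.
        order.removeAll(returned.keySet());
        order.addAll(returned.keySet());
        return returned;
    }
}
```

With partitions t-0 (5 messages), t-1 (empty) and t-2 (5 messages) and a
fetch size of 7, the fetch returns 5 messages from t-0 and 2 from t-2,
and the empty partition t-1 ends up at the head of the list for the next
fetch.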

Because different partitions are hosted on different brokers, the
consumer will send fetch requests to different brokers (@Colin: how does
this work in detail? Does the consumer just do a round robin over all
brokers it needs to fetch from?)


Given the original proposal about topic priorities, it would be possible
to have multiple lists, one per priority. If topic partitions return
data, they would be moved to the end of their priority list. The lists
would be considered in priority order. Thus, higher priority topic
partitions stay at the head and are considered first.
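
A rough sketch of what "one list per priority" could look like, again in
Java and again purely illustrative: PriorityFetchOrderSketch and its
methods are made-up names, and I assume that a larger number means a
higher priority.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.TreeMap;

/** Toy model: one rotation list per priority (hypothetical names). */
class PriorityFetchOrderSketch {
    // Assumption: larger key = higher priority.
    final TreeMap<Integer, Deque<String>> lists = new TreeMap<>();

    void assign(String partition, int priority) {
        lists.computeIfAbsent(priority, k -> new ArrayDeque<>()).addLast(partition);
    }

    /** Order in which the next fetch would consider the partitions. */
    List<String> fetchOrder() {
        List<String> result = new ArrayList<>();
        // Walk the lists from highest to lowest priority.
        for (Deque<String> list : lists.descendingMap().values()) {
            result.addAll(list);
        }
        return result;
    }

    /** A partition that returned data moves to the end of its own list only. */
    void onDataReturned(String partition) {
        for (Deque<String> list : lists.values()) {
            if (list.remove(partition)) {
                list.addLast(partition);
                return;
            }
        }
    }
}
```

The point of the sketch is that rotation happens only within a priority
level: a low priority partition never moves ahead of a high priority
one, no matter how often the high priority partitions return data.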


If I understand MessageChooser correctly (and consider a client side
implementation only), the MessageChooser can only pick from the data
that was returned in a fetch request -- but it cannot alter what the
fetch request returns.

It seems that for each fetched message, update() would be called and the
MessageChooser buffers the message. When a message should be processed
(i.e., when Iterator.next() is called on the iterator returned from
poll()), choose() is called to return a message from the buffer (based
on whatever strategy the MessageChooser implements).

Thus, MessageChooser cannot actually prioritize some topics over others,
because the prioritization depends on the fetch requests, which the
MessageChooser cannot influence (MessageChooser can only prioritize
among records from different partitions that are already fetched). Thus,
the MessageChooser interface does not seem to achieve what was proposed.
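
To illustrate my understanding of the update()/choose() flow, here is a
small Java sketch. It is not Samza's MessageChooser nor anything in
KafkaConsumer: TopicPriorityChooserSketch and Rec are hypothetical
stand-ins, and the topic-priority strategy is just one example of what
a chooser might implement.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Client-side chooser sketch (hypothetical, simplified). */
class TopicPriorityChooserSketch {
    /** Minimal stand-in for a fetched record. */
    static final class Rec {
        final String topic;
        final String value;
        Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    private final List<String> topicsByPriority; // highest priority first
    private final Map<String, Deque<Rec>> buffer = new HashMap<>();

    TopicPriorityChooserSketch(List<String> topicsByPriority) {
        this.topicsByPriority = topicsByPriority;
    }

    /** Called once per fetched record; the chooser only buffers it. */
    void update(Rec rec) {
        buffer.computeIfAbsent(rec.topic, k -> new ArrayDeque<>()).addLast(rec);
    }

    /** Picks the next record to process, or null if nothing is buffered. */
    Rec choose() {
        for (String topic : topicsByPriority) {
            Deque<Rec> queue = buffer.get(topic);
            if (queue != null && !queue.isEmpty()) {
                return queue.pollFirst();
            }
        }
        return null;
    }
}
```

If the last fetch happened to return only records of the low priority
topic, choose() has no option but to hand out those records; it has no
way to ask for data of the high priority topic that is still sitting on
the broker.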

@Jan: please correct me if my understanding of MessageChooser is wrong.


If my understanding is correct, I am not sure how the MessageChooser
interface could be used to prioritize topics in fetch requests.


Overall, I get the impression that topic prioritization and
MessageChooser are orthogonal (or complementary) to each other.



-Matthias



On 9/6/18 5:24 AM, Jan Filipiak wrote:
> 
> On 05.09.2018 17:18, Colin McCabe wrote:
>> Hi all,
>>
>> I agree that DISCUSS is more appropriate than VOTE at this point,
>> since I don't remember the last discussion coming to a definite
>> conclusion.
>>
>> I guess my concern is that this will add complexity and memory
>> consumption on the server side.  In the case of incremental fetch
>> requests, we will have to track at least two extra bytes per
>> partition, to know what the priority of each partition is within each
>> active fetch session.
>>
>> It would be nice to hear more about the use-cases for this feature.  I
>> think Gwen asked about this earlier, and I don't remember reading a
>> response.  The fact that we're now talking about Samza interfaces is a
>> bit of a red flag.  After all, Samza didn't need partition priorities
>> to do what it did.  You can do a lot with muting partitions and using
>> appropriate threading in your code.
> to show a usecase, I linked 353, especially since the threading model is
> pretty fixed there.
> 
> No clue why Samza should be a red flag. They handle purely on the
> consumer side. Which I think is reasonable. I would not try to implement
> any broker side support for this, if I were todo it. Just don't support
> incremental fetch then.
> In the end if you have broker side support, you would need to ship the
> logic of the message chooser to the broker. I don't think that will
> allow for the flexibility I had in mind on purely consumer based
> implementation.
> 
> 
>>
>> For example, you can hand data from a partition off to a work queue
>> with a fixed size, which is handled by a separate service thread.  If
>> the queue gets full, you can mute the partition until some of the
>> buffered data is processed.  Kafka Streams uses a similar approach to
>> avoid reading partition data that isn't immediately needed.
>>
>> There might be some use-cases that need priorities eventually, but I'm
>> concerned that we're jumping the gun by trying to implement this
>> before we know what they are.
>>
>> best,
>> Colin
>>
>>
>> On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:
>>> On 05.09.2018 02:38, n...@afshartous.com wrote:
>>>>> On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>>
>>>>> what I meant is litterally this interface:
>>>>>
>>>>> https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
>>>>>
>>>> Hi Jan,
>>>>
>>>> Thanks for the reply and I have a few questions.  This Samza doc
>>>>
>>>>    
>>>> https://samza.apache.org/learn/documentation/0.14/container/streams.html
>>>>
>>>>
>>>> indicates that the chooser is set via configuration.  Are you
>>>> suggesting adding a new configuration for Kafka ?  Seems like we
>>>> could also have a method on KafkaConsumer
>>>>
>>>>       public void register(MessageChooser messageChooser)
>>> I don't have strong opinions regarding this. I like configs, i also
>>> don't think it would be a problem to have both.
>>>
>>>> to make it more dynamic.
>>>>
>>>> Also, the Samza MessageChooser interface has method
>>>>
>>>>     /* Notify the chooser that a new envelope is available for a
>>>> processing. */
>>>> void update(IncomingMessageEnvelope envelope)
>>>>
>>>> and I’m wondering how this method would be translated to Kafka API. 
>>>> In particular what corresponds to IncomingMessageEnvelope.
>>> I think Samza uses the envelop abstraction as they support other sources
>>> besides kafka aswell. They are more
>>> on the spark end of things when it comes to different input types. I
>>> don't have strong opinions but it feels like
>>> we wouldn't need such a thing in the kafka consumer but just use a
>>> regular ConsumerRecord or so.
>>>> Best,
>>>> -- 
>>>>         Nick
>>>>
>>>>
>>>>
>>>>
> 
