On Thu, Sep 6, 2018, at 20:21, Matthias J. Sax wrote:
> I am still not sure how Samza's MessageChooser actually works and how
> this would align with KafkaConsumer fetch requests.
> 
> 
> Maybe I can give some background (conceptually); @Colin, please correct
> me if I say anything wrong:
> 
> 
> When a fetch request is sent, all assigned topic partitions of the
> consumer are ordered in a list and the broker will return data starting
> with the first partition in the list and returning as many messages as
> possible (either until partition end-offset or fetch size is reached).
> If end-offset is reached but not fetch size, the next partition in the
> list is considered. This repeats until fetch size is reached. If a
> partition in the list has no data available, it's skipped.
> 
> When data is returned to the consumer, the consumer moves all partitions
> for which data was returned to the end of the list. Thus, in the next
> fetch request, data from other partitions is returned (this avoids
> starving partitions). Note that partitions that do not return data
> (even if they are at the head of the list) stay at the head of the list.
> 
> (Note that this list is also maintained on the broker side to
> allow for incremental fetch requests.)

Right.
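
To make the ordering concrete, here is a toy model of the list handling
described above. It is only a sketch under simplifying assumptions: the
class and method names are made up for illustration, the fetch size is
counted in messages rather than bytes, and nothing here is the actual
consumer or broker code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only; not the KafkaConsumer or broker implementation.
class FetchOrderSketch {
    // Partitions in fetch order; the head of the list is served first.
    private final List<String> order = new ArrayList<>();
    // Hypothetical stand-in for broker state: messages available per partition.
    private final Map<String, Integer> available = new LinkedHashMap<>();

    void assign(String partition, int messages) {
        order.add(partition);
        available.put(partition, messages);
    }

    // Simulates one fetch capped at maxMessages; returns partition -> messages returned.
    Map<String, Integer> fetch(int maxMessages) {
        Map<String, Integer> returned = new LinkedHashMap<>();
        int budget = maxMessages;
        for (String partition : order) {
            if (budget == 0) {
                break; // fetch size reached
            }
            int have = available.get(partition);
            if (have == 0) {
                continue; // no data: skipped, keeps its position in the list
            }
            int take = Math.min(have, budget);
            available.put(partition, have - take);
            returned.put(partition, take);
            budget -= take;
        }
        // Partitions that returned data move to the end of the list.
        order.removeAll(returned.keySet());
        order.addAll(returned.keySet());
        return returned;
    }

    List<String> order() {
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        FetchOrderSketch sketch = new FetchOrderSketch();
        sketch.assign("t-0", 5);
        sketch.assign("t-1", 0); // empty partition near the head of the list
        sketch.assign("t-2", 5);
        sketch.assign("t-3", 5);
        System.out.println(sketch.fetch(7) + " -> " + sketch.order());
        System.out.println(sketch.fetch(7) + " -> " + sketch.order());
    }
}
```

With the values in main, the first fetch returns data for t-0 and t-2 and
moves both behind t-3, while the empty t-1 is skipped and ends up at the
head. The second fetch then starts with t-3.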

> 
> Because different partitions are hosted on different brokers, the
> consumer will send fetch requests to different brokers (@Colin: how does
> this work in detail? Does the consumer just do a round robin over all
> brokers it needs to fetch from?)

The client will fetch from multiple brokers in parallel.

> 
> 
> Given the original proposal about topic priorities, it would be possible
> to have multiple lists, one per priority. If topic partitions return
> data, they would be moved to the end of their priority list. The lists
> would be considered in priority order. Thus, higher priority topic
> partitions stay at the head and are considered first.
> 
> 
> If I understand MessageChooser correctly (and consider a client side
> implementation only), the MessageChooser can only pick from the data
> that was returned in a fetch request -- but it cannot alter what the
> fetch request returns.
> 
> It seems that for each fetched message, update() would be called and the
> MessageChooser buffers the message. When a message should be processed
> (i.e., when Iterator.next() is called on the iterator returned from
> poll()), choose() is called to return a message from the buffer (based
> on whatever strategy the MessageChooser implements).
> 
> Thus, MessageChooser cannot actually prioritize some topics over others,
> because the prioritization depends on the fetch requests, which the
> MessageChooser cannot influence (MessageChooser can only prioritize
> records from different partitions that are already fetched). Thus, the
> MessageChooser interface does not seem to achieve what was proposed.
> 
> @Jan: please correct me if my understanding of MessageChooser is wrong.
> 
> 
> If my understanding is correct, I am not sure how the MessageChooser
> interface could be used to prioritize topics in fetch requests.
> 
> 
> Overall, I get the impression that topic prioritization and
> MessageChooser are orthogonal (or complementary) to each other.

I agree.
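
For illustration, a purely client-side chooser along the lines described
above could look roughly like the sketch below. The class, the Rec type,
and the per-topic priority map are all hypothetical; this is neither
Samza's MessageChooser interface nor anything that exists in
KafkaConsumer. It only shows that update()/choose() can reorder records
that a fetch has already returned.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical client-side chooser, loosely modeled on update()/choose();
// it is not Samza's MessageChooser interface and not part of KafkaConsumer.
class PriorityChooserSketch {
    // Minimal stand-in for a fetched record (a real client would use ConsumerRecord).
    static final class Rec {
        final String topic;
        final String value;

        Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    private final Map<String, Integer> priorityByTopic;
    // One FIFO buffer per priority, iterated from highest to lowest priority.
    private final TreeMap<Integer, Deque<Rec>> buffers =
            new TreeMap<>(Comparator.reverseOrder());

    PriorityChooserSketch(Map<String, Integer> priorityByTopic) {
        this.priorityByTopic = priorityByTopic;
    }

    // Called for every record that a fetch has already returned.
    void update(Rec rec) {
        int priority = priorityByTopic.getOrDefault(rec.topic, 0);
        buffers.computeIfAbsent(priority, p -> new ArrayDeque<>()).add(rec);
    }

    // Returns the next buffered record by priority, or null if nothing is buffered.
    Rec choose() {
        for (Deque<Rec> queue : buffers.values()) {
            Rec rec = queue.poll();
            if (rec != null) {
                return rec;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        PriorityChooserSketch chooser =
                new PriorityChooserSketch(Map.of("alerts", 10, "logs", 1));
        chooser.update(new Rec("logs", "a"));
        chooser.update(new Rec("alerts", "b"));
        chooser.update(new Rec("logs", "c"));
        for (Rec rec = chooser.choose(); rec != null; rec = chooser.choose()) {
            System.out.println(rec.topic + ":" + rec.value);
        }
    }
}
```

The chooser hands out the "alerts" record first even though it arrived
second, but it has no say in which partitions the next fetch request
returns data for.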

best,
Colin


> 
> 
> 
> -Matthias
> 
> 
> 
> On 9/6/18 5:24 AM, Jan Filipiak wrote:
> > 
> > On 05.09.2018 17:18, Colin McCabe wrote:
> >> Hi all,
> >>
> >> I agree that DISCUSS is more appropriate than VOTE at this point,
> >> since I don't remember the last discussion coming to a definite
> >> conclusion.
> >>
> >> I guess my concern is that this will add complexity and memory
> >> consumption on the server side.  In the case of incremental fetch
> >> requests, we will have to track at least two extra bytes per
> >> partition, to know what the priority of each partition is within each
> >> active fetch session.
> >>
> >> It would be nice to hear more about the use-cases for this feature.  I
> >> think Gwen asked about this earlier, and I don't remember reading a
> >> response.  The fact that we're now talking about Samza interfaces is a
> >> bit of a red flag.  After all, Samza didn't need partition priorities
> >> to do what it did.  You can do a lot with muting partitions and using
> >> appropriate threading in your code.
> > To show a use case, I linked 353, especially since the threading model is
> > pretty fixed there.
> > 
> > No clue why Samza should be a red flag. They handle this purely on the
> > consumer side, which I think is reasonable. I would not try to implement
> > any broker-side support for this if I were to do it. Just don't support
> > incremental fetch then.
> > In the end, if you have broker-side support, you would need to ship the
> > logic of the message chooser to the broker. I don't think that would
> > allow for the flexibility I had in mind with a purely consumer-based
> > implementation.
> > 
> > 
> >>
> >> For example, you can hand data from a partition off to a work queue
> >> with a fixed size, which is handled by a separate service thread.  If
> >> the queue gets full, you can mute the partition until some of the
> >> buffered data is processed.  Kafka Streams uses a similar approach to
> >> avoid reading partition data that isn't immediately needed.
> >>
> >> There might be some use-cases that need priorities eventually, but I'm
> >> concerned that we're jumping the gun by trying to implement this
> >> before we know what they are.
> >>
> >> best,
> >> Colin
> >>
> >>
> >> On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:
> >>> On 05.09.2018 02:38, n...@afshartous.com wrote:
> >>>>> On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com>
> >>>>> wrote:
> >>>>>
> >>>>> what I meant is literally this interface:
> >>>>>
> >>>>> https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
> >>>>>
> >>>> Hi Jan,
> >>>>
> >>>> Thanks for the reply and I have a few questions.  This Samza doc
> >>>>
> >>>>    
> >>>> https://samza.apache.org/learn/documentation/0.14/container/streams.html
> >>>>
> >>>>
> >>>> indicates that the chooser is set via configuration.  Are you
> >>>> suggesting adding a new configuration for Kafka?  Seems like we
> >>>> could also have a method on KafkaConsumer
> >>>>
> >>>>       public void register(MessageChooser messageChooser)
> >>> I don't have strong opinions regarding this. I like configs; I also
> >>> don't think it would be a problem to have both.
> >>>
> >>>> to make it more dynamic.
> >>>>
> >>>> Also, the Samza MessageChooser interface has the method
> >>>>
> >>>>     /* Notify the chooser that a new envelope is available for
> >>>>        processing. */
> >>>>     void update(IncomingMessageEnvelope envelope)
> >>>>
> >>>> and I’m wondering how this method would be translated to the Kafka API.
> >>>> In particular, what corresponds to IncomingMessageEnvelope?
> >>> I think Samza uses the envelope abstraction because they support other
> >>> sources besides Kafka as well. They are more
> >>> on the Spark end of things when it comes to different input types. I
> >>> don't have strong opinions, but it feels like
> >>> we wouldn't need such a thing in the Kafka consumer and could just use a
> >>> regular ConsumerRecord or so.
> >>>> Best,
> >>>> -- 
> >>>>         Nick
> >>>>
> >>>>
> >>>>
> >>>>
> > 
> 