On Thu, Sep 6, 2018, at 20:21, Matthias J. Sax wrote:
> I am still not sure how Samza's MessageChooser actually works and how
> this would align with KafkaConsumer fetch requests.
>
>
> Maybe I can give some background (conceptually); @Colin, please correct
> me if I say anything wrong:
>
>
> When a fetch request is sent, all assigned topic partitions of the
> consumer are ordered in a list, and the broker returns data starting
> with the first partition in the list, returning as many messages as
> possible (until either the partition's end offset or the fetch size is
> reached). If the end offset is reached but not the fetch size, the next
> partition in the list is considered. This repeats until the fetch size
> is reached. If a partition in the list has no data available, it is
> skipped.
>
> When data is returned to the consumer, the consumer moves all partitions
> for which data was returned to the end of the list. Thus, in the next
> fetch request, data from other partitions is returned (this avoids
> starving partitions). Note that partitions that do not return data
> (even if they are at the head of the list) stay at the head of the list.
>
> (Note that this list is actually also maintained on the broker side to
> allow for incremental fetch requests.)
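The fetch ordering described above can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the consumer's or broker's actual code: `FetchOrderModel` is a hypothetical name, partitions are plain strings, `available` stands in for how many bytes each partition could return right now, and per-partition size limits are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the fetch ordering described in the thread. */
class FetchOrderModel {
    private final List<String> order = new ArrayList<>();

    FetchOrderModel(List<String> partitions) {
        order.addAll(partitions);
    }

    /** Walks the list from the head until maxBytes is used up; returns bytes taken per partition. */
    Map<String, Integer> fetch(Map<String, Integer> available, int maxBytes) {
        Map<String, Integer> taken = new LinkedHashMap<>();
        int remaining = maxBytes;
        for (String tp : order) {
            if (remaining == 0) {
                break; // fetch size reached
            }
            int bytes = available.getOrDefault(tp, 0);
            if (bytes == 0) {
                continue; // no data: skipped, and it keeps its place in the list
            }
            int n = Math.min(bytes, remaining);
            taken.put(tp, n);
            remaining -= n;
        }
        // Partitions that returned data move to the end, keeping their relative order.
        order.removeAll(taken.keySet());
        order.addAll(taken.keySet());
        return taken;
    }

    List<String> currentOrder() {
        return new ArrayList<>(order);
    }
}
```

For example, with the order [a, b, c], where `b` has no data and `a` alone fills the fetch, the next order is [b, c, a]: `b` returned nothing and so stays at the head.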
Right.

>
> Because different partitions are hosted on different brokers, the
> consumer will send fetch requests to different brokers (@Colin: how does
> this work in detail? Does the consumer just do a round robin over all
> brokers it needs to fetch from?)

The client will fetch from multiple brokers in parallel.

>
>
> Given the original proposal about topic priorities, it would be possible
> to have multiple lists, one per priority. If topic partitions return
> data, they would be moved to the end of their priority list. The lists
> would be considered in priority order. Thus, higher-priority topic
> partitions stay at the head and are considered first.
>
>
> If I understand MessageChooser correctly (and consider a client-side
> implementation only), the MessageChooser can only pick from the data
> that was returned in a fetch request; it cannot alter what the
> fetch request returns.
>
> It seems that for each fetched message, update() would be called and the
> MessageChooser buffers the message. When a message should be processed
> (i.e., when Iterator.next() is called on the iterator returned from
> poll()), choose() is called to return a message from the buffer (based
> on whatever strategy the MessageChooser implements).
>
> Thus, MessageChooser cannot actually prioritize some topics over others,
> because the prioritization depends on the fetch requests, which the
> MessageChooser cannot influence (MessageChooser can only prioritize
> among records from different partitions that have already been fetched).
> Thus, the MessageChooser interface does not seem to achieve what was
> proposed.
>
> @Jan: please correct me if my understanding of MessageChooser is wrong.
>
>
> If my understanding is correct, I am not sure how the MessageChooser
> interface could be used to prioritize topics in fetch requests.
>
>
> Overall, I get the impression that topic prioritization and
> MessageChooser are orthogonal (or complementary) to each other.

I agree.
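The one-list-per-priority idea from the original proposal can be sketched by extending the same kind of model. Again, this is a hypothetical illustration, not a Kafka API: `PriorityFetchOrderModel`, `assign`, and integer priorities (higher value wins) are assumed names. Lists are walked from highest to lowest priority, and partitions that returned data rotate to the end of their own priority's list only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of per-priority fetch lists; not part of Kafka. */
class PriorityFetchOrderModel {
    // Iterates from the highest priority value to the lowest.
    private final TreeMap<Integer, List<String>> lists = new TreeMap<>(Comparator.reverseOrder());

    void assign(String partition, int priority) {
        lists.computeIfAbsent(priority, p -> new ArrayList<>()).add(partition);
    }

    Map<String, Integer> fetch(Map<String, Integer> available, int maxBytes) {
        Map<String, Integer> taken = new LinkedHashMap<>();
        int remaining = maxBytes;
        for (List<String> list : lists.values()) {
            List<String> returned = new ArrayList<>();
            for (String tp : list) {
                if (remaining == 0) {
                    break;
                }
                int bytes = available.getOrDefault(tp, 0);
                if (bytes == 0) {
                    continue; // no data: keeps its place within its priority list
                }
                int n = Math.min(bytes, remaining);
                taken.put(tp, n);
                returned.add(tp);
                remaining -= n;
            }
            // Rotate only within this priority's list.
            list.removeAll(returned);
            list.addAll(returned);
            if (remaining == 0) {
                break; // lower priorities get nothing from this fetch
            }
        }
        return taken;
    }

    /** Current overall order: priority lists concatenated, highest first. */
    List<String> order() {
        List<String> all = new ArrayList<>();
        lists.values().forEach(all::addAll);
        return all;
    }
}
```

The sketch makes the trade-off visible: as long as high-priority partitions keep filling the fetch, lower-priority partitions receive nothing.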
best,
Colin

>
>
>
> -Matthias
>
>
>
> On 9/6/18 5:24 AM, Jan Filipiak wrote:
> >
> > On 05.09.2018 17:18, Colin McCabe wrote:
> >> Hi all,
> >>
> >> I agree that DISCUSS is more appropriate than VOTE at this point,
> >> since I don't remember the last discussion coming to a definite
> >> conclusion.
> >>
> >> I guess my concern is that this will add complexity and memory
> >> consumption on the server side. In the case of incremental fetch
> >> requests, we will have to track at least two extra bytes per
> >> partition, to know what the priority of each partition is within each
> >> active fetch session.
> >>
> >> It would be nice to hear more about the use cases for this feature. I
> >> think Gwen asked about this earlier, and I don't remember reading a
> >> response. The fact that we're now talking about Samza interfaces is a
> >> bit of a red flag. After all, Samza didn't need partition priorities
> >> to do what it did. You can do a lot with muting partitions and using
> >> appropriate threading in your code.
> > To show a use case, I linked 353, especially since the threading model is
> > pretty fixed there.
> >
> > No clue why Samza should be a red flag. They handle it purely on the
> > consumer side, which I think is reasonable. I would not try to implement
> > any broker-side support for this if I were to do it. Just don't support
> > incremental fetch then.
> > In the end, if you have broker-side support, you would need to ship the
> > logic of the message chooser to the broker. I don't think that would
> > allow for the flexibility I had in mind with a purely consumer-based
> > implementation.
> >
> >
> >>
> >> For example, you can hand data from a partition off to a work queue
> >> with a fixed size, which is handled by a separate service thread. If
> >> the queue gets full, you can mute the partition until some of the
> >> buffered data is processed. Kafka Streams uses a similar approach to
> >> avoid reading partition data that isn't immediately needed.
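Colin's work-queue-plus-muting suggestion can be sketched for a single partition as follows. Assumptions: `PartitionMuter` is a hypothetical stand-in for `KafkaConsumer#pause(Collection<TopicPartition>)` and `KafkaConsumer#resume(Collection<TopicPartition>)`; records are plain strings rather than `ConsumerRecord`s; `BoundedPartitionDispatcher`, `dispatch` and `maybeResume` are illustrative names, called only from the polling thread, while a separate worker thread takes from `workQueue()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for KafkaConsumer#pause / #resume, keyed by a partition name. */
interface PartitionMuter {
    void pause(String partition);
    void resume(String partition);
}

/** Sketch: fixed-size work queue per partition; mute the partition while the queue is full. */
class BoundedPartitionDispatcher {
    private final String partition;
    private final BlockingQueue<String> queue;                 // drained by a worker thread
    private final Deque<String> overflow = new ArrayDeque<>(); // fetched after the queue filled
    private final PartitionMuter muter;
    private boolean paused = false;

    BoundedPartitionDispatcher(String partition, int capacity, PartitionMuter muter) {
        this.partition = partition;
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.muter = muter;
    }

    /** Hands a fetched record to the worker, muting the partition once the queue is full. */
    void dispatch(String record) {
        // Preserve ordering: once anything sits in overflow, new records queue behind it.
        if (!overflow.isEmpty() || !queue.offer(record)) {
            overflow.addLast(record);
            if (!paused) {
                muter.pause(partition);
                paused = true;
            }
        }
    }

    /** Call on every poll-loop iteration: refill the queue, unmute once the backlog is gone. */
    void maybeResume() {
        while (!overflow.isEmpty() && queue.offer(overflow.peekFirst())) {
            overflow.pollFirst();
        }
        if (paused && overflow.isEmpty()) {
            muter.resume(partition);
            paused = false;
        }
    }

    BlockingQueue<String> workQueue() {
        return queue;
    }
}
```

A single poll can return more records than the queue has room for, which is why the sketch keeps an overflow list instead of dropping records; muting only stops further fetches for that partition.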
> >>
> >> There might be some use cases that need priorities eventually, but I'm
> >> concerned that we're jumping the gun by trying to implement this
> >> before we know what they are.
> >>
> >> best,
> >> Colin
> >>
> >>
> >> On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:
> >>> On 05.09.2018 02:38, n...@afshartous.com wrote:
> >>>>> On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com>
> >>>>> wrote:
> >>>>>
> >>>>> What I meant is literally this interface:
> >>>>>
> >>>>> https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
> >>>>>
> >>>> Hi Jan,
> >>>>
> >>>> Thanks for the reply; I have a few questions. This Samza doc
> >>>>
> >>>> https://samza.apache.org/learn/documentation/0.14/container/streams.html
> >>>>
> >>>> indicates that the chooser is set via configuration. Are you
> >>>> suggesting adding a new configuration for Kafka? It seems like we
> >>>> could also have a method on KafkaConsumer
> >>>>
> >>>> public void register(MessageChooser messageChooser)
> >>> I don't have strong opinions regarding this. I like configs; I also
> >>> don't think it would be a problem to have both.
> >>>
> >>>> to make it more dynamic.
> >>>>
> >>>> Also, the Samza MessageChooser interface has the method
> >>>>
> >>>> /* Notify the chooser that a new envelope is available for a
> >>>> processing. */
> >>>> void update(IncomingMessageEnvelope envelope)
> >>>>
> >>>> and I'm wondering how this method would be translated to the Kafka
> >>>> API. In particular, what corresponds to IncomingMessageEnvelope?
> >>> I think Samza uses the envelope abstraction because they support other
> >>> sources besides Kafka as well.
> >>> They are more on the Spark end of things when it comes to different
> >>> input types. I don't have strong opinions, but it feels like we
> >>> wouldn't need such a thing in the Kafka consumer and could just use a
> >>> regular ConsumerRecord or so.
> >>>> Best,
> >>>> --
> >>>> Nick
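As a rough illustration of Jan's suggestion (a Kafka-side chooser that takes regular records instead of `IncomingMessageEnvelope`), here is a hypothetical interface with one priority-based strategy. `RecordChooser`, `TopicPriorityChooser` and `Rec` are illustrative names only; `Rec` stands in for `ConsumerRecord` so that the sketch compiles without kafka-clients. The update/choose split mirrors Samza's MessageChooser as described in the thread, not its full interface.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

/** Minimal stand-in for ConsumerRecord (hypothetical). */
final class Rec {
    final String topic;
    final String value;

    Rec(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

/** Hypothetical Kafka-flavoured analogue of Samza's update()/choose() pair. */
interface RecordChooser<R> {
    void update(R record); // a fetched record has been buffered
    R choose();            // next record for the application, or null if none is buffered
}

/** Returns buffered records from higher-priority topics first; FIFO within a priority. */
class TopicPriorityChooser implements RecordChooser<Rec> {
    private final Map<String, Integer> priorities;
    private final TreeMap<Integer, Deque<Rec>> buffers = new TreeMap<>(Comparator.reverseOrder());

    TopicPriorityChooser(Map<String, Integer> priorities) {
        this.priorities = priorities;
    }

    @Override
    public void update(Rec record) {
        int p = priorities.getOrDefault(record.topic, 0); // unknown topics get priority 0
        buffers.computeIfAbsent(p, k -> new ArrayDeque<>()).addLast(record);
    }

    @Override
    public Rec choose() {
        for (Deque<Rec> q : buffers.values()) {
            if (!q.isEmpty()) {
                return q.pollFirst();
            }
        }
        return null; // nothing buffered; the chooser cannot make the consumer fetch more
    }
}
```

Consistent with Matthias's point earlier in the thread, `choose()` can only reorder records that were already passed to `update()`; it has no influence on which partitions the next fetch request returns data for.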