On 07.09.2018 05:21, Matthias J. Sax wrote:
I am still not sure how Samza's MessageChooser actually works and how
this would align with KafkaConsumer fetch requests.


Maybe I can give some background (conceptually); @Colin, please correct
me if I say anything wrong:


When a fetch request is sent, all assigned topic partitions of the
consumer are ordered in a list, and the broker returns data starting
with the first partition in the list, returning as many messages as
possible (either until the partition's end offset or the fetch size is
reached). If the end offset is reached but not the fetch size, the next
partition in the list is considered. This repeats until the fetch size
is reached. If a partition in the list has no data available, it is skipped.

When data is returned to the consumer, the consumer moves all partitions
for which data was returned to the end of the list. Thus, in the next
fetch request, data from other partitions is returned (this avoids
starvation of partitions). Note that partitions that do not return data
(even if they are at the head of the list) stay at the head of the list.

(Note that this partition list is actually also maintained broker-side to
allow for incremental fetch requests.)
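
As a rough illustration, here is a simplified, hypothetical model of that
rotation (not the actual client/Fetcher code; names are made up):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    // Simplified model of the rotation described above (hypothetical,
    // not the actual Fetcher implementation).
    class FetchOrder {
        private final Deque<TopicPartition> order = new ArrayDeque<>();

        FetchOrder(List<TopicPartition> assigned) {
            order.addAll(assigned);
        }

        // Partitions are offered to the broker in this order.
        List<TopicPartition> nextFetchOrder() {
            return List.copyOf(order);
        }

        // After a response, partitions that returned data move to the end;
        // partitions that returned nothing keep their place at the head.
        void onResponse(List<TopicPartition> partitionsWithData) {
            order.removeAll(partitionsWithData);
            order.addAll(partitionsWithData);
        }
    }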

Because different partitions are hosted on different brokers, the
consumer will send fetch requests to different brokers (@Colin: how does
this work in detail? Does the consumer just do a round robin over all
brokers it needs to fetch from?)


Given the original proposal about topic priorities, it would be possible
to have multiple lists, one per priority. If topic partitions return
data, they would be moved to the end of their priority list. The lists
would be considered in priority order. Thus, higher-priority topic
partitions stay at the head and are considered first.
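
Sketched hypothetically (names made up for illustration, not an actual
implementation):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;
    import java.util.TreeMap;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical sketch only: one rotation list per priority level,
    // consulted strictly in priority order when building the fetch order.
    class PrioritizedFetchOrder {
        // Lower key = higher priority; each priority keeps its own rotation list.
        private final TreeMap<Integer, Deque<TopicPartition>> byPriority = new TreeMap<>();

        void assign(int priority, List<TopicPartition> partitions) {
            byPriority.computeIfAbsent(priority, p -> new ArrayDeque<>()).addAll(partitions);
        }

        // Higher-priority partitions always come first, so they stay at the
        // head of the overall order as long as they have data.
        List<TopicPartition> nextFetchOrder() {
            List<TopicPartition> result = new ArrayList<>();
            byPriority.values().forEach(result::addAll);
            return result;
        }

        // After a response, rotate only within each partition's own priority list.
        void onResponse(int priority, List<TopicPartition> partitionsWithData) {
            Deque<TopicPartition> order = byPriority.get(priority);
            if (order == null) {
                return;
            }
            order.removeAll(partitionsWithData);
            order.addAll(partitionsWithData);
        }
    }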


If I understand MessageChooser correctly (and consider a client-side
implementation only), the MessageChooser can only pick from the data
that was returned in a fetch request -- but it cannot alter what the
fetch request returns.

It seems that for each fetched message, update() would be called and the
MessageChooser buffers the message. When a message should be processed
(i.e., when Iterator.next() is called on the iterator returned from
poll()), choose() is called to return a message from the buffer (based
on whatever strategy the MessageChooser implements).
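
In other words, roughly (a minimal hypothetical sketch of that
update()/choose() flow, using ConsumerRecord in place of Samza's
IncomingMessageEnvelope):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Minimal sketch of the update()/choose() flow described above, using
    // ConsumerRecord instead of Samza's IncomingMessageEnvelope.
    class FifoChooser {
        private final Deque<ConsumerRecord<byte[], byte[]>> buffer = new ArrayDeque<>();

        // Called for every record a fetch request returned.
        void update(ConsumerRecord<byte[], byte[]> record) {
            buffer.addLast(record);
        }

        // Called when the application asks for the next record to process;
        // a real chooser would apply its own strategy here instead of FIFO.
        // It can only ever pick among records that were already fetched.
        ConsumerRecord<byte[], byte[]> choose() {
            return buffer.pollFirst();
        }
    }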

Thus, the MessageChooser cannot actually prioritize some topics over
others, because the prioritization depends on the fetch requests, which
the MessageChooser cannot influence (the MessageChooser can only
prioritize records from different partitions that are already fetched).
Thus, the MessageChooser interface does not seem to achieve what was proposed.

@Jan: please correct me if my understanding of MessageChooser is wrong.
Hi Matthias,

It is. I explicitly said that if I were going for it, the MessageChooser could get the chance to pause and resume partitions. I mentioned that in the mail on ~4.9 (pausing/resuming capabilities). Probably easily missed.
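
Roughly what I had in mind, just as a sketch (assuming the chooser is
handed the consumer; the class and method names are made up):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    // Rough sketch only: a chooser that is allowed to pause/resume
    // partitions via the existing consumer API (class and method names
    // here are made up).
    class PausingChooser {
        private final Consumer<byte[], byte[]> consumer;

        PausingChooser(Consumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
        }

        // Deprioritize a partition by pausing it, so later fetches skip it ...
        void deprioritize(TopicPartition tp) {
            consumer.pause(Collections.singleton(tp));
        }

        // ... and pick it up again later by resuming it.
        void reprioritize(TopicPartition tp) {
            consumer.resume(Collections.singleton(tp));
        }
    }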

The main point is: I didn't say we should adopt exactly the Samza interface -- that would be nonsense -- just take some inspiration and see if there isn't a more powerful
abstraction, since we are tackling the problem twice anyway.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics
https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization

I am definitely against a simple priority list.
I am definitely against broker support.
I am happy to sacrifice incremental fetch support (mainly for MirrorMakers and replicas with tons of partitions).
I would love to see a Message Chooser.

With this said, I wish y'all the best of luck and fun finding a good solution. *mic drop*

Bye



If my understanding is correct, I am not sure how the MessageChooser
interface could be used to prioritize topics in fetch requests.


Overall, I get the impression that topic prioritization and
MessageChooser are orthogonal (or complementary) to each other.



-Matthias



On 9/6/18 5:24 AM, Jan Filipiak wrote:
On 05.09.2018 17:18, Colin McCabe wrote:
Hi all,

I agree that DISCUSS is more appropriate than VOTE at this point,
since I don't remember the last discussion coming to a definite
conclusion.

I guess my concern is that this will add complexity and memory
consumption on the server side.  In the case of incremental fetch
requests, we will have to track at least two extra bytes per
partition, to know what the priority of each partition is within each
active fetch session.

It would be nice to hear more about the use-cases for this feature.  I
think Gwen asked about this earlier, and I don't remember reading a
response.  The fact that we're now talking about Samza interfaces is a
bit of a red flag.  After all, Samza didn't need partition priorities
to do what it did.  You can do a lot with muting partitions and using
appropriate threading in your code.
To show a use case, I linked KIP-353, especially since the threading model is
pretty fixed there.

No clue why Samza should be a red flag. They handle it purely on the
consumer side, which I think is reasonable. I would not try to implement
any broker-side support for this if I were to do it. Just don't support
incremental fetch then.
In the end, if you have broker-side support, you would need to ship the
logic of the message chooser to the broker. I don't think that would
allow for the flexibility I had in mind for a purely consumer-based
implementation.


For example, you can hand data from a partition off to a work queue
with a fixed size, which is handled by a separate service thread.  If
the queue gets full, you can mute the partition until some of the
buffered data is processed.  Kafka Streams uses a similar approach to
avoid reading partition data that isn't immediately needed.
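
Roughly, as an illustrative sketch of that pattern (not the actual Kafka
Streams code; it mutes the whole assignment for brevity instead of
keeping per-partition queues):

    import java.time.Duration;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Illustrative sketch of the mute-when-backlogged pattern described
    // above (not the actual Kafka Streams implementation).
    class BackpressureLoop {
        private static final int CAPACITY = 1000;

        private final Consumer<byte[], byte[]> consumer;
        private final BlockingQueue<ConsumerRecord<byte[], byte[]>> workQueue =
                new ArrayBlockingQueue<>(CAPACITY);

        BackpressureLoop(Consumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
        }

        // Runs in the polling thread; a separate worker thread drains workQueue.
        void pollOnce() throws InterruptedException {
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
                workQueue.put(record); // blocks if the worker falls behind
            }
            if (workQueue.remainingCapacity() == 0) {
                // Queue is full: mute everything until the worker catches up.
                consumer.pause(consumer.assignment());
            } else if (workQueue.size() < CAPACITY / 2) {
                // Enough room again: un-mute whatever was paused.
                consumer.resume(consumer.paused());
            }
        }
    }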

There might be some use-cases that need priorities eventually, but I'm
concerned that we're jumping the gun by trying to implement this
before we know what they are.

best,
Colin


On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:
On 05.09.2018 02:38, n...@afshartous.com wrote:
On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

what I meant is literally this interface:

https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html

Hi Jan,

Thanks for the reply and I have a few questions.  This Samza doc

https://samza.apache.org/learn/documentation/0.14/container/streams.html


indicates that the chooser is set via configuration.  Are you
suggesting adding a new configuration for Kafka?  Seems like we
could also have a method on KafkaConsumer

       public void register(MessageChooser messageChooser)
I don't have strong opinions regarding this. I like configs; I also
don't think it would be a problem to have both.

to make it more dynamic.

Also, the Samza MessageChooser interface has method

    /* Notify the chooser that a new envelope is available for processing. */
    void update(IncomingMessageEnvelope envelope)

and I'm wondering how this method would be translated to the Kafka API.
In particular, what corresponds to IncomingMessageEnvelope?
I think Samza uses the envelope abstraction as they support other sources
besides Kafka as well. They are more
on the Spark end of things when it comes to different input types. I
don't have strong opinions, but it feels like
we wouldn't need such a thing in the Kafka consumer and could just use a
regular ConsumerRecord or so.
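
Just as a hypothetical sketch of what that could look like on the Kafka
side (names and signatures made up for illustration):

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical sketch only; names and signatures are made up for
    // illustration, not an actual KIP proposal.
    public interface MessageChooser<K, V> {

        // Notify the chooser that a new record has been fetched and is
        // available for processing (the Kafka counterpart of Samza's
        // update(IncomingMessageEnvelope)).
        void update(ConsumerRecord<K, V> record);

        // Return the next record the application should process, or null
        // if nothing is buffered.
        ConsumerRecord<K, V> choose();
    }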
Best,
--
         Nick




