Server-side message filtering has been discussed for a very long time; I noticed
the Kafka community already discussed this feature several years ago. The
community still has no plan to implement it, for a few reasons:
- they don't want to break the zero-copy advantage of the fetch path
- they think Kafka Streams is sufficient for this kind of work
- Kafka currently stores data as RecordBatches, so filtering individual records
may be difficult.

We have already implemented this filter feature at our company, and we ran into
some specific problems you may be interested in:
- It is hard to filter compressed data. If we decompress the data on the
broker, it becomes a serious server-side performance problem. So we think
splitting the record header from the record body (perhaps in a new record
format, e.g. a "version_v2") may be a good idea.
- We did not build a "header index", so we filter messages only when a fetch
request arrives. We then have to compute the exact response size before
replying (but holding all records in memory can cause GC pressure), otherwise
we cannot set the response length attribute. Your design may avoid this, but I
guess your "header index" should be a sparse index.
- The fetch request has no attribute indicating which group sent it, so we
added an extra field so that the server knows which filter to apply.
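
To make the idea concrete, here is a minimal sketch (all class and method
names are hypothetical, not any Kafka API) of the kind of predicate such a
broker-side filter would evaluate: it inspects only record header key/value
byte pairs and never deserializes the record payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of header-only filtering: the predicate looks at
// record headers (raw key/value bytes, mirroring the shape of Kafka's
// Header interface) and never touches the record value.
public class HeaderFilterSketch {
    // A record header as a key plus raw value bytes.
    record Header(String key, byte[] value) {}

    // Matches when the record carries a header with the given key whose
    // value byte-equals any of the accepted values.
    static boolean matches(List<Header> headers, String key, List<byte[]> accepted) {
        for (Header h : headers) {
            if (h.key().equals(key)) {
                for (byte[] v : accepted) {
                    if (Arrays.equals(h.value(), v)) return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        byte[] audit = "audit".getBytes(StandardCharsets.UTF_8);
        List<Header> headers = List.of(new Header("use-case", audit));
        // Header value matches one of the accepted values.
        System.out.println(matches(headers, "use-case", List.of(audit)));
        // No matching value for this key.
        System.out.println(matches(headers, "use-case",
                List.of("billing".getBytes(StandardCharsets.UTF_8))));
    }
}
```

Since only header bytes are compared, this check can in principle run before
(or without) decompressing the record values, which is exactly why splitting
the header from the body matters.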

In any case, we hope Kafka will support this feature; it could significantly
reduce network traffic. Thanks.

yang chen <shanchua...@gmail.com> wrote on Thu, Jan 20, 2022 at 21:31:

> Hi, Talat,
> We also need the filter feature, similar to Apache RocketMQ's filter feature (
> https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/
> ).
>
> Talat Uyarer <tuya...@paloaltonetworks.com> wrote on Tue, Nov 30, 2021 at 02:49:
>
> > Hi All,
> >
> > I want to get your advice about one subject. I want to create a KIP for
> > message header base filtering on Fetch API.
> >
> > Our current use case: we have 1k+ topics and, per topic, 10+ consumers for
> > different use cases. However, all consumers are interested in different
> > sets of messages on the same topic. Currently we read all messages from a
> > given topic and drop logs on the consumer side. To reduce our stream
> > processing cost I want to drop logs on the broker side. So far my
> > understanding is:
> >
> > *Broker sends messages as-is (no serialization cost) -> Network transfer ->
> > Consumer deserializes messages (user-side deserialization cost) -> User
> > space drops or uses messages (user-side filtering cost)*
> >
> >
> > If I can drop messages based on their headers without serializing and
> > deserializing them, it will help us save network bandwidth as well as
> > consumer-side CPU cost.
> >
> > My approach is building a header index. Consumer clients will define
> > their filter in the fetch call. If the filter matches, the broker will
> > send the messages. I would like to hear your suggestions about my
> > solution.
> >
> > Thanks
> >
>
