Interesting point. You are correct that KIP-729, at least, cannot validate
whether producers are sending compressed messages.

We could propose a separate KIP that enforces this in the upper layer.
Personally, I would be hesitant to discard the data in that case; I would
rather use metrics/logs to detect such messages and inform the producers
about it.
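
A minimal sketch of the "detect, don't discard" idea, applied to the record-level interface proposed later in this thread. Compression is a batch-level attribute and is not visible through this signature, so the sketch uses a hypothetical record-level rule (non-empty key) instead. TopicPartition, Header and InvalidRecordException are local stand-ins so the example compiles without kafka-clients; NonEmptyKeyValidator and its rejectOnViolation flag are assumptions made for illustration, not part of either KIP.

```java
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for the Kafka types named in the proposal, so the
// sketch compiles without kafka-clients on the classpath.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
    @Override public String toString() { return topic + "-" + partition; }
}

final class Header {
    final String key;
    final byte[] value;
    Header(String key, byte[] value) { this.key = key; this.value = value; }
}

class InvalidRecordException extends RuntimeException {
    InvalidRecordException(String message) { super(message); }
}

// Interface shape as proposed in this thread; not a released Kafka API.
interface BrokerRecordValidator {
    Optional<InvalidRecordException> validateRecord(
            TopicPartition topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers);
}

// Hypothetical policy: flag records whose key is missing or empty.
final class NonEmptyKeyValidator implements BrokerRecordValidator {
    private final boolean rejectOnViolation;
    private final AtomicLong violations = new AtomicLong(); // stand-in for a metric

    NonEmptyKeyValidator(boolean rejectOnViolation) {
        this.rejectOnViolation = rejectOnViolation;
    }

    long violations() { return violations.get(); }

    @Override
    public Optional<InvalidRecordException> validateRecord(
            TopicPartition topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers) {
        if (key != null && key.hasRemaining()) {
            return Optional.empty(); // record passes the policy
        }
        violations.incrementAndGet(); // always count, so producers can be informed
        if (!rejectOnViolation) {
            return Optional.empty(); // observe-only mode: the record is still accepted
        }
        return Optional.of(
                new InvalidRecordException("Record for " + topicPartition + " has no key"));
    }
}
```

With rejectOnViolation set to false the validator only counts violations and the record is still accepted; with true it returns the error, which the broker could then convert into a per-record failure.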


On Tue, Jul 6, 2021, 9:13 PM James Cheng <wushuja...@gmail.com> wrote:

> One use case we would like is to require that producers are sending
> compressed messages. Would this KIP (or KIP-686) allow the broker to detect
> that? From looking at both KIPs, it doesn't look like it would help with my
> particular use case. Both of the KIPs are at the Record-level.
>
> Thanks,
> -James
>
> > On Jun 30, 2021, at 10:05 AM, Soumyajit Sahu <soumyajit.s...@gmail.com>
> wrote:
> >
> > Hi Nikolay,
> > Great to hear that. I'm ok with either one too.
> > I had missed noticing the KIP-686. Thanks for bringing it up.
> >
> > I have tried to keep this one simple, but hope it can cover all our
> > enterprise needs.
> >
> > Should we put this one for vote?
> >
> > Regards,
> > Soumyajit
> >
> >
> > On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <nizhi...@apache.org>
> wrote:
> >
> >> Team, if we have support from committers for an API to check records on
> >> the broker side, let’s choose one KIP to go with and move forward to vote
> >> and implementation.
> >> I’m ready to drive the implementation of this API.
> >> It seems very useful to me.
> >>
> >>> On June 30, 2021, at 18:04, Nikolay Izhikov <nizhikov....@gmail.com>
> >> wrote:
> >>>
> >>> Hello.
> >>>
> >>> I had a very similar proposal [1].
> >>> So, yes, I think we should have one implementation of the API in the
> product.
> >>>
> >>> [1]
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
> >>>
> >>>> On June 30, 2021, at 17:57, Christopher Shannon <
> >> christopher.l.shan...@gmail.com> wrote:
> >>>>
> >>>> I would find this feature very useful as well, since adding custom
> >>>> validation to incoming records would help prevent bad data from making
> >>>> it to the topic.
> >>>>
> >>>> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <
> soumyajit.s...@gmail.com
> >>>
> >>>> wrote:
> >>>>
> >>>>> Thanks Colin! Good call on the ApiRecordError. We could use
> >>>>> InvalidRecordException instead, and have the broker convert it
> >>>>> to ApiRecordError.
> >>>>> Modified signature below.
> >>>>>
> >>>>> interface BrokerRecordValidator {
> >>>>>  /**
> >>>>>   * Validate the record for a given topic-partition.
> >>>>>   */
> >>>>>  Optional<InvalidRecordException> validateRecord(TopicPartition topicPartition,
> >>>>>      ByteBuffer key, ByteBuffer value, Header[] headers);
> >>>>> }
> >>>>>
> >>>>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cmcc...@apache.org>
> >> wrote:
> >>>>>
> >>>>>> Hi Soumyajit,
> >>>>>>
> >>>>>> The difficult thing is deciding which fields to share and how to
> share
> >>>>>> them.  Key and value are probably the minimum we need to make this
> >>>>> useful.
> >>>>>> If we do choose to go with byte buffer, it is not necessary to also
> >> pass
> >>>>>> the size, since ByteBuffer maintains that internally.
> >>>>>>
> >>>>>> ApiRecordError is also an internal class, so it can't be used in a
> >> public
> >>>>>> API.  I think most likely if we were going to do this, we would just
> >>>>> catch
> >>>>>> an exception and use the exception text as the validation error.
> >>>>>>
> >>>>>> best,
> >>>>>> Colin
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote:
> >>>>>>> Hi Tom,
> >>>>>>>
> >>>>>>> Makes sense. Thanks for the explanation. I get what Colin had meant
> >>>>>> earlier.
> >>>>>>>
> >>>>>>> Would a different signature for the interface work? Example below,
> >> but
> >>>>>>> please feel free to suggest alternatives if there are any
> >> possibilities
> >>>>>> of
> >>>>>>> such.
> >>>>>>>
> >>>>>>> If needed, then deprecating this and introducing a new signature
> >> would
> >>>>> be
> >>>>>>> straight-forward as both (old and new) calls could be made serially
> >> in
> >>>>>> the
> >>>>>>> LogValidator allowing a coexistence for a transition period.
> >>>>>>>
> >>>>>>> interface BrokerRecordValidator {
> >>>>>>>  /**
> >>>>>>>   * Validate the record for a given topic-partition.
> >>>>>>>   */
> >>>>>>>  Optional<ApiRecordError> validateRecord(TopicPartition topicPartition,
> >>>>>>>      int keySize, ByteBuffer key, int valueSize, ByteBuffer value,
> >>>>>>>      Header[] headers);
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tbent...@redhat.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Soumyajit,
> >>>>>>>>
> >>>>>>>> Although that class does indeed have public access at the Java
> >> level,
> >>>>>> it
> >>>>>>>> does so only because it needs to be used by internal Kafka code
> >> which
> >>>>>> lives
> >>>>>>>> in other packages (there isn't any more restrictive access
> modifier
> >>>>>> which
> >>>>>>>> would work). What the project considers public Java API is
> >> determined
> >>>>>> by
> >>>>>>>> what's included in the published Javadocs:
> >>>>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't
> >>>>> include
> >>>>>> the
> >>>>>>>> org.apache.kafka.common.record package.
> >>>>>>>>
> >>>>>>>> One of the problems with making these internal classes public is
> it
> >>>>>> ties
> >>>>>>>> the project into supporting them as APIs, which can make changing
> >>>>> them
> >>>>>> much
> >>>>>>>> harder and in the long run that can slow, or even prevent,
> >> innovation
> >>>>>> in
> >>>>>>>> the rest of Kafka.
> >>>>>>>>
> >>>>>>>> Kind regards,
> >>>>>>>>
> >>>>>>>> Tom
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu <
> >>>>>> soumyajit.s...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Colin,
> >>>>>>>>> I see that both the interface "Record" and the implementation
> >>>>>>>>> "DefaultRecord" being used in LogValidator.java are public
> >>>>>>>>> interfaces/classes.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java
> >>>>>>>>> and
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
> >>>>>>>>>
> >>>>>>>>> So, it should be ok to use them. Let me know what you think.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Soumyajit
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <cmcc...@apache.org>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Soumyajit,
> >>>>>>>>>>
> >>>>>>>>>> I believe we've had discussions about proposals similar to this
> >>>>>> before,
> >>>>>>>>>> although I'm having trouble finding one right now.  The issue
> >>>>> here
> >>>>>> is
> >>>>>>>>> that
> >>>>>>>>>> Record is a private class -- it is not part of any public API,
> >>>>> and
> >>>>>> may
> >>>>>>>>>> change at any time.  So we can't expose it in public APIs.
> >>>>>>>>>>
> >>>>>>>>>> best,
> >>>>>>>>>> Colin
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote:
> >>>>>>>>>>> Hello All,
> >>>>>>>>>>> I would like to start a discussion on the KIP-729.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Soumyajit
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> >>
>
>
