Hi Nikolay, Great to hear that. I'm ok with either one too. I had missed noticing the KIP-686. Thanks for bringing it up.
I have tried to keep this one simple, but hope it can cover all our enterprise needs. Should we put this one for vote? Regards, Soumyajit On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <nizhi...@apache.org> wrote: > Team, If we have support from committers for API to check records on the > broker side let’s choose one KIP to go with and move forward to vote and > implementation? > I’m ready to drive implementation of this API. > > I’m ready to drive the implementation of this API. > It seems very useful to me. > > > 30 июня 2021 г., в 18:04, Nikolay Izhikov <nizhikov....@gmail.com> > написал(а): > > > > Hello. > > > > I had a very similar proposal [1]. > > So, yes, I think we should have one implementation of API in the product. > > > > [1] > https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker > > > >> 30 июня 2021 г., в 17:57, Christopher Shannon < > christopher.l.shan...@gmail.com> написал(а): > >> > >> I would find this feature very useful as well as adding custom > validation > >> to incoming records would be nice to prevent bad data from making it to > the > >> topic. > >> > >> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <soumyajit.s...@gmail.com > > > >> wrote: > >> > >>> Thanks Colin! Good call on the ApiRecordError. We could use > >>> InvalidRecordException instead, and have the broker convert it > >>> to ApiRecordError. > >>> Modified signature below. > >>> > >>> interface BrokerRecordValidator { > >>> /** > >>> * Validate the record for a given topic-partition. > >>> */ > >>> Optional<InvalidRecordException> validateRecord(TopicPartition > >>> topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers); > >>> } > >>> > >>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cmcc...@apache.org> > wrote: > >>> > >>>> Hi Soumyajit, > >>>> > >>>> The difficult thing is deciding which fields to share and how to share > >>>> them. Key and value are probably the minimum we need to make this > >>> useful. > >>>> If we do choose to go with byte buffer, it is not necessary to also > pass > >>>> the size, since ByteBuffer maintains that internally. > >>>> > >>>> ApiRecordError is also an internal class, so it can't be used in a > public > >>>> API. I think most likely if we were going to do this, we would just > >>> catch > >>>> an exception and use the exception text as the validation error. > >>>> > >>>> best, > >>>> Colin > >>>> > >>>> > >>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote: > >>>>> Hi Tom, > >>>>> > >>>>> Makes sense. Thanks for the explanation. I get what Colin had meant > >>>> earlier. > >>>>> > >>>>> Would a different signature for the interface work? Example below, > but > >>>>> please feel free to suggest alternatives if there are any > possibilities > >>>> of > >>>>> such. > >>>>> > >>>>> If needed, then deprecating this and introducing a new signature > would > >>> be > >>>>> straight-forward as both (old and new) calls could be made serially > in > >>>> the > >>>>> LogValidator allowing a coexistence for a transition period. > >>>>> > >>>>> interface BrokerRecordValidator { > >>>>> /** > >>>>> * Validate the record for a given topic-partition. > >>>>> */ > >>>>> Optional<ApiRecordError> validateRecord(TopicPartition > >>>> topicPartition, > >>>>> int keySize, ByteBuffer key, int valueSize, ByteBuffer value, > Header[] > >>>>> headers); > >>>>> } > >>>>> > >>>>> > >>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tbent...@redhat.com> > >>> wrote: > >>>>> > >>>>>> Hi Soumyajit, > >>>>>> > >>>>>> Although that class does indeed have public access at the Java > level, > >>>> it > >>>>>> does so only because it needs to be used by internal Kafka code > which > >>>> lives > >>>>>> in other packages (there isn't any more restrictive access modifier > >>>> which > >>>>>> would work). What the project considers public Java API is > determined > >>>> by > >>>>>> what's included in the published Javadocs: > >>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't > >>> include > >>>> the > >>>>>> org.apache.kafka.common.record package. > >>>>>> > >>>>>> One of the problems with making these internal classes public is it > >>>> ties > >>>>>> the project into supporting them as APIs, which can make changing > >>> them > >>>> much > >>>>>> harder and in the long run that can slow, or even prevent, > innovation > >>>> in > >>>>>> the rest of Kafka. > >>>>>> > >>>>>> Kind regards, > >>>>>> > >>>>>> Tom > >>>>>> > >>>>>> > >>>>>> > >>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu < > >>>> soumyajit.s...@gmail.com> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi Colin, > >>>>>>> I see that both the interface "Record" and the implementation > >>>>>>> "DefaultRecord" being used in LogValidator.java are public > >>>>>>> interfaces/classes. > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>> > >>>> > >>> > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java > >>>>>>> and > >>>>>>> > >>>>>>> > >>>>>> > >>>> > >>> > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java > >>>>>>> > >>>>>>> So, it should be ok to use them. Let me know what you think. > >>>>>>> > >>>>>>> Thanks, > >>>>>>> Soumyajit > >>>>>>> > >>>>>>> > >>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <cmcc...@apache.org> > >>>> wrote: > >>>>>>> > >>>>>>>> Hi Soumyajit, > >>>>>>>> > >>>>>>>> I believe we've had discussions about proposals similar to this > >>>> before, > >>>>>>>> although I'm having trouble finding one right now. The issue > >>> here > >>>> is > >>>>>>> that > >>>>>>>> Record is a private class -- it is not part of any public API, > >>> and > >>>> may > >>>>>>>> change at any time. So we can't expose it in public APIs. > >>>>>>>> > >>>>>>>> best, > >>>>>>>> Colin > >>>>>>>> > >>>>>>>> > >>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote: > >>>>>>>>> Hello All, > >>>>>>>>> I would like to start a discussion on the KIP-729. > >>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append > >>>>>>>>> > >>>>>>>>> Thanks! > >>>>>>>>> Soumyajit > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > > > >