Interesting point. You are correct that at least KIP-729 cannot validate that.
We could propose a different KIP for that which could enforce that in the
upper layer. Personally, I would be hesitant to discard the data in that case;
I would rather use metrics/logs to detect such cases and inform the producers
about it.

On Tue, Jul 6, 2021, 9:13 PM James Cheng <wushuja...@gmail.com> wrote:

> One use case we would like is to require that producers are sending
> compressed messages. Would this KIP (or KIP-686) allow the broker to detect
> that? From looking at both KIPs, it doesn't look like it would help with my
> particular use case. Both of the KIPs are at the Record-level.
>
> Thanks,
> -James
>
> > On Jun 30, 2021, at 10:05 AM, Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
> >
> > Hi Nikolay,
> > Great to hear that. I'm ok with either one too.
> > I had missed noticing the KIP-686. Thanks for bringing it up.
> >
> > I have tried to keep this one simple, but hope it can cover all our
> > enterprise needs.
> >
> > Should we put this one for vote?
> >
> > Regards,
> > Soumyajit
> >
> >
> > On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
> >
> >> Team, If we have support from committers for API to check records on the
> >> broker side let’s choose one KIP to go with and move forward to vote and
> >> implementation?
> >> I’m ready to drive the implementation of this API.
> >> It seems very useful to me.
> >>
> >>> On June 30, 2021, at 18:04, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
> >>>
> >>> Hello.
> >>>
> >>> I had a very similar proposal [1].
> >>> So, yes, I think we should have one implementation of API in the product.
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
> >>>
> >>>> On June 30, 2021, at 17:57, Christopher Shannon <christopher.l.shan...@gmail.com> wrote:
> >>>>
> >>>> I would find this feature very useful as well, as adding custom validation
> >>>> to incoming records would be nice to prevent bad data from making it to the
> >>>> topic.
> >>>>
> >>>> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
> >>>>
> >>>>> Thanks Colin! Good call on the ApiRecordError. We could use
> >>>>> InvalidRecordException instead, and have the broker convert it
> >>>>> to ApiRecordError.
> >>>>> Modified signature below.
> >>>>>
> >>>>> interface BrokerRecordValidator {
> >>>>>     /**
> >>>>>      * Validate the record for a given topic-partition.
> >>>>>      */
> >>>>>     Optional<InvalidRecordException> validateRecord(TopicPartition topicPartition,
> >>>>>         ByteBuffer key, ByteBuffer value, Header[] headers);
> >>>>> }
> >>>>>
> >>>>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Soumyajit,
> >>>>>>
> >>>>>> The difficult thing is deciding which fields to share and how to share
> >>>>>> them. Key and value are probably the minimum we need to make this useful.
> >>>>>> If we do choose to go with byte buffer, it is not necessary to also pass
> >>>>>> the size, since ByteBuffer maintains that internally.
> >>>>>>
> >>>>>> ApiRecordError is also an internal class, so it can't be used in a public
> >>>>>> API. I think most likely if we were going to do this, we would just catch
> >>>>>> an exception and use the exception text as the validation error.
> >>>>>>
> >>>>>> best,
> >>>>>> Colin
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote:
> >>>>>>> Hi Tom,
> >>>>>>>
> >>>>>>> Makes sense. Thanks for the explanation.
> >>>>>>> I get what Colin had meant earlier.
> >>>>>>>
> >>>>>>> Would a different signature for the interface work? Example below, but
> >>>>>>> please feel free to suggest alternatives if there are any possibilities of
> >>>>>>> such.
> >>>>>>>
> >>>>>>> If needed, then deprecating this and introducing a new signature would be
> >>>>>>> straight-forward as both (old and new) calls could be made serially in the
> >>>>>>> LogValidator allowing a coexistence for a transition period.
> >>>>>>>
> >>>>>>> interface BrokerRecordValidator {
> >>>>>>>     /**
> >>>>>>>      * Validate the record for a given topic-partition.
> >>>>>>>      */
> >>>>>>>     Optional<ApiRecordError> validateRecord(TopicPartition topicPartition,
> >>>>>>>         int keySize, ByteBuffer key, int valueSize, ByteBuffer value,
> >>>>>>>         Header[] headers);
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tbent...@redhat.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Soumyajit,
> >>>>>>>>
> >>>>>>>> Although that class does indeed have public access at the Java level, it
> >>>>>>>> does so only because it needs to be used by internal Kafka code which lives
> >>>>>>>> in other packages (there isn't any more restrictive access modifier which
> >>>>>>>> would work). What the project considers public Java API is determined by
> >>>>>>>> what's included in the published Javadocs:
> >>>>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't include the
> >>>>>>>> org.apache.kafka.common.record package.
> >>>>>>>>
> >>>>>>>> One of the problems with making these internal classes public is it ties
> >>>>>>>> the project into supporting them as APIs, which can make changing them much
> >>>>>>>> harder and in the long run that can slow, or even prevent, innovation in
> >>>>>>>> the rest of Kafka.
> >>>>>>>>
> >>>>>>>> Kind regards,
> >>>>>>>>
> >>>>>>>> Tom
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Colin,
> >>>>>>>>> I see that both the interface "Record" and the implementation
> >>>>>>>>> "DefaultRecord" being used in LogValidator.java are public
> >>>>>>>>> interfaces/classes.
> >>>>>>>>>
> >>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java
> >>>>>>>>> and
> >>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
> >>>>>>>>>
> >>>>>>>>> So, it should be ok to use them. Let me know what you think.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Soumyajit
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <cmcc...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Soumyajit,
> >>>>>>>>>>
> >>>>>>>>>> I believe we've had discussions about proposals similar to this before,
> >>>>>>>>>> although I'm having trouble finding one right now. The issue here is that
> >>>>>>>>>> Record is a private class -- it is not part of any public API, and may
> >>>>>>>>>> change at any time. So we can't expose it in public APIs.
> >>>>>>>>>>
> >>>>>>>>>> best,
> >>>>>>>>>> Colin
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote:
> >>>>>>>>>>> Hello All,
> >>>>>>>>>>> I would like to start a discussion on the KIP-729.
> >>>>>>>>>>>
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Soumyajit
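
For readers following the thread, here is a minimal sketch of how a plugin might implement the revised BrokerRecordValidator signature quoted above (the one returning Optional<InvalidRecordException>). It is only an illustration and is not taken from KIP-729 or from Kafka. To keep it compilable on its own, TopicPartition, Header and InvalidRecordException below are simplified local stand-ins for the Kafka classes of the same names. KeyAndSizeValidator, maxValueBytes, reportOnly and violationCount() are hypothetical names invented for this example. The report-only mode reflects the suggestion at the top of the thread to prefer metrics/logs over discarding data, and the size check reads ByteBuffer.remaining() rather than taking a separate size argument, in line with Colin's remark.

```java
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Stand-ins (assumptions) so the sketch compiles without Kafka on the classpath.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

interface Header {
    String key();
    byte[] value();
}

class InvalidRecordException extends RuntimeException {
    InvalidRecordException(String message) { super(message); }
}

// Interface with the revised signature from the thread.
interface BrokerRecordValidator {
    /** Validate the record for a given topic-partition. */
    Optional<InvalidRecordException> validateRecord(TopicPartition topicPartition,
        ByteBuffer key, ByteBuffer value, Header[] headers);
}

// Hypothetical validator: flags records with a missing key or an oversized value.
final class KeyAndSizeValidator implements BrokerRecordValidator {
    private final int maxValueBytes;
    private final boolean reportOnly;              // true: count violations but never reject
    private final AtomicLong violations = new AtomicLong();

    KeyAndSizeValidator(int maxValueBytes, boolean reportOnly) {
        this.maxValueBytes = maxValueBytes;
        this.reportOnly = reportOnly;
    }

    @Override
    public Optional<InvalidRecordException> validateRecord(TopicPartition topicPartition,
            ByteBuffer key, ByteBuffer value, Header[] headers) {
        String problem = null;
        if (key == null || key.remaining() == 0) {
            problem = "record has no key";
        } else if (value != null && value.remaining() > maxValueBytes) {
            // The buffer already knows its size; no separate size parameter is needed.
            problem = "value of " + value.remaining() + " bytes exceeds " + maxValueBytes;
        }
        if (problem == null) {
            return Optional.empty();
        }
        violations.incrementAndGet();              // stand-in for a real metric
        if (reportOnly) {
            return Optional.empty();               // let the record through, only count it
        }
        return Optional.of(new InvalidRecordException(topicPartition + ": " + problem));
    }

    long violationCount() { return violations.get(); }
}

final class BrokerValidatorSketch {
    public static void main(String[] args) {
        KeyAndSizeValidator validator = new KeyAndSizeValidator(4, false);
        TopicPartition tp = new TopicPartition("orders", 0);
        Optional<InvalidRecordException> result = validator.validateRecord(
            tp, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[8]), new Header[0]);
        System.out.println(result.map(Throwable::getMessage).orElse("record accepted"));
        System.out.println("violations so far: " + validator.violationCount());
    }
}
```

Constructing the validator with reportOnly set to true would let every record through while still incrementing the counter, which is one way to observe violations before deciding whether to enforce them.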