Dear Kafka committers. Let’s have this API in Kafka!
> On Dec 2, 2021, at 17:19, Christopher Shannon <christopher.l.shan...@gmail.com> wrote:
>
> Revisiting this as this has come up for my use case again. Specifically for
> validation I need to be able to validate headers, including those of compressed
> messages. It looks like in LogValidator the messages are already
> decompressed to validate records but the headers get skipped when loaded
> into a partial record. So as part of this change I would think there should
> be a way to read in the headers for validation even if records are
> compressed.
>
> On Wed, Jul 7, 2021 at 3:30 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
>
>> Hello, James.
>>
>>> One use case we would like is to require that producers are sending
>>> compressed messages.
>>
>> I think that forcing producers to send compressed messages is out of scope
>> of this KIP.
>>
>>
>>> On Jul 7, 2021, at 08:48, Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
>>>
>>> Interesting point. You are correct that at least KIP-729 cannot validate
>>> that.
>>>
>>> We could propose a different KIP for that which could enforce that in the
>>> upper layer. Personally, I would be hesitant to discard the data in that
>>> case, but just use metrics/logs to detect those and inform the producers
>>> about it.
>>>
>>>
>>> On Tue, Jul 6, 2021, 9:13 PM James Cheng <wushuja...@gmail.com> wrote:
>>>
>>>> One use case we would like is to require that producers are sending
>>>> compressed messages. Would this KIP (or KIP-686) allow the broker to detect
>>>> that? From looking at both KIPs, it doesn't look like it would help with my
>>>> particular use case. Both of the KIPs are at the Record-level.
>>>>
>>>> Thanks,
>>>> -James
>>>>
>>>>> On Jun 30, 2021, at 10:05 AM, Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
>>>>>
>>>>> Hi Nikolay,
>>>>> Great to hear that. I'm ok with either one too.
>>>>> I had missed noticing the KIP-686. Thanks for bringing it up.
>>>>>
>>>>> I have tried to keep this one simple, but hope it can cover all our
>>>>> enterprise needs.
>>>>>
>>>>> Should we put this one for vote?
>>>>>
>>>>> Regards,
>>>>> Soumyajit
>>>>>
>>>>>
>>>>> On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
>>>>>
>>>>>> Team, If we have support from committers for API to check records on the
>>>>>> broker side let’s choose one KIP to go with and move forward to vote and
>>>>>> implementation?
>>>>>> I’m ready to drive the implementation of this API.
>>>>>> It seems very useful to me.
>>>>>>
>>>>>>> On Jun 30, 2021, at 18:04, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
>>>>>>>
>>>>>>> Hello.
>>>>>>>
>>>>>>> I had a very similar proposal [1].
>>>>>>> So, yes, I think we should have one implementation of API in the product.
>>>>>>>
>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>>>>>>
>>>>>>>> On Jun 30, 2021, at 17:57, Christopher Shannon <christopher.l.shan...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> I would find this feature very useful as well as adding custom validation
>>>>>>>> to incoming records would be nice to prevent bad data from making it to the
>>>>>>>> topic.
>>>>>>>>
>>>>>>>> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Colin! Good call on the ApiRecordError. We could use
>>>>>>>>> InvalidRecordException instead, and have the broker convert it
>>>>>>>>> to ApiRecordError.
>>>>>>>>> Modified signature below.
>>>>>>>>>
>>>>>>>>> interface BrokerRecordValidator {
>>>>>>>>>     /**
>>>>>>>>>      * Validate the record for a given topic-partition.
>>>>>>>>>      */
>>>>>>>>>     Optional<InvalidRecordException> validateRecord(TopicPartition topicPartition,
>>>>>>>>>         ByteBuffer key, ByteBuffer value, Header[] headers);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cmcc...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>>
>>>>>>>>>> The difficult thing is deciding which fields to share and how to share
>>>>>>>>>> them. Key and value are probably the minimum we need to make this useful.
>>>>>>>>>> If we do choose to go with byte buffer, it is not necessary to also pass
>>>>>>>>>> the size, since ByteBuffer maintains that internally.
>>>>>>>>>>
>>>>>>>>>> ApiRecordError is also an internal class, so it can't be used in a public
>>>>>>>>>> API. I think most likely if we were going to do this, we would just catch
>>>>>>>>>> an exception and use the exception text as the validation error.
>>>>>>>>>>
>>>>>>>>>> best,
>>>>>>>>>> Colin
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote:
>>>>>>>>>>> Hi Tom,
>>>>>>>>>>>
>>>>>>>>>>> Makes sense. Thanks for the explanation. I get what Colin had meant
>>>>>>>>>>> earlier.
>>>>>>>>>>>
>>>>>>>>>>> Would a different signature for the interface work? Example below, but
>>>>>>>>>>> please feel free to suggest alternatives if there are any possibilities of
>>>>>>>>>>> such.
>>>>>>>>>>>
>>>>>>>>>>> If needed, then deprecating this and introducing a new signature would be
>>>>>>>>>>> straight-forward as both (old and new) calls could be made serially in the
>>>>>>>>>>> LogValidator allowing a coexistence for a transition period.
>>>>>>>>>>>
>>>>>>>>>>> interface BrokerRecordValidator {
>>>>>>>>>>>     /**
>>>>>>>>>>>      * Validate the record for a given topic-partition.
>>>>>>>>>>>      */
>>>>>>>>>>>     Optional<ApiRecordError> validateRecord(TopicPartition topicPartition,
>>>>>>>>>>>         int keySize, ByteBuffer key, int valueSize, ByteBuffer value,
>>>>>>>>>>>         Header[] headers);
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tbent...@redhat.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>>>>
>>>>>>>>>>>> Although that class does indeed have public access at the Java level, it
>>>>>>>>>>>> does so only because it needs to be used by internal Kafka code which lives
>>>>>>>>>>>> in other packages (there isn't any more restrictive access modifier which
>>>>>>>>>>>> would work). What the project considers public Java API is determined by
>>>>>>>>>>>> what's included in the published Javadocs:
>>>>>>>>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't include the
>>>>>>>>>>>> org.apache.kafka.common.record package.
>>>>>>>>>>>>
>>>>>>>>>>>> One of the problems with making these internal classes public is it ties
>>>>>>>>>>>> the project into supporting them as APIs, which can make changing them much
>>>>>>>>>>>> harder and in the long run that can slow, or even prevent, innovation in
>>>>>>>>>>>> the rest of Kafka.
>>>>>>>>>>>>
>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Tom
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Colin,
>>>>>>>>>>>>> I see that both the interface "Record" and the implementation
>>>>>>>>>>>>> "DefaultRecord" being used in LogValidator.java are public
>>>>>>>>>>>>> interfaces/classes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java
>>>>>>>>>>>>> and
>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
>>>>>>>>>>>>>
>>>>>>>>>>>>> So, it should be ok to use them. Let me know what you think.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Soumyajit
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <cmcc...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I believe we've had discussions about proposals similar to this before,
>>>>>>>>>>>>>> although I'm having trouble finding one right now. The issue here is that
>>>>>>>>>>>>>> Record is a private class -- it is not part of any public API, and may
>>>>>>>>>>>>>> change at any time. So we can't expose it in public APIs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote:
>>>>>>>>>>>>>>> Hello All,
>>>>>>>>>>>>>>> I would like to start a discussion on the KIP-729.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>> Soumyajit
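
For readers following the thread, here is a minimal sketch of how a validator might look against the last signature proposed above (key, value and headers only, returning an optional InvalidRecordException). It is only an illustration under assumptions: TopicPartition, Header and InvalidRecordException are local stand-ins so the file compiles without Kafka on the classpath, and RequireHeaderValidator with its "schema-id" header is a hypothetical policy, not something defined in KIP-729 or KIP-686.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class BrokerRecordValidatorSketch {

    // Local stand-in for Kafka's TopicPartition (assumption, for self-containment).
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Local stand-in for Kafka's Header (assumption, for self-containment).
    static final class Header {
        final String key;
        final byte[] value;
        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Local stand-in for Kafka's InvalidRecordException.
    static final class InvalidRecordException extends RuntimeException {
        InvalidRecordException(String message) {
            super(message);
        }
    }

    // Signature as proposed in the thread: no separate size arguments,
    // because ByteBuffer already tracks how many bytes remain.
    interface BrokerRecordValidator {
        Optional<InvalidRecordException> validateRecord(
                TopicPartition topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers);
    }

    // Hypothetical policy: reject empty values and records missing a required header.
    static final class RequireHeaderValidator implements BrokerRecordValidator {
        private final String requiredHeader;

        RequireHeaderValidator(String requiredHeader) {
            this.requiredHeader = requiredHeader;
        }

        @Override
        public Optional<InvalidRecordException> validateRecord(
                TopicPartition tp, ByteBuffer key, ByteBuffer value, Header[] headers) {
            if (value == null || value.remaining() == 0) {
                return Optional.of(new InvalidRecordException("Empty value on " + tp));
            }
            if (headers != null) {
                for (Header h : headers) {
                    if (requiredHeader.equals(h.key)) {
                        return Optional.empty(); // record passes validation
                    }
                }
            }
            return Optional.of(new InvalidRecordException(
                    "Missing header '" + requiredHeader + "' on " + tp));
        }
    }

    public static void main(String[] args) {
        BrokerRecordValidator validator = new RequireHeaderValidator("schema-id");
        TopicPartition tp = new TopicPartition("orders", 0);
        ByteBuffer value = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));

        Header[] withHeader = { new Header("schema-id", new byte[] {1}) };
        Header[] noHeaders = {};

        System.out.println(validator.validateRecord(tp, null, value, withHeader).isPresent());
        System.out.println(validator.validateRecord(tp, null, value, noHeaders)
                .map(Throwable::getMessage).orElse("valid"));
    }
}
```

Returning an Optional rather than throwing keeps the public surface free of internal types such as ApiRecordError; per the discussion above, the broker would be the one to convert the exception text into its internal error representation.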