Dear Kafka committers.

Let’s have this API in Kafka!

> On Dec 2, 2021, at 17:19, Christopher Shannon 
> <christopher.l.shan...@gmail.com> wrote:
> 
> Revisiting this as this has come up for my use case again. Specifically for
> validation I need to be able to validate headers including compressed
> messages. It looks like in LogValidator the messages are already
> decompressed to validate records but the headers get skipped when loaded
> into a partial record. So as part of this change I would think there should
> be a way to read in the headers for validation even if records are
> compressed.
> 
> On Wed, Jul 7, 2021 at 3:30 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
> 
>> Hello, James.
>> 
>>> One use case we would like is to require that producers are sending
>> compressed messages.
>> 
>> I think that forcing producers to send compressed messages is out of scope
>> of this KIP.
>> 
>> 
>>> On Jul 7, 2021, at 08:48, Soumyajit Sahu <soumyajit.s...@gmail.com>
>> wrote:
>>> 
>>> Interesting point. You are correct that at least KIP-729 cannot validate
>>> that.
>>> 
>>> We could propose a different KIP for that which could enforce that in the
>>> upper layer. Personally, I would be hesitant to discard the data in that
>>> case, but just use metrics/logs to detect those and inform the producers
>>> about it.
>>> 
>>> 
>>> On Tue, Jul 6, 2021, 9:13 PM James Cheng <wushuja...@gmail.com> wrote:
>>> 
>>>> One use case we would like is to require that producers are sending
>>>> compressed messages. Would this KIP (or KIP-686) allow the broker to
>> detect
>>>> that? From looking at both KIPs, it doesn't look like it would help with my
>>>> particular use case. Both of the KIPs are at the Record-level.
>>>> 
>>>> Thanks,
>>>> -James
>>>> 
>>>>> On Jun 30, 2021, at 10:05 AM, Soumyajit Sahu <soumyajit.s...@gmail.com
>>> 
>>>> wrote:
>>>>> 
>>>>> Hi Nikolay,
>>>>> Great to hear that. I'm ok with either one too.
>>>>> I had missed noticing the KIP-686. Thanks for bringing it up.
>>>>> 
>>>>> I have tried to keep this one simple, but hope it can cover all our
>>>>> enterprise needs.
>>>>> 
>>>>> Should we put this one for vote?
>>>>> 
>>>>> Regards,
>>>>> Soumyajit
>>>>> 
>>>>> 
>>>>> On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <nizhi...@apache.org>
>>>> wrote:
>>>>> 
>>>>>> Team, if we have support from committers for an API to check records on
>> the
>>>>>> broker side let’s choose one KIP to go with and move forward to vote
>> and
>>>>>> implementation?
>>>>>> I’m ready to drive the implementation of this API.
>>>>>> It seems very useful to me.
>>>>>> 
>>>>>>> On Jun 30, 2021, at 18:04, Nikolay Izhikov <nizhikov....@gmail.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>> Hello.
>>>>>>> 
>>>>>>> I had a very similar proposal [1].
>>>>>>> So, yes, I think we should have one implementation of the API in the
>>>> product.
>>>>>>> 
>>>>>>> [1]
>>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>>>>>> 
>>>>>>>> On Jun 30, 2021, at 17:57, Christopher Shannon <
>>>>>> christopher.l.shan...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> I would find this feature very useful as well, as adding custom
>>>>>> validation
>>>>>>>> to incoming records would be nice to prevent bad data from making it
>>>> to
>>>>>> the
>>>>>>>> topic.
>>>>>>>> 
>>>>>>>> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <
>>>> soumyajit.s...@gmail.com
>>>>>>> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Thanks Colin! Good call on the ApiRecordError. We could use
>>>>>>>>> InvalidRecordException instead, and have the broker convert it
>>>>>>>>> to ApiRecordError.
>>>>>>>>> Modified signature below.
>>>>>>>>> 
>>>>>>>>> interface BrokerRecordValidator {
>>>>>>>>> /**
>>>>>>>>> * Validate the record for a given topic-partition.
>>>>>>>>> */
>>>>>>>>> Optional<InvalidRecordException> validateRecord(TopicPartition
>>>>>>>>> topicPartition, ByteBuffer key, ByteBuffer value, Header[]
>> headers);
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cmcc...@apache.org>
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>> 
>>>>>>>>>> The difficult thing is deciding which fields to share and how to
>>>> share
>>>>>>>>>> them.  Key and value are probably the minimum we need to make this
>>>>>>>>> useful.
>>>>>>>>>> If we do choose to go with byte buffer, it is not necessary to
>> also
>>>>>> pass
>>>>>>>>>> the size, since ByteBuffer maintains that internally.
>>>>>>>>>> 
>>>>>>>>>> ApiRecordError is also an internal class, so it can't be used in a
>>>>>> public
>>>>>>>>>> API.  I think most likely if we were going to do this, we would
>> just
>>>>>>>>> catch
>>>>>>>>>> an exception and use the exception text as the validation error.
>>>>>>>>>> 
>>>>>>>>>> best,
>>>>>>>>>> Colin
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote:
>>>>>>>>>>> Hi Tom,
>>>>>>>>>>> 
>>>>>>>>>>> Makes sense. Thanks for the explanation. I get what Colin had
>> meant
>>>>>>>>>> earlier.
>>>>>>>>>>> 
>>>>>>>>>>> Would a different signature for the interface work? Example
>> below,
>>>>>> but
>>>>>>>>>>> please feel free to suggest alternatives if there are any
>>>>>> possibilities
>>>>>>>>>> of
>>>>>>>>>>> such.
>>>>>>>>>>> 
>>>>>>>>>>> If needed, then deprecating this and introducing a new signature
>>>>>> would
>>>>>>>>> be
>>>>>>>>>>> straight-forward as both (old and new) calls could be made
>> serially
>>>>>> in
>>>>>>>>>> the
>>>>>>>>>>> LogValidator allowing a coexistence for a transition period.
>>>>>>>>>>> 
>>>>>>>>>>> interface BrokerRecordValidator {
>>>>>>>>>>> /**
>>>>>>>>>>> * Validate the record for a given topic-partition.
>>>>>>>>>>> */
>>>>>>>>>>> Optional<ApiRecordError> validateRecord(TopicPartition
>>>>>>>>>> topicPartition,
>>>>>>>>>>> int keySize, ByteBuffer key, int valueSize, ByteBuffer value,
>>>>>> Header[]
>>>>>>>>>>> headers);
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tbent...@redhat.com
>>> 
>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>>>> 
>>>>>>>>>>>> Although that class does indeed have public access at the Java
>>>>>> level,
>>>>>>>>>> it
>>>>>>>>>>>> does so only because it needs to be used by internal Kafka code
>>>>>> which
>>>>>>>>>> lives
>>>>>>>>>>>> in other packages (there isn't any more restrictive access
>>>> modifier
>>>>>>>>>> which
>>>>>>>>>>>> would work). What the project considers public Java API is
>>>>>> determined
>>>>>>>>>> by
>>>>>>>>>>>> what's included in the published Javadocs:
>>>>>>>>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't
>>>>>>>>> include
>>>>>>>>>> the
>>>>>>>>>>>> org.apache.kafka.common.record package.
>>>>>>>>>>>> 
>>>>>>>>>>>> One of the problems with making these internal classes public is
>>>> it
>>>>>>>>>> ties
>>>>>>>>>>>> the project into supporting them as APIs, which can make
>> changing
>>>>>>>>> them
>>>>>>>>>> much
>>>>>>>>>>>> harder and in the long run that can slow, or even prevent,
>>>>>> innovation
>>>>>>>>>> in
>>>>>>>>>>>> the rest of Kafka.
>>>>>>>>>>>> 
>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>> 
>>>>>>>>>>>> Tom
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu <
>>>>>>>>>> soumyajit.s...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Colin,
>>>>>>>>>>>>> I see that both the interface "Record" and the implementation
>>>>>>>>>>>>> "DefaultRecord" being used in LogValidator.java are public
>>>>>>>>>>>>> interfaces/classes.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java
>>>>>>>>>>>>> and
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
>>>>>>>>>>>>> 
>>>>>>>>>>>>> So, it should be ok to use them. Let me know what you think.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Soumyajit
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <
>> cmcc...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I believe we've had discussions about proposals similar to
>> this
>>>>>>>>>> before,
>>>>>>>>>>>>>> although I'm having trouble finding one right now.  The issue
>>>>>>>>> here
>>>>>>>>>> is
>>>>>>>>>>>>> that
>>>>>>>>>>>>>> Record is a private class -- it is not part of any public API,
>>>>>>>>> and
>>>>>>>>>> may
>>>>>>>>>>>>>> change at any time.  So we can't expose it in public APIs.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote:
>>>>>>>>>>>>>>> Hello All,
>>>>>>>>>>>>>>> I would like to start a discussion on the KIP-729.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>> Soumyajit
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
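
For readers following the thread, here is a minimal sketch of what an implementation of the BrokerRecordValidator signature proposed above (the InvalidRecordException variant) might look like. It is only an illustration, not part of KIP-729 or of any released Kafka API. TopicPartition, Header and InvalidRecordException are declared locally as simplified stand-ins for the Kafka classes of the same name so the example compiles without Kafka on the classpath, and RequiredHeaderValidator and the "schema-id" header name are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Simplified stand-ins for the Kafka classes of the same name (assumption:
// only the members used below are modelled).
final class TopicPartition {
    private final String topic;
    private final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    String topic() { return topic; }
    int partition() { return partition; }
}

final class Header {
    private final String key;
    private final byte[] value;
    Header(String key, byte[] value) { this.key = key; this.value = value; }
    String key() { return key; }
    byte[] value() { return value; }
}

class InvalidRecordException extends RuntimeException {
    InvalidRecordException(String message) { super(message); }
}

// Signature as proposed in this thread; not a released Kafka interface.
interface BrokerRecordValidator {
    Optional<InvalidRecordException> validateRecord(
            TopicPartition topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers);
}

// Hypothetical validator: accepts a record only if it carries a non-empty
// header with the configured key.
class RequiredHeaderValidator implements BrokerRecordValidator {
    private final String requiredHeader;

    RequiredHeaderValidator(String requiredHeader) { this.requiredHeader = requiredHeader; }

    @Override
    public Optional<InvalidRecordException> validateRecord(
            TopicPartition topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers) {
        if (headers != null) {
            for (Header header : headers) {
                boolean matches = requiredHeader.equals(header.key());
                boolean nonEmpty = header.value() != null && header.value().length > 0;
                if (matches && nonEmpty) {
                    return Optional.empty(); // record is valid
                }
            }
        }
        // Return (rather than throw) the error, as the proposed signature does.
        return Optional.of(new InvalidRecordException(
                "Missing required header '" + requiredHeader + "' for "
                        + topicPartition.topic() + "-" + topicPartition.partition()));
    }
}
```

Under this sketch the broker would call validateRecord once per record and translate a returned exception into a per-record error. As Christopher notes above, the headers would have to be readable for compressed batches as well for a header-based check like this to work.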
