Nikolay, I'm not a committer, but perhaps I can start the discussion. I've had the urge for a similar feature after being bitten by writing a poorly formed record to a topic - it's natural to want to push schema validation into the broker, since that's the way regular databases work. But I'm a bit skeptical of the complexity it introduces. Some questions I think would have to be answered that aren't currently in the KIP:

- How does the producer get notified of a failure to pass the RecordPolicy for one or more records, and how should it recover?
- Assuming a RecordPolicy can be loaded by a broker without restarting it, what is the mechanism by which this happens?
- Must writes to replicas also adhere to the RecordPolicy?
- Must already-written records adhere to the RecordPolicy, if it is added later?
Also, the rejected alternatives section is blank - I see the status quo as at least one alternative: in particular, managing the schema outside of Kafka itself using something like the Confluent schema registry. Maybe you can say why RecordPolicy would be better?

Best,
Paul

On Mon, Nov 30, 2020 at 9:58 AM Nikolay Izhikov <nizhi...@apache.org> wrote:

> Friendly bump.
>
> Please, share your feedback.
> Do we need this feature in Kafka?
>
> > On Nov 23, 2020, at 12:09, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
> >
> > Hello!
> >
> > Any additional feedback on this KIP?
> > I believe this API can be useful for Kafka users.
> >
> >> On Nov 18, 2020, at 14:47, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
> >>
> >> Hello, Ismael.
> >>
> >> Thanks for the feedback.
> >> You are right, I didn't read the public interfaces definition carefully :)
> >>
> >> I updated the KIP according to your objection.
> >> I propose to expose 2 new public interfaces:
> >>
> >> ```
> >> package org.apache.kafka.common;
> >>
> >> public interface Record {
> >>     long timestamp();
> >>
> >>     boolean hasKey();
> >>
> >>     ByteBuffer key();
> >>
> >>     boolean hasValue();
> >>
> >>     ByteBuffer value();
> >>
> >>     Header[] headers();
> >> }
> >>
> >> package org.apache.kafka.server.policy;
> >>
> >> public interface RecordsPolicy extends Configurable, AutoCloseable {
> >>     void validate(String topic, int partition, Iterable<? extends Record> records) throws PolicyViolationException;
> >> }
> >> ```
> >>
> >> The data exposed in Record and in the validate method itself seems to be enough for the implementation of any reasonable Policy.
> >>
> >>> On Nov 17, 2020, at 19:44, Ismael Juma <ism...@juma.me.uk> wrote:
> >>>
> >>> Thanks for the KIP. The policy interface is a small part of this. You also
> >>> have to describe the new public API that will be exposed as part of this.
> >>> For example, there is no public `Records` class.
> >>>
> >>> Ismael
> >>>
> >>> On Tue, Nov 17, 2020 at 8:24 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
> >>>
> >>>> Hello.
> >>>>
> >>>> I want to start a discussion of KIP-686 [1].
> >>>> I propose to introduce a new public interface for it, RecordsPolicy:
> >>>>
> >>>> ```
> >>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
> >>>>     void validate(String topic, Records records) throws PolicyViolationException;
> >>>> }
> >>>> ```
> >>>>
> >>>> and two new configuration options:
> >>>> * `records.policy.class.name: String` - sets the class name of the RecordsPolicy implementation for the specific topic.
> >>>> * `records.policy.enabled: Boolean` - enables or disables the records policy for the topic.
> >>>>
> >>>> If `records.policy.enabled=true`, then an instance of the `RecordsPolicy` should check each Records batch before applying the data to the log.
> >>>> If `PolicyViolationException` is thrown from the `RecordsPolicy#validate` method, then no data is added to the log and the client receives an error.
> >>>>
> >>>> Motivation:
> >>>>
> >>>> During the adoption of Kafka in large enterprises, it's important to guarantee that data in some topic conforms to a specific format.
> >>>> When data are written and read by different applications developed by different teams, it's hard to guarantee the data format using only a custom SerDe, because malicious applications can use a different SerDe.
> >>>> The data format can be enforced only on the broker side.
> >>>>
> >>>> Please, share your feedback.
> >>>>
> >>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
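[Editor's note: to make the proposal above concrete, here is a minimal self-contained sketch of what a RecordsPolicy implementation might look like. The `Record`, `RecordsPolicy`, and `PolicyViolationException` types below are stand-ins mirroring the interfaces proposed in the KIP - they are NOT real Kafka classes, and `RequireKeyPolicy` is a hypothetical example policy, not part of the proposal.]

```java
import java.nio.ByteBuffer;
import java.util.List;

public class RecordsPolicySketch {

    // Stand-in for Kafka's exception type (assumption, not the real class).
    static class PolicyViolationException extends RuntimeException {
        PolicyViolationException(String message) { super(message); }
    }

    // Trimmed-down version of the Record interface proposed in the KIP.
    interface Record {
        boolean hasKey();
        ByteBuffer key();
        boolean hasValue();
        ByteBuffer value();
    }

    // Shape of the proposed RecordsPolicy, minus Configurable/AutoCloseable.
    interface RecordsPolicy {
        void validate(String topic, int partition, Iterable<? extends Record> records)
                throws PolicyViolationException;
    }

    // Example policy: reject any batch containing a keyless record,
    // e.g. to keep a compacted topic well-formed.
    static class RequireKeyPolicy implements RecordsPolicy {
        @Override
        public void validate(String topic, int partition, Iterable<? extends Record> records) {
            for (Record r : records) {
                if (!r.hasKey()) {
                    throw new PolicyViolationException(
                            "Keyless record rejected for " + topic + "-" + partition);
                }
            }
        }
    }

    // Tiny in-memory Record used only for the demo below.
    static Record record(final String key, final String value) {
        return new Record() {
            public boolean hasKey() { return key != null; }
            public ByteBuffer key() { return key == null ? null : ByteBuffer.wrap(key.getBytes()); }
            public boolean hasValue() { return value != null; }
            public ByteBuffer value() { return value == null ? null : ByteBuffer.wrap(value.getBytes()); }
        };
    }

    public static void main(String[] args) {
        RecordsPolicy policy = new RequireKeyPolicy();

        // A well-formed batch passes silently.
        policy.validate("orders", 0, List.of(record("k1", "v1"), record("k2", "v2")));

        // A batch with a keyless record is rejected before reaching the log.
        try {
            policy.validate("orders", 0, List.of(record(null, "v3")));
            System.out.println("FAIL: keyless record was accepted");
        } catch (PolicyViolationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A broker-side hook would presumably invoke `validate` once per incoming batch and translate the exception into a produce-response error, which circles back to Paul's first question: the producer-side recovery story still needs to be spelled out in the KIP.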