Bump. Can we get this API into open source?
Sent from iPhone

> On 2 Dec 2020, at 16:43, Nikolay Izhikov <nizhi...@apache.org> wrote:
>
> Hello, Paul.
>
> Thanks for the feedback!
>
>> How does the producer get notified of a failure to pass the RecordPolicy for
>> one or more records,
>
> The producer will receive a `PolicyViolationException`.
>
>> how should it recover?
>
> The obvious answers are: the producer should switch to the correct schema, or
> the producer should be stopped abnormally.
>
>> Assuming a RecordPolicy can be loaded by a broker without restarting it,
>> what is the mechanism by which this happens?
>
> Good question. I think we should choose one of the following alternatives:
>
> 1. We allow users to plug in any `RecordsPolicy` implementation.
>    In this case, the Kafka administrator is responsible for putting a custom
>    jar with the `RecordsPolicy` implementation on the classpath (the libs
>    directory) of every Kafka broker.
>    AFAIK, this was chosen as the base scenario for the `Authorizer`
>    implementation.
>
> 2. We allow users to select an implementation from a predefined list shipped
>    with a Kafka release.
>    In this case, every Kafka broker gets the specific implementation from
>    the Kafka release itself.
>    We could go with this because a wrong `RecordsPolicy` implementation can
>    affect broker stability and performance.
>
> I personally prefer the first option.
>
>> Must writes to replicas also adhere to the RecordPolicy?
>
> I think we should check only on the leader.
>
>> Must already-written records adhere to RecordPolicy, if it is added later?
>
> No.
>
>> managing schema outside of kafka itself using something like the confluent
>> schema registry.
>> Maybe you can say why RecordPolicy would be better?
>
> 1. I can't agree that a commercial product is an alternative to the proposed
>    open-source API. Moreover, the API I propose has little overlap with a
>    product as big as the Schema Registry as a whole.
>
> 2.
> AFAIU, the Confluent Schema Registry uses a similar technique to enforce the
> schema of records in a topic. My understanding is based on the Schema
> Registry docs [1]. Specifically:
> - The Confluent Schema Registry has custom topic configuration options to
>   enable or disable schema checks.
> - "With this configuration, if a message is produced to the topic
>   my-topic-sv that does not have a valid schema for the value of the message,
>   an error is returned to the producer, and the message is discarded."
>
> [1] https://docs.confluent.io/platform/current/schema-registry/schema-validation.html
>
>
>> On 1 Dec 2020, at 06:15, Paul Whalen <pgwha...@gmail.com> wrote:
>>
>> Nikolay,
>>
>> I'm not a committer, but perhaps I can start the discussion. I've had the
>> urge for a similar feature after being bitten by writing a poorly formed
>> record to a topic - it's natural to want to push schema validation into the
>> broker, since that's the way regular databases work. But I'm a bit
>> skeptical of the complexity it introduces. Some questions I think would
>> have to be answered that aren't currently in the KIP:
>> - How does the producer get notified of a failure to pass the RecordPolicy
>>   for one or more records, and how should it recover?
>> - Assuming a RecordPolicy can be loaded by a broker without restarting it,
>>   what is the mechanism by which this happens?
>> - Must writes to replicas also adhere to the RecordPolicy?
>> - Must already-written records adhere to RecordPolicy, if it is added
>>   later?
>>
>> Also, the rejected alternatives section is blank - I see the status quo as
>> at least one alternative, in particular, managing schema outside of Kafka
>> itself using something like the Confluent Schema Registry. Maybe you can
>> say why RecordPolicy would be better?
>>
>> Best,
>> Paul
>>
>>> On Mon, Nov 30, 2020 at 9:58 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
>>>
>>> Friendly bump.
>>>
>>> Please, share your feedback.
>>> Do we need this feature in Kafka?
>>>
>>>> On 23 Nov 2020, at 12:09, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
>>>>
>>>> Hello!
>>>>
>>>> Any additional feedback on this KIP?
>>>> I believe this API can be useful for Kafka users.
>>>>
>>>>> On 18 Nov 2020, at 14:47, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
>>>>>
>>>>> Hello, Ismael.
>>>>>
>>>>> Thanks for the feedback.
>>>>> You are right, I didn't read the public interfaces definition carefully :)
>>>>>
>>>>> I updated the KIP according to your objection.
>>>>> I propose to expose 2 new public interfaces:
>>>>>
>>>>> ```
>>>>> package org.apache.kafka.common;
>>>>>
>>>>> public interface Record {
>>>>>     long timestamp();
>>>>>
>>>>>     boolean hasKey();
>>>>>
>>>>>     ByteBuffer key();
>>>>>
>>>>>     boolean hasValue();
>>>>>
>>>>>     ByteBuffer value();
>>>>>
>>>>>     Header[] headers();
>>>>> }
>>>>>
>>>>> package org.apache.kafka.server.policy;
>>>>>
>>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>>     void validate(String topic, int partition, Iterable<? extends Record> records)
>>>>>         throws PolicyViolationException;
>>>>> }
>>>>> ```
>>>>>
>>>>> The data exposed in `Record` and in the `validate` method itself seems to
>>>>> be enough to implement any reasonable policy.
>>>>>
>>>>>> On 17 Nov 2020, at 19:44, Ismael Juma <ism...@juma.me.uk> wrote:
>>>>>>
>>>>>> Thanks for the KIP. The policy interface is a small part of this. You also
>>>>>> have to describe the new public API that will be exposed as part of this.
>>>>>> For example, there is no public `Records` class.
>>>>>>
>>>>>> Ismael
>>>>>>
>>>>>> On Tue, Nov 17, 2020 at 8:24 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
>>>>>>
>>>>>>> Hello.
>>>>>>>
>>>>>>> I want to start a discussion of KIP-686 [1].
>>>>>>> I propose to introduce a new public interface for it, RecordsPolicy:
>>>>>>>
>>>>>>> ```
>>>>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>>>>     void validate(String topic, Records records) throws PolicyViolationException;
>>>>>>> }
>>>>>>> ```
>>>>>>>
>>>>>>> and two new configuration options:
>>>>>>> * `records.policy.class.name: String` - sets the class name of the
>>>>>>>   RecordsPolicy implementation for the specific topic.
>>>>>>> * `records.policy.enabled: Boolean` - enables or disables the records
>>>>>>>   policy for the topic.
>>>>>>>
>>>>>>> If `records.policy.enabled=true`, then an instance of the `RecordsPolicy`
>>>>>>> should check each records batch before the data is applied to the log.
>>>>>>> If `PolicyViolationException` is thrown from the `RecordsPolicy#validate`
>>>>>>> method, then no data is added to the log and the client receives an error.
>>>>>>>
>>>>>>> Motivation:
>>>>>>>
>>>>>>> During the adoption of Kafka in large enterprises, it's important to
>>>>>>> guarantee that the data in some topic conforms to a specific format.
>>>>>>> When data is written and read by different applications developed by
>>>>>>> different teams, it's hard to guarantee the data format using only a
>>>>>>> custom SerDe, because malicious applications can use a different SerDe.
>>>>>>> The data format can be enforced only on the broker side.
>>>>>>>
>>>>>>> Please, share your feedback.
>>>>>>>
>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
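[Editor's illustration] A minimal sketch of what a `RecordsPolicy` implementation could look like under the proposed API. The `Record` and `PolicyViolationException` types below are local stand-ins mirroring the interfaces quoted in this thread, not the real Kafka classes, and `MagicBytePolicy` is a hypothetical example policy:

```java
import java.nio.ByteBuffer;

// Local stand-ins mirroring the interfaces proposed in this thread
// (hypothetical, NOT real Kafka classes).
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) { super(message); }
}

interface Record {
    boolean hasKey();
    ByteBuffer key();
    boolean hasValue();
    ByteBuffer value();
}

// Example policy: every record must carry a key, and every value must start
// with an agreed-upon "magic" byte identifying the serialization format.
class MagicBytePolicy {
    static final byte MAGIC = 0x01;

    void validate(String topic, int partition, Iterable<? extends Record> records) {
        for (Record r : records) {
            if (!r.hasKey()) {
                throw new PolicyViolationException(
                    "Record without key rejected for " + topic + "-" + partition);
            }
            if (!r.hasValue()) {
                throw new PolicyViolationException(
                    "Record without value rejected for " + topic + "-" + partition);
            }
            ByteBuffer value = r.value();
            if (value.remaining() == 0 || value.get(value.position()) != MAGIC) {
                throw new PolicyViolationException(
                    "Value does not start with the magic byte for " + topic + "-" + partition);
            }
        }
    }
}
```

Per the leader-only check discussed above, the partition leader would call `validate` for each batch before appending it to the log, and a thrown exception would translate into an error response for the producer.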
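[Editor's illustration] And a sketch of the first loading alternative discussed above: the broker instantiates whatever class name is configured via `records.policy.class.name`, the same way a custom `Authorizer` jar is picked up from the libs directory. `RecordsPolicy`, `NoopRecordsPolicy`, and `PolicyLoader` here are hypothetical stand-ins, not real Kafka classes:

```java
import java.util.Map;

// Local stand-in for the proposed interface (hypothetical, NOT a real Kafka
// class); the Configurable side is modeled by the configure() method, and
// close() is narrowed to drop the checked exception for brevity.
interface RecordsPolicy extends AutoCloseable {
    void configure(Map<String, ?> configs);
    @Override void close();
}

// A trivial implementation an administrator could ship in a custom jar
// placed on every broker's classpath.
class NoopRecordsPolicy implements RecordsPolicy {
    @Override public void configure(Map<String, ?> configs) { }
    @Override public void close() { }
}

class PolicyLoader {
    // Instantiate the policy named by `records.policy.class.name` via
    // reflection, mirroring how pluggable broker components such as a
    // custom Authorizer are loaded from the classpath.
    static RecordsPolicy load(String className, Map<String, ?> configs) {
        try {
            RecordsPolicy policy = (RecordsPolicy) Class.forName(className)
                .getDeclaredConstructor()
                .newInstance();
            policy.configure(configs);
            return policy;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Cannot load records policy " + className, e);
        }
    }
}
```

A hot-reload without broker restart would then amount to closing the old instance and calling `load` with the newly configured class name.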