Bump.

Can we have this API in open source?

Sent from my iPhone

> On 2 Dec 2020, at 16:43, Nikolay Izhikov <nizhi...@apache.org> wrote:
> 
> Hello, Paul.
> 
> Thanks for the feedback!
> 
>> How does the producer get notified of a failure to pass the RecordPolicy for 
>> one or more records, 
> 
> The producer will receive a `PolicyViolationException`.
> 
>> how should it recover?
> 
> The obvious answers are:
> 
> The producer should either switch to the correct schema, or it should be 
> stopped abnormally.
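The two recovery options above can be sketched as a small decision helper. This is an illustrative, self-contained sketch, not part of the KIP: `PolicyViolationException` is re-declared locally as a stand-in for `org.apache.kafka.common.errors.PolicyViolationException`, and the helper and `Action` names are hypothetical.

```java
import java.util.concurrent.ExecutionException;

public class PolicyFailureHandler {

    // Local stand-in for org.apache.kafka.common.errors.PolicyViolationException,
    // so the sketch compiles without the Kafka client on the classpath.
    static class PolicyViolationException extends RuntimeException { }

    enum Action { RESEND_WITH_CORRECT_SCHEMA, STOP_PRODUCER }

    // Producer send() failures typically arrive wrapped in an ExecutionException
    // when the returned Future is resolved; unwrap and pick a recovery action.
    static Action onSendFailure(Throwable error, boolean correctSchemaAvailable) {
        Throwable cause = (error instanceof ExecutionException) ? error.getCause() : error;
        if (cause instanceof PolicyViolationException) {
            return correctSchemaAvailable ? Action.RESEND_WITH_CORRECT_SCHEMA
                                          : Action.STOP_PRODUCER;
        }
        throw new IllegalStateException("Not a policy violation", cause);
    }
}
```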
> 
>> Assuming a RecordPolicy can be loaded by a broker without restarting it, 
>> what is the mechanism by which this happens?
> 
> Thanks for the good question:
> 
> I think we should choose one of the following alternatives:
> 
>    1. We allow users to use any `RecordsPolicy` implementation.
>        In this case, the Kafka administrator is responsible for putting a custom 
> jar with the `RecordsPolicy` implementation on every Kafka broker's 
> classpath (the libs directory).
>        AFAIK this was chosen as the base scenario for the `Authorizer` 
> implementation.
> 
>    2. We allow users to select an implementation from a predefined list that 
> Kafka developers include in a release.
>        In this case, every Kafka broker will ship a specific implementation 
> as part of the Kafka release itself.
>        We could go with this because a wrong `RecordsPolicy` implementation can 
> affect broker stability and performance.
> 
> I personally prefer the first option.
> 
>> Must writes to replicas also adhere to the RecordPolicy?
> 
> I think we should check only on the leader.
> 
>> Must already-written records adhere to RecordPolicy, if it is added 
>> later?
> 
> No.
> 
>> managing schema outside of Kafka itself using something like the Confluent 
>> Schema Registry.  
>> Maybe you can say why RecordPolicy would be better?
> 
> 1. I can't agree that a commercial product is an alternative to the proposed 
> open-source API.
> Moreover, I propose to add an API that has little overlap with such a big 
> product as the Schema Registry as a whole.
> 
> 2. AFAIU the Confluent Schema Registry uses a similar technique to enforce 
> record schemas in a topic.
> My understanding is based on the Schema Registry docs [1]. Specifically:
>    - Confluent Schema Registry has custom topic configuration options to 
> enable or disable schema checks.
>    - "With this configuration, if a message is produced to the topic 
> my-topic-sv that does not have a valid schema for the value of the message, 
> an error is returned to the producer, and the message is discarded."
> 
> [1] 
> https://docs.confluent.io/platform/current/schema-registry/schema-validation.html
> 
> 
>> On 1 Dec 2020, at 06:15, Paul Whalen <pgwha...@gmail.com> wrote:
>> 
>> Nikolay,
>> 
>> I'm not a committer, but perhaps I can start the discussion.  I've had the
>> urge for a similar feature after being bitten by writing a poorly formed
>> record to a topic - it's natural to want to push schema validation into the
>> broker, since that's the way regular databases work.  But I'm a bit
>> skeptical of the complexity it introduces.  Some questions I think would
>> have to be answered that aren't currently in the KIP:
>> - How does the producer get notified of a failure to pass the RecordPolicy
>> for one or more records, and how should it recover?
>> - Assuming a RecordPolicy can be loaded by a broker without restarting it,
>> what is the mechanism by which this happens?
>> - Must writes to replicas also adhere to the RecordPolicy?
>> - Must already-written records adhere to RecordPolicy, if it is
>> added later?
>> 
>> Also, the rejected alternatives section is blank - I see the status quo as
>> at least one alternative, in particular, managing schema outside of Kafka
>> itself using something like the Confluent Schema Registry.  Maybe you can
>> say why RecordPolicy would be better?
>> 
>> Best,
>> Paul
>> 
>>> On Mon, Nov 30, 2020 at 9:58 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
>>> 
>>> Friendly bump.
>>> 
>>> Please, share your feedback.
>>> Do we need this feature in Kafka?
>>> 
>>>> On 23 Nov 2020, at 12:09, Nikolay Izhikov <nizhikov....@gmail.com>
>>> wrote:
>>>> 
>>>> Hello!
>>>> 
>>>> Any additional feedback on this KIP?
>>>> I believe this API can be useful for Kafka users.
>>>> 
>>>> 
>>>>> On 18 Nov 2020, at 14:47, Nikolay Izhikov <nizhikov....@gmail.com>
>>> wrote:
>>>>> 
>>>>> Hello, Ismael.
>>>>> 
>>>>> Thanks for the feedback.
>>>>> You are right, I didn't read the public interfaces definition carefully :)
>>>>> 
>>>>> I updated the KIP according to your objection.
>>>>> I propose to expose two new public interfaces:
>>>>> 
>>>>> ```java
>>>>> package org.apache.kafka.common;
>>>>> 
>>>>> import java.nio.ByteBuffer;
>>>>> 
>>>>> import org.apache.kafka.common.header.Header;
>>>>> 
>>>>> public interface Record {
>>>>>     long timestamp();
>>>>> 
>>>>>     boolean hasKey();
>>>>> 
>>>>>     ByteBuffer key();
>>>>> 
>>>>>     boolean hasValue();
>>>>> 
>>>>>     ByteBuffer value();
>>>>> 
>>>>>     Header[] headers();
>>>>> }
>>>>> 
>>>>> package org.apache.kafka.server.policy;
>>>>> 
>>>>> import org.apache.kafka.common.Configurable;
>>>>> import org.apache.kafka.common.Record;
>>>>> import org.apache.kafka.common.errors.PolicyViolationException;
>>>>> 
>>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>>     void validate(String topic, int partition, Iterable<? extends Record> records)
>>>>>         throws PolicyViolationException;
>>>>> }
>>>>> ```
>>>>> 
>>>>> The data exposed in `Record` and in the `validate` method itself seems to be
>>> enough to implement any reasonable policy.
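To illustrate what a policy built on this data could look like, here is a self-contained sketch that rejects records whose value is not framed with a leading magic byte (0x00, as schema-framed wire formats typically use). The `Record` interface and `PolicyViolationException` are re-declared locally as stand-ins for the proposed Kafka types, so the sketch compiles on its own; the rule itself is purely illustrative.

```java
import java.nio.ByteBuffer;

public class MagicBytePolicy {

    // Minimal local stand-in for the proposed org.apache.kafka.common.Record.
    interface Record {
        boolean hasValue();
        ByteBuffer value();
    }

    // Local stand-in for org.apache.kafka.common.errors.PolicyViolationException.
    static class PolicyViolationException extends RuntimeException {
        PolicyViolationException(String message) { super(message); }
    }

    // Mirrors RecordsPolicy#validate: every record value must start with 0x00.
    public void validate(String topic, int partition, Iterable<? extends Record> records) {
        for (Record record : records) {
            if (!record.hasValue()
                    || record.value().remaining() == 0
                    || record.value().get(record.value().position()) != 0x00) {
                throw new PolicyViolationException(
                    "Record in " + topic + "-" + partition + " is not schema-framed");
            }
        }
    }
}
```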
>>>>> 
>>>>>> On 17 Nov 2020, at 19:44, Ismael Juma <ism...@juma.me.uk> wrote:
>>>>>> 
>>>>>> Thanks for the KIP. The policy interface is a small part of this. You
>>> also
>>>>>> have to describe the new public API that will be exposed as part of
>>> this.
>>>>>> For example, there is no public `Records` class.
>>>>>> 
>>>>>> Ismael
>>>>>> 
>>>>>> On Tue, Nov 17, 2020 at 8:24 AM Nikolay Izhikov <nizhi...@apache.org>
>>> wrote:
>>>>>> 
>>>>>>> Hello.
>>>>>>> 
>>>>>>> I want to start discussion of the KIP-686 [1].
>>>>>>> I propose to introduce a new public interface for it, RecordsPolicy:
>>>>>>> 
>>>>>>> ```java
>>>>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>>>>     void validate(String topic, Records records) throws
>>>>>>>         PolicyViolationException;
>>>>>>> }
>>>>>>> ```
>>>>>>> 
>>>>>>> and two new configuration options:
>>>>>>> * `records.policy.class.name: String` - sets the class name of the
>>>>>>> RecordsPolicy implementation for the specific topic.
>>>>>>> * `records.policy.enabled: Boolean` - enables or disables the records
>>>>>>> policy for the topic.
>>>>>>> 
>>>>>>> If `records.policy.enabled=true`, then an instance of the `RecordsPolicy`
>>>>>>> should check each records batch before the data is applied to the log.
>>>>>>> If `PolicyViolationException` is thrown from `RecordsPolicy#validate`,
>>>>>>> then no data is added to the log and the client receives an error.
>>>>>>> 
>>>>>>> Motivation:
>>>>>>> 
>>>>>>> During the adoption of Kafka in large enterprises, it's important to
>>>>>>> guarantee that the data in some topics conforms to a specific format.
>>>>>>> When data is written and read by different applications developed by
>>>>>>> different teams, it's hard to guarantee the data format using only custom
>>>>>>> SerDe, because malicious applications can use a different SerDe.
>>>>>>> The data format can be enforced only on the broker side.
>>>>>>> 
>>>>>>> Please, share your feedback.
>>>>>>> 
>>>>>>> [1]
>>>>>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>>>> 
>>>> 
>>> 
>>> 
> 
