Hello.

I would like to start a discussion of KIP-686 [1].
It proposes a new public interface, RecordsPolicy:

```
public interface RecordsPolicy extends Configurable, AutoCloseable {
   void validate(String topic, Records records) throws PolicyViolationException;
}
```

and two new configuration options:
    * `records.policy.class.name: String` - sets the class name of the
      RecordsPolicy implementation for the specific topic.
    * `records.policy.enabled: Boolean` - enables or disables the records
      policy for the topic.

If `records.policy.enabled=true`, an instance of the `RecordsPolicy` checks 
each Records batch before the data is appended to the log.
If `PolicyViolationException` is thrown from the `RecordsPolicy#validate` 
method, no data is added to the log and the client receives an error.
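
To make the intended behavior concrete, here is a minimal sketch of how a 
policy might be applied on the append path. It is not the KIP implementation 
and it does not use the Kafka classes: `RecordsPolicy` and 
`PolicyViolationException` below are simplified stand-ins, the batch is 
modelled as a list of raw values instead of `Records`, and `JsonObjectPolicy` 
and `tryAppend` are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RecordsPolicySketch {

    // Stand-in for Kafka's PolicyViolationException (assumption: unchecked).
    public static class PolicyViolationException extends RuntimeException {
        public PolicyViolationException(String message) {
            super(message);
        }
    }

    // Simplified stand-in for the proposed interface. The real proposal
    // extends Configurable and takes a Records batch; here the batch is a
    // list of raw record values so the sketch runs without Kafka.
    public interface RecordsPolicy extends AutoCloseable {
        void configure(Map<String, ?> configs);

        void validate(String topic, List<byte[]> records) throws PolicyViolationException;

        @Override
        void close();
    }

    // Hypothetical policy: every value must look like a JSON object.
    public static class JsonObjectPolicy implements RecordsPolicy {
        @Override
        public void configure(Map<String, ?> configs) {
            // No settings are needed for this sketch.
        }

        @Override
        public void validate(String topic, List<byte[]> records) {
            for (byte[] value : records) {
                String text = new String(value, StandardCharsets.UTF_8).trim();
                if (!(text.startsWith("{") && text.endsWith("}"))) {
                    throw new PolicyViolationException(
                        "Topic " + topic + ": record value is not a JSON object");
                }
            }
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }

    // Mimics the append path described above: validate the whole batch first,
    // append only if the policy passes. Returns false where the client would
    // receive an error.
    public static boolean tryAppend(RecordsPolicy policy, boolean policyEnabled,
                                    String topic, List<byte[]> batch, List<byte[]> log) {
        if (policyEnabled) {
            try {
                policy.validate(topic, batch);
            } catch (PolicyViolationException e) {
                return false; // nothing from this batch reaches the log
            }
        }
        log.addAll(batch);
        return true;
    }

    public static void main(String[] args) {
        List<byte[]> log = new ArrayList<>();
        byte[] good = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "plain text".getBytes(StandardCharsets.UTF_8);

        try (JsonObjectPolicy policy = new JsonObjectPolicy()) {
            policy.configure(Map.of());
            System.out.println("valid batch appended: "
                + tryAppend(policy, true, "orders", List.of(good), log));
            System.out.println("mixed batch appended: "
                + tryAppend(policy, true, "orders", List.of(good, bad), log));
            System.out.println("records in log: " + log.size());
        }
    }
}
```

Note that in this sketch a single bad record rejects the whole batch, which 
matches the "no data added to the log" behavior described above.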

Motivation: 

During the adoption of Kafka in large enterprises, it is important to guarantee 
that the data in a given topic conforms to a specific format.
When data is written and read by different applications developed by different 
teams, it is hard to guarantee the data format using only a custom SerDe, 
because a malicious application can use a different SerDe. 
The data format can therefore be enforced only on the broker side.

Please share your feedback.

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
