GitHub user rhauch opened a pull request: https://github.com/apache/kafka/pull/4319
[WIP] KAFKA-5142: Add Connect support for message headers (KIP-145)

*NEW PROPOSAL FOR KIP-145... DO NOT MERGE*

Changed the Connect API and runtime to support message headers as described in KIP-145.

The new `Header` interface defines an immutable representation of a Kafka header (key-value pair) with support for the Connect value types and schemas. This interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types.

The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord` (and thus `SourceRecord` and `SinkRecord`). This does allow multiple headers with the same key. The `Headers` interface contains methods for adding, removing, finding, and modifying headers. Convenience methods allow connectors and transforms to easily use and modify the headers for a record.

A new `HeaderConverter` interface is also defined to enable the Connect runtime framework to serialize and deserialize headers between the in-memory representation and Kafka's byte[] representation. A new `SimpleHeaderConverter` implementation has been added; it serializes to strings and deserializes by inferring the schemas. (`Struct` header values are serialized without the schemas, so they can only be deserialized as `Map` instances without a schema.) The `StringConverter`, `JsonConverter`, and `ByteArrayConverter` have all been extended to also be `HeaderConverter` implementations. Each connector can be configured with a different header converter, although by default the `SimpleHeaderConverter` is used to serialize header values as strings without schemas.

Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. Additional test methods are added for the methods added to the `Converter` implementations.
Finally, the `ConnectRecord` object is already used heavily, so only limited tests need to be added while quite a few of the existing tests already cover the changes.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation matches KIP-145
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rhauch/kafka kafka-5142-b

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4319.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4319

----
commit 1c35692da19f3c8c92ce60946a69f576878b958a
Author: Randall Hauch <rha...@gmail.com>
Date: 2017-12-05T17:05:00Z

KAFKA-5142: Add message headers to Connect API (KIP-145)

Changed the Connect API to add message headers as described in KIP-145.

The new `Header` interface defines an immutable representation of a Kafka header (name-value pair) with support for the Connect value types and schemas. Kafka headers have a string name and a binary value, which doesn't align well with Connect's existing data and schema mechanisms. Thus, Connect's `Header` interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types. And, as discussed below, a new `HeaderConverter` interface is added to define how the Kafka header binary values are converted to Connect data objects.

The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord`. Like the Kafka headers API, the Connect `Headers` interface allows storing multiple headers with the same key in an ordered list.
The Connect `Headers` interface is mutable and has a number of methods that make it easy for connectors and transformations to add, modify, and remove headers from the record, and the interface is designed to allow chaining multiple mutating methods.

The existing constructors and methods in `ConnectRecord`, `SinkRecord`, and `SourceRecord` are unchanged to maintain backward compatibility, and in these situations the records will contain an empty `Headers` object that connectors and transforms can modify. There is also an additional constructor that allows an existing `Headers` to be passed in. A new overloaded form of the `newRecord` method was created to allow connectors and transforms to create a new record with an entirely new `Headers` object.

A new `HeaderConverter` interface is also defined to enable the Connect runtime framework to serialize and deserialize headers between the in-memory representation and Kafka's byte[] representation.

Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. The `ConnectRecord` object is already used heavily, so only limited tests need to be added while quite a few of the existing tests already cover the changes. However, new unit tests were added for `SinkRecord` and `SourceRecord` to verify the header behavior, including when the `newRecord` methods are called.

commit f398eba326d6c0cc8732770cb3bfc962f0453995
Author: Randall Hauch <rha...@gmail.com>
Date: 2017-12-13T01:27:26Z

KAFKA-5142: Add message header converters to Connect API (KIP-145)

This is the second commit for the public Connect API changes for KIP-145, and deals primarily with `HeaderConverter` implementations.

Connect has three `Converter` implementations: `StringConverter`, `JsonConverter`, and `ByteArrayConverter`. These were modified to also implement `HeaderConverter`, without changing any of the existing functionality.
Like many of our pluggable components in Connect, the `HeaderConverter` interface extends `Configurable`, which allows implementations to expose a `ConfigDef` that describes the supported configuration properties, and a `configure` method that can be used to initialize the component with provided configuration properties. The `StringConverter`, `JsonConverter`, and `ByteArrayConverter` were changed to support these methods in a backward compatible manner. There are now `StringConverterConfig` and `JsonConverterConfig` classes that define the `ConfigDef` for the implementations; the `ByteArrayConverter` has no configuration properties and doesn't need a config class.

Note that the existing `Converter` interface has a special `configure` signature with a parameter that says whether the converter is being used for keys or values. This differs from the `Configurable.configure` signature, so this commit adds a new `ConverterConfig` abstract class that defines a `converter.type` property that can be used to set whether the converter is being used for keys, values, or headers. The existing `Converter` methods internally set this property based upon the supplied boolean parameter, so the default for `converter.type` can be `header`.

commit 14cf25a957ce1a7f0207f3fbdc9da5a30d5f3488
Author: Randall Hauch <rha...@gmail.com>
Date: 2017-12-13T01:28:44Z

KAFKA-5142: Add message headers to Connect runtime (KIP-145)

This is the third commit for KIP-145 and changes the Connect runtime to support headers.

Each Connect worker now configures a `HeaderConverter` for each connector task, in the same way it creates key and value `Converter` instances. This is entirely backward compatible, so that existing worker and connector configurations will work without changes. By default, the worker will use the `SimpleHeaderConverter` to serialize header values as strings and to deserialize them by inferring the schemas.

----
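
The default behavior described for `SimpleHeaderConverter` (serialize header values as strings, then infer a type when reading them back) can be illustrated with a minimal, self-contained sketch. The class `HeaderInferenceSketch` and its `serialize` and `deserialize` methods are hypothetical names invented for this illustration; they are not part of the Connect API or of this pull request. The inference order shown (boolean, then integer, then floating point, then plain string) is an assumption, and the actual converter may apply different or additional rules.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for illustration only; NOT the Connect SimpleHeaderConverter.
final class HeaderInferenceSketch {

    /** Serializes a value to UTF-8 bytes of its string form; no schema is written. */
    static byte[] serialize(Object value) {
        return value == null ? null : String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    /** Reads the string form back and infers a Java type from its content (assumed rules). */
    static Object deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        String text = new String(bytes, StandardCharsets.UTF_8);
        if (text.equals("true") || text.equals("false")) {
            return Boolean.valueOf(text);
        }
        try {
            return Long.valueOf(text);       // whole numbers come back as Long
        } catch (NumberFormatException e) {
            // not an integer; fall through
        }
        try {
            return Double.valueOf(text);     // other numeric text comes back as Double
        } catch (NumberFormatException e) {
            // not a number; fall through
        }
        return text;                         // anything else stays a String
    }

    public static void main(String[] args) {
        Object[] samples = {42, true, 3.5, "trace-id-abc"};
        for (Object sample : samples) {
            Object roundTripped = deserialize(serialize(sample));
            System.out.println(sample.getClass().getSimpleName() + " -> "
                    + roundTripped.getClass().getSimpleName());
        }
    }
}
```

Because only the string form is stored, the round trip does not always restore the original type: in this sketch an `Integer` written as `42` is read back as a `Long`. This mirrors the caveat in the description above that values serialized without schemas can only be recovered by inference.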