Request for Permission to Write KIP
Hello! I am a developer new to the Apache Kafka project, and I would like to write up some KIPs to review and discuss with the Apache Kafka dev community. My Wiki ID is atang: https://cwiki.apache.org/confluence/display/~atang

Thanks!

Cheers,
Allen
[DISCUSS] KIP-296 Add connector level configurability for producer/consumer client configs
Hi Kafka devs,

I'd like to get a discussion going on KIP-296, which would allow Kafka Connect users to define producer/consumer client configurations on a connector-by-connector basis, overriding worker-level defaults. The KIP can be found here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452962

Here is an implementation I propose to expose this set of capabilities to connectors: https://github.com/natengall/kafka/commit/e9a6ae29403ee26baa1c6d6771075d25821f7f36

Please let me know your thoughts! Thanks!

Cheers,
Allen
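To make the proposal concrete, here is a sketch of what a connector submission might look like under the proposed scheme, with a connector-level "consumer."-prefixed property overriding the worker default. The connector name, topic, and override values are illustrative, not taken from the KIP:

```
{
  "name": "example-file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "example-topic",
    "file": "/tmp/example-sink.txt",

    "consumer.max.poll.records": "500",
    "consumer.fetch.max.bytes": "10485760"
  }
}
```

A worker-level "consumer.max.poll.records" default, if present, would be superseded by the connector-level value for this connector only.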
[DISCUSS] KIP-301 Schema Inferencing in JsonConverters
Hi,

I just opened a KIP to add Schema Inferencing to the JsonConverter for Kafka Connect. The details of the proposal can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter

I have also created:

1. JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-6895
2. Provisional PR with initial discussion: https://github.com/apache/kafka/pull/5001

Looking forward to the community's feedback! Cheers!

-Allen
Re: [DISCUSS] KIP-301 Schema Inferencing in JsonConverters
I've gone through several iterations of back-and-forth with @rhauch on the PR and on Confluent's Slack community. The current thinking is that assuming an empty array is a String array is not necessarily the best option, nor is assuming that all null values in a JSON node are Strings. We might be able to account for these potentially false assumptions/inferences by introducing new task properties (with a value.converter prefix) that explicitly define overrides for specific JSON field keys, or by giving Kafka Connect users the option to provide a full immutable schema they know to be correct for the topics impacted by the Sink Connector.

What do you think?

- Allen

On Mon, May 14, 2018 at 2:58 PM, Allen Tang wrote:
> Hi,
>
> I just opened a KIP to add Schema Inferencing in JsonConverters for Kafka
> Connect.
>
> The details of the proposal can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter
>
> Also, I have created a -
>
> 1.) JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-6895
>
> 2.) Provisional PR with initial discussion:
> https://github.com/apache/kafka/pull/5001
>
> Looking forward to the community's feedback! Cheers!
>
> -Allen
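For discussion purposes, the per-field override properties floated above might look something like the following converter configuration. All of the "schemas.infer.*" property names here are hypothetical, not part of any accepted KIP; only value.converter and value.converter.schemas.enable are existing Connect configurations:

```
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Hypothetical: enable inference (per the KIP-301 proposal)
value.converter.schemas.infer.enable=true

# Hypothetical per-field overrides, resolving the ambiguous cases:
# treat "tags" as a string array even when it arrives empty,
# and treat "deleted_at" as an optional int64 even when it is null.
value.converter.schemas.infer.override.tags=array<string>
value.converter.schemas.infer.override.deleted_at=int64
```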
[jira] [Created] (KAFKA-6890) Add connector level configurability for producer/consumer client configs
Allen Tang created KAFKA-6890:

Summary: Add connector level configurability for producer/consumer client configs
Key: KAFKA-6890
URL: https://issues.apache.org/jira/browse/KAFKA-6890
Project: Kafka
Issue Type: New Feature
Components: KafkaConnect
Reporter: Allen Tang

Right now, each source connector and sink connector inherits its client configuration from the worker properties. Within the worker properties, any configuration with a prefix of "producer." or "consumer." is applied to all source connectors and sink connectors, respectively.

We should also provide connector-level overrides, whereby connector properties prefixed with "producer." and "consumer." are used to configure the producer and consumer clients embedded within source and sink connectors, respectively. The prefix is removed via a String#substring() call, and the remainder of the configuration key is used as the client configuration key; the value is passed directly to the client as the configuration value. If a client configuration is defined at the worker level, it is overridden by the connector-level client configuration.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
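The prefix-stripping and override behavior described in the ticket can be sketched with plain maps. This is a minimal, self-contained illustration of the merge semantics, not the actual Connect runtime code; the class and method names are mine:

```java
import java.util.HashMap;
import java.util.Map;

public class ClientConfigOverrides {

    // Keep only entries whose key starts with the given prefix ("producer."
    // or "consumer."), stripping the prefix via substring so the remainder
    // becomes the client configuration key.
    static Map<String, String> stripPrefix(Map<String, String> props, String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    // Worker-level defaults are applied first; connector-level entries for
    // the same client key are layered on top and win.
    static Map<String, String> mergedClientConfig(Map<String, String> workerProps,
                                                  Map<String, String> connectorProps,
                                                  String prefix) {
        Map<String, String> merged = new HashMap<>(stripPrefix(workerProps, prefix));
        merged.putAll(stripPrefix(connectorProps, prefix));
        return merged;
    }
}
```

So a worker default of producer.linger.ms=0 combined with a connector property producer.linger.ms=100 yields a producer client configured with linger.ms=100, while untouched worker defaults pass through.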
[jira] [Created] (KAFKA-6895) Schema Inferencing for JsonConverter
Allen Tang created KAFKA-6895:

Summary: Schema Inferencing for JsonConverter
Key: KAFKA-6895
URL: https://issues.apache.org/jira/browse/KAFKA-6895
Project: Kafka
Issue Type: New Feature
Components: KafkaConnect
Reporter: Allen Tang

Though there does exist a converter in the connect-json library called "JsonConverter", there are limitations as to the domain of JSON payloads this converter is compatible with on the Sink Connector side when deserializing them into Kafka Connect datatypes. When reading byte arrays from Kafka, the JsonConverter expects its input to be a JSON envelope that contains the fields "schema" and "payload"; otherwise it throws a DataException reporting:

  "JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration." (when schemas.enable is true)

or

  "JSON value converted to Kafka Connect must be in envelope containing schema" (when schemas.enable is false)

For example, if your JSON payload looks something like:

  {
    "c1": 1,
    "c2": "bar",
    "create_ts": 1501834166000,
    "update_ts": 1501834166000
  }

it will not be compatible with Sink Connectors that require the schema for data ingest when mapping from Kafka Connect datatypes to, for example, JDBC datatypes. Rather, the data is expected to be structured like so:

  {
    "schema": {
      "type": "struct",
      "fields": [
        { "type": "int32", "optional": true, "field": "c1" },
        { "type": "string", "optional": true, "field": "c2" },
        { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" },
        { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" }
      ],
      "optional": false,
      "name": "foobar"
    },
    "payload": {
      "c1": 1,
      "c2": "bar",
      "create_ts": 1501834166000,
      "update_ts": 1501834166000
    }
  }
The "schema" is a necessary component that dictates to the JsonConverter how to map the payload's JSON datatypes to Kafka Connect datatypes on the consumer side.

Proposal: introduce a new configuration for the JsonConverter class called "schemas.infer.enable". When this flag is set to "false", the existing behavior is exhibited. When it is set to "true", the schema is inferred from the contents of the JSON record and returned as part of the SchemaAndValue object for Sink Connectors.
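The inference step proposed above can be sketched as a recursive walk over a parsed JSON value. This toy version uses plain Java types in place of Jackson's JsonNode and produces Connect-style type names rather than real org.apache.kafka.connect.data.Schema objects (which the actual PR would build via SchemaBuilder); the class name and the fallbacks for empty arrays and nulls are assumptions mirroring the open questions in the discussion thread:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaInference {

    // Map a parsed JSON value to a Connect-style type name, or to a
    // field-name -> type map for JSON objects (structs).
    static Object inferType(Object json) {
        if (json instanceof Boolean) return "boolean";
        if (json instanceof Integer || json instanceof Long) return "int64";
        if (json instanceof Float || json instanceof Double) return "float64";
        if (json instanceof String) return "string";
        if (json instanceof List) {
            List<?> list = (List<?>) json;
            // Empty arrays are ambiguous; defaulting the element type to
            // "string" is exactly the assumption debated on the thread.
            Object element = list.isEmpty() ? "string" : inferType(list.get(0));
            return "array<" + element + ">";
        }
        if (json instanceof Map) {
            Map<String, Object> struct = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) json).entrySet()) {
                struct.put(String.valueOf(e.getKey()), inferType(e.getValue()));
            }
            return struct;
        }
        // Nulls carry no type information; "string" is only a guess here.
        return "string";
    }
}
```

Run against the first example payload in the ticket, this would infer int64 for "c1" and the timestamps and string for "c2" (the logical Timestamp type is not recoverable from the JSON alone, which is one of the proposal's inherent limits).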