Request for Permission to Write KIP

2018-05-06 Thread Allen Tang
Hello!

I am a new contributor to the Apache Kafka project, and I would like to write
up some KIPs to review and discuss with the Apache Kafka dev community. My Wiki
ID is: atang

https://cwiki.apache.org/confluence/display/~atang

Thanks!

Cheers,
Allen


[DISCUSS] KIP-296 Add connector level configurability for producer/consumer client configs

2018-05-07 Thread Allen Tang
Hi Kafka devs,

I'd like to get a discussion going on KIP-296, which would allow Kafka
Connect users to define producer/consumer client configurations on a
connector-by-connector basis, overriding worker-level defaults.

The KIP can be found here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80452962

Here is a proposed implementation that exposes this set of
capabilities to connectors:
https://github.com/natengall/kafka/commit/e9a6ae29403ee26baa1c6d6771075d25821f7f36
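
To make the intent concrete, here is a rough sketch of the proposed
behavior (the specific property values below are illustrative, not part of
the KIP):

    # Worker config: defaults applied to every connector
    producer.acks=1
    producer.compression.type=none

    # Connector config: overrides applied to this connector only
    producer.acks=all
    producer.compression.type=gzip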

Please let me know your thoughts! Thanks!

Cheers,
Allen


[DISCUSS] KIP-301 Schema Inferencing in JsonConverters

2018-05-14 Thread Allen Tang
Hi,

I just opened a KIP to add Schema Inferencing in JsonConverters for
Kafka Connect.

The details of the proposal can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter

Also, I have created:

1.) JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-6895

2.) Provisional PR with initial discussion:
https://github.com/apache/kafka/pull/5001

Looking forward to the community's feedback! Cheers!

-Allen


Re: [DISCUSS] KIP-301 Schema Inferencing in JsonConverters

2018-05-15 Thread Allen Tang
I've gone through several iterations of back-and-forth with @rhauch on the
PR and on Confluent's Slack Community. The current thinking is that assuming
an empty array is a String array is not necessarily the best option, nor is
assuming that every null value in a JSON node is a String.

We might be able to account for these potentially false
assumptions/inferences by introducing new task properties (with a
value.converter prefix) that either explicitly define overrides for
specific JSON field keys, or give Kafka Connect users the option to
provide a full, immutable schema they know is correct for the topics
consumed by the Sink Connector.
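
For illustration only, such overrides might look something like the
following (every property name below is hypothetical and not yet part of
the proposal):

    # Hypothetical per-field override: treat "tags" as a string array even when empty
    value.converter.schemas.infer.field.tags=array<string>

    # Hypothetical full-schema override applied to the whole record
    value.converter.schemas.infer.schema={"type":"struct","fields":[...]}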

What do you think?

- Allen


On Mon, May 14, 2018 at 2:58 PM, Allen Tang  wrote:

> Hi,
>
> I just opened a KIP to add Schema Inferencing in JsonConverters for Kafka 
> Connect.
>
> The details of the proposal can be found here: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-301%3A+Schema+Inferencing+for+JsonConverter
>
> Also, I have created:
>
> 1.) JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-6895
>
> 2.) Provisional PR with initial discussion: 
> https://github.com/apache/kafka/pull/5001
>
> Looking forward to the community's feedback! Cheers!
>
> -Allen
>
>


[jira] [Created] (KAFKA-6890) Add connector level configurability for producer/consumer client configs

2018-05-09 Thread Allen Tang (JIRA)
Allen Tang created KAFKA-6890:
-

 Summary: Add connector level configurability for producer/consumer 
client configs
 Key: KAFKA-6890
 URL: https://issues.apache.org/jira/browse/KAFKA-6890
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Allen Tang


Right now, each source connector and sink connector inherits its client 
configurations from the worker properties. Within the worker properties, any 
configuration that has a prefix of "producer." or "consumer." is applied to 
all source connectors and sink connectors, respectively.

We should also provide connector-level overrides, whereby connector properties 
that are prefixed with "producer." and "consumer." are fed into the producer 
and consumer clients embedded within source and sink connectors respectively. 
The prefixes are removed via a String#substring() call, and the remainder of 
the configuration key is used as the client configuration key. The value is 
fed directly to the client as the configuration value. If client 
configurations were defined at the worker level, they are overridden by the 
connector-level client configurations.
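
A minimal sketch of the override extraction (class, method, and variable
names here are illustrative, not the actual implementation):

    import java.util.HashMap;
    import java.util.Map;

    class ClientConfigOverrides {
        // Collect connector properties that start with the given prefix
        // ("producer." or "consumer."), strip the prefix via substring(), and
        // use the remainder as the client configuration key.
        static Map<String, String> clientOverrides(Map<String, String> connectorProps,
                                                   String prefix) {
            Map<String, String> overrides = new HashMap<>();
            for (Map.Entry<String, String> entry : connectorProps.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    overrides.put(entry.getKey().substring(prefix.length()),
                                  entry.getValue());
                }
            }
            return overrides;
        }
    }

    // Worker-level client configs would be applied first, then overridden:
    //   producerConfigs.putAll(ClientConfigOverrides.clientOverrides(connectorProps, "producer."));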





[jira] [Created] (KAFKA-6895) Schema Inferencing for JsonConverter

2018-05-10 Thread Allen Tang (JIRA)
Allen Tang created KAFKA-6895:
-

 Summary: Schema Inferencing for JsonConverter
 Key: KAFKA-6895
 URL: https://issues.apache.org/jira/browse/KAFKA-6895
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Allen Tang


Though there does exist a converter in the connect-json library called 
"JsonConverter", there are limitations as to the domain of JSON payloads this 
converter is compatible with on the Sink Connector side when deserializing them 
into Kafka Connect datatypes. When reading byte arrays from Kafka, the 
JsonConverter expects its input to be a JSON envelope that contains the fields 
"schema" and "payload"; otherwise it throws a DataException reporting:

    JsonConverter with schemas.enable requires "schema" and "payload" fields and
    may not contain additional fields. If you are trying to deserialize plain
    JSON data, set schemas.enable=false in your converter configuration.

(when schemas.enable is true) or:

    JSON value converted to Kafka Connect must be in envelope containing schema

(when schemas.enable is false).
For example, if your JSON payload looks something on the order of:

    { "c1": 1, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 }

it will not be compatible with Sink Connectors that require the schema for 
data ingest when mapping from Kafka Connect datatypes to, for example, JDBC 
datatypes. Rather, that data is expected to be structured like so:

    {
      "schema": {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": true, "field": "c1" },
          { "type": "string", "optional": true, "field": "c2" },
          { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1, "field": "create_ts" },
          { "type": "int64", "optional": false, "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1, "field": "update_ts" }
        ],
        "optional": false,
        "name": "foobar"
      },
      "payload": { "c1": 1, "c2": "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 }
    }


The "schema" is a necessary component in order to dictate to the JsonConverter 
how to map the payload's JSON datatypes to Kafka Connect datatypes on the 
consumer side.

 

Introduce a new configuration for the JsonConverter class called 
"schemas.infer.enable". When this flag is set to "false", the existing behavior 
is preserved. When it is set to "true", the schema is inferred from the contents 
of the JSON record and returned as part of the SchemaAndValue object for Sink 
Connectors.
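
A minimal sketch of what the inference could look like, assuming Jackson's
JsonNode and Connect's SchemaBuilder (illustrative only, not the proposed
implementation):

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    class JsonSchemaInference {
        static Schema inferSchema(JsonNode node) {
            if (node.isTextual()) return Schema.OPTIONAL_STRING_SCHEMA;
            if (node.isBoolean()) return Schema.OPTIONAL_BOOLEAN_SCHEMA;
            if (node.isIntegralNumber()) return Schema.OPTIONAL_INT64_SCHEMA;
            if (node.isFloatingPointNumber()) return Schema.OPTIONAL_FLOAT64_SCHEMA;
            if (node.isArray()) {
                // An empty array leaves the element type ambiguous; defaulting
                // to String is one option, though its merits are under discussion.
                Schema element = node.size() > 0
                        ? inferSchema(node.get(0))
                        : Schema.OPTIONAL_STRING_SCHEMA;
                return SchemaBuilder.array(element).optional().build();
            }
            if (node.isObject()) {
                SchemaBuilder struct = SchemaBuilder.struct().optional();
                node.fields().forEachRemaining(
                        f -> struct.field(f.getKey(), inferSchema(f.getValue())));
                return struct.build();
            }
            // Nulls carry no type information; the other ambiguity under discussion.
            return Schema.OPTIONAL_STRING_SCHEMA;
        }
    }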


