GitHub user uce opened a pull request: https://github.com/apache/flink/pull/2069
[FLINK-3872] [table, connector-kafka] Add KafkaJsonTableSource

Adds `StreamTableSource` variants for Kafka with syntactic sugar for parsing JSON streams.

```java
KafkaJsonTableSource source = new Kafka08JsonTableSource(
    topic,
    props,
    new String[] { "id" },          // field names
    new Class<?>[] { Long.class }); // field types

tableEnvironment.registerTableSource("kafka-stream", source);
```

You can then continue to work with the stream:

```java
Table result = tableEnvironment.ingest("kafka-stream").filter("id > 1000");
tableEnvironment.toDataStream(result, Row.class).print();
```

**Limitations**

- Assumes flat JSON field access (we can easily extend this to use JSON pointers, allowing us to parse nested fields like `/location/area` as field names).
- This does not extract any timestamps or watermarks (not an issue right now, as the Table API currently does not support operations where these are needed).
- The API is kind of cumbersome and non-Scalaesque for the Scala Table API.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3872-kafkajson_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2069

----
commit 12ec6a594d23bd36bed1e07eeaba2aa75a768f67
Author: Ufuk Celebi <u...@apache.org>
Date:   2016-06-02T20:38:23Z

    [FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema

    - Adds a deserialization schema from byte[] to Row to be used in
      conjunction with the Table API.

commit a8dc3aa7ab70a91b12af2adccbbed821bf25ecc9
Author: Ufuk Celebi <u...@apache.org>
Date:   2016-06-03T13:24:22Z

    [FLINK-3872] [table, connector-kafka] Add KafkaTableSource

----
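
Regarding the first limitation listed above (flat JSON field access), the following is a rough sketch of how a JSON-pointer-style field name such as `/location/area` might be resolved against an already-parsed record. It is not code from this pull request: the class `FieldPointer` and its `resolve` method are hypothetical, and the parsed JSON is modeled as nested `java.util.Map`s instead of a JSON library's tree type so that the example stays self-contained. The `~0` and `~1` escapes from RFC 6901 are handled; array indices are not.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of the pull request. */
public class FieldPointer {

    /**
     * Resolves a field name against a record of nested maps.
     * A name without a leading '/' is treated as a flat top-level field
     * (an assumption of this sketch); otherwise each '/'-separated segment
     * descends one level. Returns null if any segment is missing.
     */
    public static Object resolve(Map<String, Object> record, String pointer) {
        if (!pointer.startsWith("/")) {
            return record.get(pointer); // flat field access
        }
        Object current = record;
        // Drop the leading '/'; the -1 limit keeps trailing empty segments.
        for (String rawSegment : pointer.substring(1).split("/", -1)) {
            if (!(current instanceof Map)) {
                return null; // cannot descend into a non-object value
            }
            // RFC 6901 escapes: "~1" stands for '/', "~0" for '~' (in this order).
            String segment = rawSegment.replace("~1", "/").replace("~0", "~");
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> location = new HashMap<>();
        location.put("area", "Berlin");

        Map<String, Object> record = new HashMap<>();
        record.put("id", 1001L);
        record.put("location", location);

        System.out.println(resolve(record, "id"));             // flat field
        System.out.println(resolve(record, "/location/area")); // nested field
        System.out.println(resolve(record, "/location/zip"));  // missing field
    }
}
```

With something along these lines, the field-names array passed to the table source could carry pointers like `/location/area` next to plain names like `id`, while the field-types array stays unchanged.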