[ https://issues.apache.org/jira/browse/FLINK-3872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15314238#comment-15314238 ]
ASF GitHub Bot commented on FLINK-3872:
---------------------------------------

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2069

[FLINK-3872] [table, connector-kafka] Add KafkaJsonTableSource

Adds `StreamTableSource` variants for Kafka with syntactic sugar for parsing JSON streams.

```java
KafkaJsonTableSource source = new Kafka08JsonTableSource(
    topic,
    props,
    new String[] { "id" },          // field names
    new Class<?>[] { Long.class }); // field types

tableEnvironment.registerTableSource("kafka-stream", source);
```

You can then continue to work with the stream:

```java
Table result = tableEnvironment.ingest("kafka-stream").filter("id > 1000");
tableEnvironment.toDataStream(result, Row.class).print();
```

**Limitations**
- Assumes flat JSON field access (we can easily extend this to use JSON pointers, allowing us to parse nested fields like `/location/area` as field names).
- Does not extract any timestamps or watermarks (not an issue right now, as the Table API currently does not support operations that need them).
- The API is somewhat cumbersome and non-Scalaesque for the Scala Table API.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3872-kafkajson_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2069

----

commit 12ec6a594d23bd36bed1e07eeaba2aa75a768f67
Author: Ufuk Celebi <u...@apache.org>
Date:   2016-06-02T20:38:23Z

    [FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema

    - Adds a deserialization schema from byte[] to Row to be used in conjunction
      with the Table API.
commit a8dc3aa7ab70a91b12af2adccbbed821bf25ecc9
Author: Ufuk Celebi <u...@apache.org>
Date:   2016-06-03T13:24:22Z

    [FLINK-3872] [table, connector-kafka] Add KafkaTableSource

----

> Add Kafka TableSource with JSON serialization
> ---------------------------------------------
>
>                 Key: FLINK-3872
>                 URL: https://issues.apache.org/jira/browse/FLINK-3872
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Ufuk Celebi
>             Fix For: 1.1.0
>
> Add a Kafka TableSource which reads JSON serialized data.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
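
The pull request's limitations note that field access is flat and could be extended to JSON pointers such as `/location/area`. A minimal sketch of that idea follows, assuming the JSON message has already been parsed into nested `java.util.Map` instances. The class `JsonPointerSketch` and its methods are hypothetical names used for illustration only; they are not part of the pull request or of Flink, and the sketch skips the array indices and `~0`/`~1` escapes that RFC 6901 defines.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: resolves a path such as "/location/area" against nested maps. */
final class JsonPointerSketch {

    /**
     * Walks the parsed document along the pointer, one segment at a time.
     * Returns null if a segment is missing or an intermediate value is not an object.
     * Simplification (assumption): no array indices, no "~0"/"~1" escapes.
     */
    static Object resolve(Map<String, Object> document, String pointer) {
        if (pointer.isEmpty()) {
            return document; // the empty pointer refers to the whole document
        }
        if (!pointer.startsWith("/")) {
            throw new IllegalArgumentException("Pointer must start with '/': " + pointer);
        }
        Object current = document;
        for (String segment : pointer.substring(1).split("/", -1)) {
            if (!(current instanceof Map)) {
                return null; // cannot descend into a scalar or a missing value
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    /** Extracts one value per pointer, in order, the way field names map to row positions. */
    static Object[] extractFields(Map<String, Object> document, String[] pointers) {
        Object[] values = new Object[pointers.length];
        for (int i = 0; i < pointers.length; i++) {
            values[i] = resolve(document, pointers[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        // Stand-in for a parsed message like {"id": 1001, "location": {"area": "north"}}
        Map<String, Object> location = new HashMap<>();
        location.put("area", "north");
        Map<String, Object> event = new HashMap<>();
        event.put("id", 1001L);
        event.put("location", location);

        Object[] row = extractFields(event, new String[] {"/id", "/location/area"});
        System.out.println(row[0] + ", " + row[1]);
    }
}
```

With this shape, a flat field name like `id` becomes the special case `/id`, so nested access would not need a separate code path. A real implementation would more likely delegate pointer resolution to the JSON library already used for parsing.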