GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2069

    [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableSource

    Adds `StreamTableSource` variants for Kafka with syntactic sugar for 
parsing JSON streams.
    
    ```java
    KafkaJsonTableSource source = new Kafka08JsonTableSource(
        topic,
        props,
        new String[] { "id" }, // field names
        new Class<?>[] { Long.class }); // field types
    
    tableEnvironment.registerTableSource("kafka-stream", source);
    ```
    
    You can then continue to work with the stream:
    
    ```java
    Table result = tableEnvironment.ingest("kafka-stream").filter("id > 1000");
    tableEnvironment.toDataStream(result, Row.class).print();
    ```
    
    **Limitations**
    - Assumes flat JSON field access (we can easily extend this to use JSON 
pointers, allowing us to parse nested fields like `/location/area` as field 
names).
    - This does not extract any timestamps or watermarks (not an issue right now 
as the Table API currently does not support operations where this is needed).
    - The API is somewhat cumbersome and non-Scalaesque for the Scala Table API.
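    
    To illustrate the first limitation, here is a minimal sketch of how a
    JSON-pointer-style field name such as `/location/area` could be resolved
    against an already parsed record. It is not part of this pull request:
    `JsonPointerSketch` and `resolve` are hypothetical names, the record is
    assumed to be represented as nested `Map`s, and array indices are not
    handled.
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    /** Hypothetical helper for illustration only; not part of this pull request. */
    final class JsonPointerSketch {
    
        /**
         * Resolves a JSON-pointer-style path such as "/location/area" against
         * nested maps. Returns null if a segment is missing or if the path
         * tries to descend into a non-object value.
         */
        static Object resolve(Map<String, Object> root, String pointer) {
            if (pointer.isEmpty()) {
                return root; // the empty pointer refers to the whole record
            }
            if (pointer.charAt(0) != '/') {
                throw new IllegalArgumentException("Pointer must start with '/': " + pointer);
            }
            Object current = root;
            for (String segment : pointer.substring(1).split("/", -1)) {
                if (!(current instanceof Map)) {
                    return null; // cannot descend into a scalar or a missing value
                }
                // Undo the JSON pointer escapes: "~1" is '/', then "~0" is '~'.
                String key = segment.replace("~1", "/").replace("~0", "~");
                current = ((Map<?, ?>) current).get(key);
            }
            return current;
        }
    
        public static void main(String[] args) {
            Map<String, Object> location = new LinkedHashMap<>();
            location.put("area", "Berlin");
    
            Map<String, Object> record = new LinkedHashMap<>();
            record.put("id", 1001L);
            record.put("location", location);
    
            System.out.println(resolve(record, "/id"));            // flat field
            System.out.println(resolve(record, "/location/area")); // nested field
        }
    }
    ```
    
    With something along these lines, a flat field name like `id` would map to
    the pointer `/id`, so existing flat schemas could keep working unchanged.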


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3872-kafkajson_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2069
    
----
commit 12ec6a594d23bd36bed1e07eeaba2aa75a768f67
Author: Ufuk Celebi <u...@apache.org>
Date:   2016-06-02T20:38:23Z

    [FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema
    
    - Adds a deserialization schema from byte[] to Row to be used in conjunction
      with the Table API.

commit a8dc3aa7ab70a91b12af2adccbbed821bf25ecc9
Author: Ufuk Celebi <u...@apache.org>
Date:   2016-06-03T13:24:22Z

    [FLINK-3872] [table, connector-kafka] Add KafkaTableSource

----

