DuBin created FLINK-13340:
-----------------------------

             Summary: Add more Kafka topic option of flink-connector-kafka
                 Key: FLINK-13340
                 URL: https://issues.apache.org/jira/browse/FLINK-13340
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.8.1
            Reporter: DuBin


Currently, only 'topic' option implemented in the Kafka Connector Descriptor, 
we can only use it like :
{code:java}
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    tableEnv
      .connect(
        new Kafka()
          .version("0.11")
          .topic("test-flink-1")
          .startFromEarliest()
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092"))
      .withFormat(
        new Json()
          .deriveSchema()
      )
      .withSchema(
        new Schema()
          .field("name", Types.STRING)
          .field("age", Types.STRING)
      ){code}
but we cannot consume multiple topics or a topic regex pattern. 

Here is my thoughts:
{code:java}
          .topic("test-flink-1") 
          //.topics("test-flink-1,test-flink-2") or topics(List<String> topics)
          //.subscriptionPattern("test-flink-.*") or 
subscriptionPattern(Pattern pattern)
{code}
I already implement the code on my local env with help of the 
FlinkKafkaConsumer, and it works.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to