DuBin created FLINK-13340: ----------------------------- Summary: Add more Kafka topic option of flink-connector-kafka Key: FLINK-13340 URL: https://issues.apache.org/jira/browse/FLINK-13340 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.8.1 Reporter: DuBin
Currently, only 'topic' option implemented in the Kafka Connector Descriptor, we can only use it like : {code:java} val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) tableEnv .connect( new Kafka() .version("0.11") .topic("test-flink-1") .startFromEarliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092")) .withFormat( new Json() .deriveSchema() ) .withSchema( new Schema() .field("name", Types.STRING) .field("age", Types.STRING) ){code} but we cannot consume multiple topics or a topic regex pattern. Here is my thoughts: {code:java} .topic("test-flink-1") //.topics("test-flink-1,test-flink-2") or topics(List<String> topics) //.subscriptionPattern("test-flink-.*") or subscriptionPattern(Pattern pattern) {code} I already implement the code on my local env with help of the FlinkKafkaConsumer, and it works. -- This message was sent by Atlassian JIRA (v7.6.14#76016)