[ https://issues.apache.org/jira/browse/FLINK-13340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16903496#comment-16903496 ]
DuBin commented on FLINK-13340:
-------------------------------

Hi [~twalthr], could you please help review this PR?

> Add more Kafka topic option of flink-connector-kafka
> ----------------------------------------------------
>
>                 Key: FLINK-13340
>                 URL: https://issues.apache.org/jira/browse/FLINK-13340
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Table SQL / API
>    Affects Versions: 1.8.1
>            Reporter: DuBin
>            Assignee: DuBin
>            Priority: Major
>              Labels: features, pull-request-available
>   Original Estimate: 48h
>          Time Spent: 10m
>  Remaining Estimate: 47h 50m
>
> Currently, only the 'topic' option is implemented in the Kafka connector
> descriptor, so we can only use it like this:
> {code:java}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> tableEnv
>   .connect(
>     new Kafka()
>       .version("0.11")
>       .topic("test-flink-1")
>       .startFromEarliest()
>       .property("zookeeper.connect", "localhost:2181")
>       .property("bootstrap.servers", "localhost:9092"))
>   .withFormat(
>     new Json()
>       .deriveSchema()
>   )
>   .withSchema(
>     new Schema()
>       .field("name", Types.STRING)
>       .field("age", Types.STRING)
>   ){code}
> but we cannot consume multiple topics or subscribe to a topic regex pattern.
> Here are my thoughts:
> {code:java}
> .topic("test-flink-1")
> //.topics("test-flink-1,test-flink-2") or topics(List<String> topics)
> //.subscriptionPattern("test-flink-.*") or subscriptionPattern(Pattern pattern)
> {code}
> I have already implemented this in my local environment with the help of
> FlinkKafkaConsumer, and it works.

--
This message was sent by Atlassian JIRA (v7.6.14#76016)
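
To illustrate how the three options in the issue description quoted above would differ, here is a minimal sketch using only the Java standard library. The class `TopicSelection` and its methods are hypothetical names chosen for illustration. They are not part of Flink and not necessarily how the PR implements the feature. The sketch assumes that a comma-separated string is split and trimmed, and that a pattern must match the whole topic name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink): models the three proposed ways of
// selecting topics and resolves each against a list of known topic names.
final class TopicSelection {
    private final List<String> topics; // explicit topic names; empty when a pattern is used
    private final Pattern pattern;     // subscription pattern; null when explicit names are used

    private TopicSelection(List<String> topics, Pattern pattern) {
        this.topics = topics;
        this.pattern = pattern;
    }

    // Mirrors the existing single-topic option.
    static TopicSelection topic(String topic) {
        return new TopicSelection(Collections.singletonList(topic), null);
    }

    // Mirrors the proposed topics("a,b") form: split on commas, trim, drop empty entries.
    static TopicSelection topics(String commaSeparated) {
        List<String> names = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return new TopicSelection(names, null);
    }

    // Mirrors the proposed subscriptionPattern("regex") form.
    static TopicSelection subscriptionPattern(String regex) {
        return new TopicSelection(Collections.<String>emptyList(), Pattern.compile(regex));
    }

    // Returns the available topics this selection would subscribe to, in input order.
    List<String> resolve(List<String> available) {
        List<String> selected = new ArrayList<>();
        for (String name : available) {
            boolean match = (pattern != null)
                    ? pattern.matcher(name).matches()
                    : topics.contains(name);
            if (match) {
                selected.add(name);
            }
        }
        return selected;
    }
}
```

With the topic names from the description, `TopicSelection.topics("test-flink-1,test-flink-2")` and `TopicSelection.subscriptionPattern("test-flink-.*")` both select `test-flink-1` and `test-flink-2` from a list that also contains unrelated topics. Only the pattern form would also pick up a later `test-flink-3` without a configuration change.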