It is possible to instantiate the FlinkKafkaConsumer with multiple topics [1]. Simply pass a list of topic names instead of the name of a single topic.

    streams.add(env.addSource(new FlinkKafkaConsumer09<>(Arrays.asList("foo", "bar", "foobar"), new JSONSchema(), properties)));

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumer

Cheers,
Till

On Thu, Jun 23, 2016 at 2:33 PM, Sendoh <unicorn.bana...@gmail.com> wrote:

> Hi Flink developers,
>
> Can I ask how could we iterate several Kafka topics using the Kafka
> connector?
>
> Our idea is like the following example:
>
> List<DataStream<JSONObject>> streams = new ArrayList<>();
>
> // Iterate kafka topics
> Iterator<String> topicIter = topicList.iterator();
>
> while (topicIter.hasNext()){
>
>     String topic = topicIter.next();
>
>     streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic,
>         new JSONSchema(), properties)).rebalance());
>
> }
>
> Our goal is to union several kafka data streams into one, given the topics
> as a list:
>
> Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
>
> DataStream<JSONObject> currentStream = streamsIt.next();
> while(streamsIt.hasNext()){
>     DataStream<JSONObject> nextStream = streamsIt.next();
>     currentStream = currentStream.union(nextStream);
> }
>
> Cheers,
>
> Sendoh
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Iterate-several-kafka-topics-using-the-kafka-connector-tp7673.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.