Hi Flink developers,

How can we iterate over several Kafka topics using the Kafka connector?
Our idea is something like the following:

    List<DataStream<JSONObject>> streams = new ArrayList<>();

    // Iterate over the Kafka topics, creating one source per topic
    Iterator<String> topicIter = topicList.iterator();
    while (topicIter.hasNext()) {
        String topic = topicIter.next();
        streams.add(env.addSource(new FlinkKafkaConsumer09<>(topic, new JSONSchema(), properties)).rebalance());
    }

Our goal is to union several Kafka data streams into one, given the topics as a list:

    // Fold the per-topic streams into a single stream
    Iterator<DataStream<JSONObject>> streamsIt = streams.iterator();
    DataStream<JSONObject> currentStream = streamsIt.next();
    while (streamsIt.hasNext()) {
        DataStream<JSONObject> nextStream = streamsIt.next();
        currentStream = currentStream.union(nextStream);
    }

Cheers,
Sendoh