The code itself is fine. Turning the app’s log level to DEBUG will give you more information.
BTW, please make sure that the addresses of Kafka brokers are properly resolved. > On Aug 22, 2019, at 15:45, Eyal Pe'er <eyal.p...@startapp.com> wrote: > > Hi, > > I'm trying to consume events using Apache Flink. > > The code is very basic, trying to connect the topic split words by space and > print it to the console. Kafka version is 0.9. > > import org.apache.flink.api.common.functions.FlatMapFunction; > import org.apache.flink.api.common.serialization.SimpleStringSchema; > > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; > import org.apache.flink.util.Collector; > import java.util.Properties; > > public class KafkaStreaming { > > public static void main(String[] args) throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > Properties props = new Properties(); > props.setProperty("bootstrap.servers", "kafka servers:9092..."); > props.setProperty("group.id <http://group.id/>", "flinkPOC"); > FlinkKafkaConsumer09<String> consumer = new > FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props); > > DataStream<String> dataStream = env.addSource(consumer); > > DataStream<String> wordDataStream = dataStream.flatMap(new Splitter()); > wordDataStream.print(); > env.execute("Word Split"); > > } > > public static class Splitter implements FlatMapFunction<String, String> { > > public void flatMap(String sentence, Collector<String> out) throws > Exception { > > for (String word : sentence.split(" ")) { > out.collect(word); > } > } > > } > } > > > The app does not print anything to the screen (although I produced events to > Kafka). > > I tried to skip the Splitter FlatMap function, but still nothing happens. SSL > or any kind of authentication is not required from Kafka. > > This is the error that I found in the logs: > > 2019-08-20 14:36:17,654 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > Custom Source -> FlatMap -> Sink: Print to Std. Out (1/1) > (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED. > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > fetching topic metadata > > > The Kafka’s topic has only one partition, so the topic metadata supposed to > be very basic. > > I ran Kafka and the Flink locally in order to eliminate network related > issues, but the issue persists. So my assumption is that I’m doing something > wrong… > > Did you encounter such issue? Does someone have different code for consuming > Kafka events ? > > Best regards > Eyal Peer / Data Platform Developer > <image001.png>