The code itself is fine. Turning the app’s log level to DEBUG will give you 
more information.
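
For reference, a minimal sketch of a log4j.properties that turns on DEBUG output for the Kafka client and the Flink connector (logger names and the console appender here are assumptions from a typical Flink 1.x setup; adjust to your own logging config):

```properties
# Root logger at DEBUG, writing to the console appender below
log4j.rootLogger=DEBUG, console

# The Kafka client logs its metadata requests and broker connection attempts
log4j.logger.org.apache.kafka=DEBUG
log4j.logger.org.apache.flink.streaming.connectors.kafka=DEBUG

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```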

BTW, please make sure that the addresses of Kafka brokers are properly resolved.
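
As a quick sanity check for the resolution part, something like the following sketch will tell you whether each broker hostname in your bootstrap list actually resolves via DNS (the host list passed in main is just an example; substitute your real brokers):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class BrokerResolveCheck {

    // Returns true if every host in a comma-separated host:port list resolves
    static boolean allResolve(String bootstrapServers) {
        for (String hostPort : bootstrapServers.split(",")) {
            String host = hostPort.trim().split(":")[0];
            try {
                InetAddress.getByName(host);
            } catch (UnknownHostException e) {
                System.out.println("Cannot resolve broker host: " + host);
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Example value only -- pass the same list you give to bootstrap.servers
        System.out.println(allResolve("localhost:9092"));
    }
}
```

If this prints false, Flink's metadata fetch will time out exactly as in your log, because the consumer can never reach a broker to ask for the topic metadata.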


> On Aug 22, 2019, at 15:45, Eyal Pe'er <eyal.p...@startapp.com> wrote:
> 
> Hi,
> 
> I'm trying to consume events using Apache Flink.
> 
> The code is very basic: it connects to a topic, splits the incoming lines into 
> words by space, and prints them to the console. Kafka version is 0.9.
> 
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>  
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.util.Collector;
> import java.util.Properties;
>  
> public class KafkaStreaming {
>  
> public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers", "kafka servers:9092...");
>     props.setProperty("group.id", "flinkPOC");
>     FlinkKafkaConsumer09<String> consumer = new 
> FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);
>  
>     DataStream<String> dataStream = env.addSource(consumer);
>  
>     DataStream<String> wordDataStream = dataStream.flatMap(new Splitter());
>     wordDataStream.print();
>     env.execute("Word Split");
>  
> }
>  
> public static class Splitter implements FlatMapFunction<String, String> {
>  
>     public void flatMap(String sentence, Collector<String> out) throws 
> Exception {
>  
>         for (String word : sentence.split(" ")) {
>             out.collect(word);
>         }
>     }
>  
> }
> }
>  
> 
> The app does not print anything to the screen (although I produced events to 
> Kafka).
> 
> I tried to skip the Splitter FlatMap function, but still nothing happens. Kafka 
> does not require SSL or any kind of authentication.
> 
> This is the error that I found in the logs:
> 
> 2019-08-20 14:36:17,654 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> Custom Source -> FlatMap -> Sink: Print to Std. Out (1/1) 
> (02258a2cafab83afbc0f5650c088da2b) switched from RUNNING to FAILED.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
> fetching topic metadata
>  
> 
> The Kafka topic has only one partition, so the topic metadata is supposed to 
> be very basic.
> 
> I ran Kafka and Flink locally in order to eliminate network-related issues, 
> but the issue persists. So my assumption is that I’m doing something wrong…
> 
> Did you encounter such an issue? Does someone have different code for consuming 
> Kafka events?
>  
> Best regards
> Eyal Peer / Data Platform Developer
