I am pretty sure I am doing something wrong here. Just that I do not understand why?
I wrote a small program that reads messages from Kafka and prints it out. public class Main { private static final int CHECKPOINT_INTERVAL = 100000; private static Properties getpropsFromEnv() { Properties props = new Properties(); props.setProperty("bootstrap.servers", System.getenv("KAFKA_ADDRESS")); props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID")); props.setProperty("topic", System.getenv("KAFKA_TOPIC")); return props; } public static void main(String[] args) throws Exception { Properties props = getpropsFromEnv(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(CHECKPOINT_INTERVAL); env.setParallelism(1); FlinkKafkaConsumer010<LogMessage> flinkConsumer = new FlinkKafkaConsumer010<LogMessage>( Arrays.asList(parameterTool.getRequired("topic").split(",")), new LogDeserializationSchema(), parameterTool.getProperties() ); DataStream<LogMessage> logMessageDataStream = env.addSource(flinkConsumer); logMessageDataStream.print(); env.execute("SomeJob"); } } public class LogDeserializationSchema implements DeserializationSchema<LogMessage> { @Override public LogMessage deserialize(byte[] message) { LogMessage logMessage = null; try { logMessage = LogMessage.parseFrom(message); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } finally { return logMessage; } } @Override public boolean isEndOfStream(LogMessage nextElement) { return false; } @Override public TypeInformation<LogMessage> getProducedType() { return TypeExtractor.getForClass(LogMessage.class); } } When I run this program, I do not see any messages being read by the consumer. Things to note : 1. I ran kafka-console-consumer using the same Kafka parameters and saw continuous output. 2. My Gradle file has the following depencies : dependencies { compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7' compile 'org.aeonbits.owner:owner:1.0.9' compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension compile group: 'joda-time', name: 'joda-time', version: '2.9.4' compile 'com.google.protobuf:protobuf-java-util:3.1.0' /* * Flink Dependencies */ compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0' compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0' compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0' compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0' } Can Someone please help ?