Hi Sridhar Are you using *ParameterTool *to set the properties? I couldn't see it in your code, but you use it in the below line:
FlinkKafkaConsumer010<LogMessage> flinkConsumer = new FlinkKafkaConsumer010<LogMessage>( Arrays.asList(parameterTool.getRequired("topic").split(",")), new LogDeserializationSchema(), parameterTool.getProperties()); Make sure that the correct properties are passed to FlinkKafkaConsumer. Best Ziyad Best Regards *Ziyad Muhammed Mohiyudheen * 407, Internationales Studienzentrum Berlin Theodor-Heuss-Platz 5 14052 Berlin *Ph: +49 176 6587 3343 <%2B49%20176%206587%203343>* *Mail to*: *mmzi...@gmail.com <mmzi...@gmail.com>* On Tue, Jul 11, 2017 at 9:12 AM, Sridhar Chellappa <flinken...@gmail.com> wrote: > I am pretty sure I am doing something wrong here. Just that I do not > understand why? > > I wrote a small program that reads messages from Kafka and prints it out. > > > > public class Main { > > private static final int CHECKPOINT_INTERVAL = 100000; > > > private static Properties getpropsFromEnv() { > Properties props = new Properties(); > props.setProperty("bootstrap.servers", > System.getenv("KAFKA_ADDRESS")); > props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID")); > props.setProperty("topic", System.getenv("KAFKA_TOPIC")); > return props; > } > > public static void main(String[] args) throws Exception { > > > Properties props = getpropsFromEnv(); > > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(CHECKPOINT_INTERVAL); > env.setParallelism(1); > FlinkKafkaConsumer010<LogMessage> flinkConsumer = > new FlinkKafkaConsumer010<LogMessage>( > > Arrays.asList(parameterTool.getRequired("topic").split(",")), new > LogDeserializationSchema(), parameterTool.getProperties() > ); > > DataStream<LogMessage> logMessageDataStream = > env.addSource(flinkConsumer); > logMessageDataStream.print(); > > env.execute("SomeJob"); > > } > } > > public class LogDeserializationSchema implements > DeserializationSchema<LogMessage> { > > @Override > public LogMessage deserialize(byte[] message) { > LogMessage logMessage = null; > try { > logMessage = LogMessage.parseFrom(message); > } catch (InvalidProtocolBufferException e) { > e.printStackTrace(); > } finally { > return logMessage; > } > } > > @Override > public boolean isEndOfStream(LogMessage nextElement) { > return false; > } > > @Override > public TypeInformation<LogMessage> getProducedType() { > return TypeExtractor.getForClass(LogMessage.class); > } > } > > > When I run this program, I do not see any messages being read by the consumer. > > Things to note : > > 1. I ran kafka-console-consumer using the same Kafka parameters and saw > continuous output. > > 2. My Gradle file has the following depencies : > > dependencies { > compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7' > > > compile 'org.aeonbits.owner:owner:1.0.9' > compile group: 'com.mashape.unirest', name: 'unirest-java', version: > '1.4.9' // For driver Suspension > compile group: 'joda-time', name: 'joda-time', version: '2.9.4' > compile 'com.google.protobuf:protobuf-java-util:3.1.0' > > /* > * Flink Dependencies > */ > compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0' > compile group: 'org.apache.flink', name: > 'flink-connector-kafka-0.10_2.10', version: '1.3.0' > compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', > version: '1.3.0' > compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: > '1.3.0' > > > > > } > > Can Someone please help ? > >