Hi Sridhar

Are you using ParameterTool to set the properties? I couldn't see it
initialized anywhere in your code, but you use it in the lines below:

FlinkKafkaConsumer010<LogMessage> flinkConsumer =
        new FlinkKafkaConsumer010<LogMessage>(
                Arrays.asList(parameterTool.getRequired("topic").split(",")),
                new LogDeserializationSchema(),
                parameterTool.getProperties());
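
If parameterTool is never initialized, that snippet cannot work: the topic
list and the consumer properties will never be filled in. It has to be
created first, typically from the program arguments. A minimal sketch (the
argument names below are only examples):

ParameterTool parameterTool = ParameterTool.fromArgs(args);
// run with e.g.: --topic my-topic --bootstrap.servers broker:9092 --group.id my-group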


Make sure that the correct properties are actually passed to the
FlinkKafkaConsumer: in your main() you build them via getpropsFromEnv(),
but you never use them.
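
One option is to drop ParameterTool altogether and pass the env-derived
Properties from your getpropsFromEnv() straight to the consumer. A sketch
based on your posted code (it assumes the comma-separated topic list stays
in that same Properties object):

Properties props = getpropsFromEnv();
FlinkKafkaConsumer010<LogMessage> flinkConsumer =
        new FlinkKafkaConsumer010<LogMessage>(
                Arrays.asList(props.getProperty("topic").split(",")),
                new LogDeserializationSchema(),
                props);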


Best

Ziyad



Best Regards
Ziyad Muhammed Mohiyudheen
407, Internationales Studienzentrum Berlin
Theodor-Heuss-Platz 5
14052 Berlin
Ph: +49 176 6587 3343
Mail to: mmzi...@gmail.com

On Tue, Jul 11, 2017 at 9:12 AM, Sridhar Chellappa <flinken...@gmail.com>
wrote:

> I am pretty sure I am doing something wrong here. I just do not
> understand why.
>
> I wrote a small program that reads messages from Kafka and prints them out.
>
>
>
> public class Main {
>
>     private static final int CHECKPOINT_INTERVAL = 100000;
>
>
>     private static Properties getpropsFromEnv() {
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", 
> System.getenv("KAFKA_ADDRESS"));
>         props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID"));
>         props.setProperty("topic", System.getenv("KAFKA_TOPIC"));
>         return props;
>     }
>
>     public static void main(String[] args) throws Exception {
>
>
>         Properties props = getpropsFromEnv();
>
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(CHECKPOINT_INTERVAL);
>         env.setParallelism(1);
>         FlinkKafkaConsumer010<LogMessage> flinkConsumer =
>                 new FlinkKafkaConsumer010<LogMessage>(
>                         Arrays.asList(parameterTool.getRequired("topic").split(",")),
>                         new LogDeserializationSchema(),
>                         parameterTool.getProperties());
>
>         DataStream<LogMessage> logMessageDataStream = env.addSource(flinkConsumer);
>         logMessageDataStream.print();
>
>         env.execute("SomeJob");
>
>     }
> }
>
> public class LogDeserializationSchema implements DeserializationSchema<LogMessage> {
>
>     @Override
>     public LogMessage deserialize(byte[] message) {
>         LogMessage logMessage = null;
>         try {
>             logMessage =  LogMessage.parseFrom(message);
>         } catch (InvalidProtocolBufferException e) {
>             e.printStackTrace();
>         } finally {
>             return logMessage;
>         }
>     }
>
>     @Override
>     public boolean isEndOfStream(LogMessage nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<LogMessage> getProducedType() {
>         return TypeExtractor.getForClass(LogMessage.class);
>     }
> }
>
>
> When I run this program, I do not see any messages being read by the consumer.
>
> Things to note :
>
> 1. I ran kafka-console-consumer using the same Kafka parameters and saw
> continuous output.
>
> 2. My Gradle file has the following dependencies:
>
> dependencies {
>     compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'
>
>     compile 'org.aeonbits.owner:owner:1.0.9'
>     compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension
>     compile group: 'joda-time', name: 'joda-time', version: '2.9.4'
>     compile 'com.google.protobuf:protobuf-java-util:3.1.0'
>
>     /*
>      * Flink Dependencies
>      */
>     compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0'
>     compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0'
>     compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0'
>     compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0'
> }
>
> Can someone please help?
>
>
