I am pretty sure I am doing something wrong here, but I do not
understand what.

I wrote a small program that reads messages from Kafka and prints them out.



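import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class Main {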

    private static final int CHECKPOINT_INTERVAL = 100000;


    private static Properties getpropsFromEnv() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", System.getenv("KAFKA_ADDRESS"));
        props.setProperty("group.id", System.getenv("CONSUMER_GROUP_ID"));
        props.setProperty("topic", System.getenv("KAFKA_TOPIC"));
        return props;
    }

    public static void main(String[] args) throws Exception {


        Properties props = getpropsFromEnv();

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL);
        env.setParallelism(1);
        FlinkKafkaConsumer010<LogMessage> flinkConsumer =
                new FlinkKafkaConsumer010<LogMessage>(
                        // Use the Properties built above; no parameterTool is defined here.
                        Arrays.asList(props.getProperty("topic").split(",")),
                        new LogDeserializationSchema(),
                        props
                    );

        DataStream<LogMessage> logMessageDataStream =
                env.addSource(flinkConsumer);
        logMessageDataStream.print();

        env.execute("SomeJob");

    }
}

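import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class LogDeserializationSchema implements
        DeserializationSchema<LogMessage> {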

    @Override
    public LogMessage deserialize(byte[] message) {
        LogMessage logMessage = null;
        try {
            logMessage =  LogMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            return logMessage;
        }
    }

    @Override
    public boolean isEndOfStream(LogMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<LogMessage> getProducedType() {
        return TypeExtractor.getForClass(LogMessage.class);
    }
}


When I run this program, I do not see any messages being read by the consumer.
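
One thing that could be ruled out early is the configuration itself. The sketch below is only an assumption about how the environment lookup might be made to fail fast, with a message that names the missing variable. The class `KafkaEnvProps` and its methods are hypothetical and not part of the program above. It also keeps the topic list out of the Kafka client properties, since `topic` is not a Kafka consumer setting.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of the original program.
public class KafkaEnvProps {

    // Returns the trimmed value for `name`, or fails with a message naming the variable.
    public static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }

    // Builds only the Kafka client settings; the topic list is kept separate.
    public static Properties consumerProps(Map<String, String> env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", require(env, "KAFKA_ADDRESS"));
        props.setProperty("group.id", require(env, "CONSUMER_GROUP_ID"));
        return props;
    }

    // Splits the comma-separated KAFKA_TOPIC value into individual topic names.
    public static List<String> topics(Map<String, String> env) {
        return Arrays.asList(require(env, "KAFKA_TOPIC").split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        try {
            Map<String, String> env = System.getenv();
            System.out.println("Kafka properties: " + consumerProps(env));
            System.out.println("Topics: " + topics(env));
        } catch (IllegalStateException e) {
            // Report the missing variable instead of failing later inside the job.
            System.err.println(e.getMessage());
        }
    }
}
```

Printing the resolved values before the job starts makes it easy to compare them with the parameters passed to kafka-console-consumer.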

Things to note:

1. I ran kafka-console-consumer using the same Kafka parameters and
saw continuous output.

2. My Gradle file has the following dependencies:

dependencies {
    compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'


    compile 'org.aeonbits.owner:owner:1.0.9'
    compile group: 'com.mashape.unirest', name: 'unirest-java', version: '1.4.9' // For driver Suspension
    compile group: 'joda-time', name: 'joda-time', version: '2.9.4'
    compile 'com.google.protobuf:protobuf-java-util:3.1.0'

    /*
     * Flink Dependencies
     */
    compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.0'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.3.0'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.3.0'
    compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.3.0'
}

Can someone please help?
