Hi guys,

When I run my Flink topology (locally) I get this error:

The return type of function 'main(Job.java:69)' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.

My code is:
FlinkKafkaConsumer010<String> controlsConsumer = new 
FlinkKafkaConsumer010<String>(Topics.CONTROL_TOPIC, new SimpleStringSchema(), 
properties);

DataStream <ControlMessage> controlStream = 
environment.addSource(controlsConsumer)
        .rebalance()
        .map(value -> {
                ControlMessage message = new ControlMessage();
                message.initFromJSON(value);
                return message;
        });
KeyedStream <Tuple2<String, ControlMessage>, Tuple> keyed = controlStream
        .map(message -> new Tuple2<String, 
ControlMessage>(message.getExpressionId(), message))
        .keyBy(0);

The exception is raised in the second map (before the keyBy() operation). From 
my understanding I have to use returns(…) after the map function. However, I 
can’t really understand why. Can someone, please, explain me why this happens? 

Thank you so much in advance.

Best,


Gabriele

Reply via email to