Hi there,

I need to execute the following code:

 72: KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
 73:               .flatMap(new Query1FlatMap())74: .keyBy(item -> item.f0);

but I keep getting this error message:

        The program finished with the following exception:

        The return type of function 'Custom Source' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.
                
org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)
                
org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)
                
org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)
                org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)

I've read this guide 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html 
(there's an example with Tuple2<String,Double>> which is the same I need) and I 
think I have two options:
1 - Implement ResultTypeQueryable<Tuple2<String, Double>> in myQuery1FlatMap
 class. I did this by adding:
@Override public TypeInformation<Tuple2<String, Double>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
        }

2 - Use the returns method right after the flatMap(new Query1FlatMap()), like 
this:
        TypeInformation<Tuple2<String,Double>> tInfo = TypeInformation.of(new 
TypeHint<Tuple2<String, Double>>(){});
        KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
                .flatMap(new Query1FlatMap()).returns(tInfo).keyBy(item -> 
item.f0);
Actually I've also tried with:
        TypeHint<Tuple2<String,Double>> tHint =new TypeHint<Tuple2<String, 
Double>>(){};
        KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
                .flatMap(new Query1FlatMap()).returns(tHint).keyBy(item -> 
item.f0);

The problem is none of all these things works and the error message is always 
the same as above. Does any of you know how I can fix this?
Also I'm having the same issue with another code where the keyed stream has two Tuple2 (i.e. 
Tuple2<Tuple2<String, String>, Integer>, Tuple>). Would the solution work even 
in this last case? Or, do I need to change something because of the double Tuple2?

Thank you for your attention.
Best regards,
Vincenzo

Reply via email to