Hi Vincenzo:

    Could you also attach the codes before line 72, namely how `delays` is 
defined ? Since the exception says the return type of "Custom Source" could not 
be defined, and I think it should refer to `delays`, and the exception is 
thrown when an operator is called on `delays` and Flink tries to create a new 
transformation based on the information of `delays`.

Best,
 Yun


 ------------------Original Mail ------------------
Sender:Vincenzo Pronestì <vincenzoprone...@hotmail.com>
Send Date:Mon Jun 22 19:02:05 2020
Recipients:flink-user <user@flink.apache.org>
Subject:Problems with type erasure

Hi there,
I need to execute the following code:
 72: KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays 73:       
        .flatMap(new Query1FlatMap()) 74:              .keyBy(item -> 
item.f0);but I keep getting this error message:The program finished with the 
following exception:The return type of function 'Custom Source' could not be 
determined automatically, due to type erasure. You can give type information 
hints by using the returns(...) method on the result of the transformation 
call, or by letting your function implement the 'ResultTypeQueryable' 
interface.org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)I've
 read this guide 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html
 (there's an example with Tuple2<String,Double>> which is the same I need) and 
I think I have two options:1 - Implement ResultTypeQueryable<Tuple2<String, 
Double>> in my Query1FlatMap class. I did this by adding:@Overridepublic 
TypeInformation<Tuple2<String, Double>> getProducedType() {    return 
TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});}2 - Use the 
returns method right after the flatMap(new Query1FlatMap()), like 
this:TypeInformation<Tuple2<String,Double>> tInfo = TypeInformation.of(new 
TypeHint<Tuple2<String, Double>>(){});        KeyedStream<Tuple2<String, 
Double>, String> keyedDelays = delays                .flatMap(new 
Query1FlatMap()).returns(tInfo)                .keyBy(item -> item.f0);Actually 
I've also tried with:TypeHint<Tuple2<String,Double>> tHint = new 
TypeHint<Tuple2<String, Double>>(){};KeyedStream<Tuple2<String, Double>, 
String> keyedDelays = delays                .flatMap(new 
Query1FlatMap()).returns(tHint)                .keyBy(item -> item.f0);The 
problem is none of all these things works and the error message is always the 
same as above. Does any of you know how I can fix this?Also I'm having the same 
issue with another code where the keyed stream has two Tuple2 (i.e. 
Tuple2<Tuple2<String, String>, Integer>, Tuple>). Would the solution work even 
in this last case? Or, do I need to change something because of the double 
Tuple2?Thank you for your attention.Best regards,Vincenzo 

Reply via email to