Hi Vincenzo,

the preferred way to get the type information for tuples is to use org.apache.flink.api.common.typeinfo.Types. For Tuple2<Tuple2<String, String>, Integer>, you'd use:

    Types.TUPLE(Types.TUPLE(Types.STRING, Types.STRING), Types.INT)

Nested tuples are not an issue in general.

On Mon, Jun 22, 2020 at 2:18 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Vincenzo:
>
> Could you also attach the code before line 72, namely how `delays` is
> defined? The exception says the return type of "Custom Source" could
> not be determined, and I think that refers to `delays`. The exception
> is thrown when an operator is called on `delays` and Flink tries to create
> a new transformation based on the information of `delays`.
>
> Best,
> Yun
>
> ------------------ Original Mail ------------------
> *Sender:* Vincenzo Pronestì <vincenzoprone...@hotmail.com>
> *Send Date:* Mon Jun 22 19:02:05 2020
> *Recipients:* flink-user <user@flink.apache.org>
> *Subject:* Problems with type erasure
>
>> Hi there,
>>
>> I need to execute the following code:
>>
>> 72: KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
>> 73:         .flatMap(new Query1FlatMap())
>> 74:         .keyBy(item -> item.f0);
>>
>> but I keep getting this error message:
>>
>> The program finished with the following exception:
>>
>> The return type of function 'Custom Source' could not be determined
>> automatically, due to type erasure. You can give type information hints
>> by using the returns(...) method on the result of the transformation
>> call, or by letting your function implement the 'ResultTypeQueryable'
>> interface.
>>     org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)
>>     org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)
>>     org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)
>>     org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)
>>
>> I've read this guide
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html
>> (there's an example with Tuple2<String, Double>, which is the same type I need)
>> and I think I have two options:
>>
>> 1 - Implement ResultTypeQueryable<Tuple2<String, Double>> in my
>> Query1FlatMap class. I did this by adding:
>>
>> @Override
>> public TypeInformation<Tuple2<String, Double>> getProducedType() {
>>     return TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
>> }
>>
>> 2 - Use the returns method right after the flatMap(new Query1FlatMap()),
>> like this:
>>
>> TypeInformation<Tuple2<String, Double>> tInfo =
>>         TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
>> KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
>>         .flatMap(new Query1FlatMap()).returns(tInfo)
>>         .keyBy(item -> item.f0);
>>
>> Actually I've also tried with:
>>
>> TypeHint<Tuple2<String, Double>> tHint = new TypeHint<Tuple2<String, Double>>(){};
>> KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays
>>         .flatMap(new Query1FlatMap()).returns(tHint)
>>         .keyBy(item -> item.f0);
>>
>> The problem is that none of these works and the error message is always
>> the same as above. Does any of you know how I can fix this?
>>
>> Also I'm having the same issue with another piece of code where the keyed
>> stream has two Tuple2 (i.e. Tuple2<Tuple2<String, String>, Integer>, Tuple>).
>> Would the solution work even in this last case?
>> Or do I need to change something because of the double Tuple2?
>>
>> Thank you for your attention.
>>
>> Best regards,
>> Vincenzo

--
Arvid Heise | Senior Java Developer
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
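
For background on the "due to type erasure" part of the error message quoted above, here is a minimal plain-Java sketch without any Flink dependency. It shows two things: generic arguments are gone from an object's runtime class, and an anonymous subclass still records them, which is the pattern the `new TypeHint<...>(){}` idiom relies on. The names `TypeErasureSketch`, `Hint`, and `Pair` are hypothetical stand-ins invented for this illustration; they are not Flink classes, and Flink's own implementation may differ in detail.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class TypeErasureSketch {

    /** Hypothetical two-field container, standing in for a Tuple2-like type. */
    public static class Pair<A, B> {
        public A f0;
        public B f1;
    }

    /**
     * Hypothetical type hint. Creating an anonymous subclass such as
     * new Hint<Pair<String, Double>>() {} stores the type argument in the
     * subclass's class file, where reflection can read it back.
     */
    public abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException(
                        "Create Hint as an anonymous subclass with a type argument");
            }
            // Hint has exactly one type parameter, so index 0 is T.
            this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        public Type getCapturedType() {
            return captured;
        }
    }

    /** True when both lists share one runtime class, i.e. the element type was erased. */
    public static boolean sameRuntimeClass(List<?> a, List<?> b) {
        return a.getClass() == b.getClass();
    }

    /** Captures a nested generic type, analogous to Tuple2<Tuple2<String, String>, Integer>. */
    public static Type nestedPairType() {
        return new Hint<Pair<Pair<String, String>, Integer>>() {}.getCapturedType();
    }

    public static void main(String[] args) {
        // Both lists are plain ArrayList at runtime, so this prints true.
        System.out.println(sameRuntimeClass(new ArrayList<String>(), new ArrayList<Integer>()));
        // The captured type keeps its nested arguments; the exact text depends on the JDK.
        System.out.println(nestedPairType().getTypeName());
    }
}
```

The nested case is captured the same way as the flat one, which is consistent with the remark above that nested tuples are not an issue in general.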