Hi Vincenzo,

the preferred way to get the type information for tuples is to use
org.apache.flink.api.common.typeinfo.Types. For Tuple2<Tuple2<String,
String>, Integer>, you'd perform

Types.TUPLE(Types.TUPLE(Types.STRING, Types.STRING), Types.INT)

Nested tuples are not an issue in general.

On Mon, Jun 22, 2020 at 2:18 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Vincenzo:
>
>     Could you also attach the codes before line 72, namely how `delays` is
> defined ? Since the exception says the return type of "Custom Source" could
> not be defined, and I think it should refer to `delays`, and the exception
> is thrown when an operator is called on `delays` and Flink tries to create
> a new transformation based on the information of `delays`.
>
> Best,
>  Yun
>
> ------------------Original Mail ------------------
> *Sender:*Vincenzo Pronestì <vincenzoprone...@hotmail.com>
> *Send Date:*Mon Jun 22 19:02:05 2020
> *Recipients:*flink-user <user@flink.apache.org>
> *Subject:*Problems with type erasure
>
>> Hi there,
>>
>> I need to execute the following code:
>>
>>  72: KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays 73:    
>>            .flatMap(new Query1FlatMap()) 74:              .keyBy(item -> 
>> item.f0);but I keep getting this error message:The program finished with the 
>> following exception:The return type of function 'Custom Source' could not be 
>> determined automatically, due to type erasure. You can give type information 
>> hints by using the returns(...) method on the result of the transformation 
>> call, or by letting your function implement the 'ResultTypeQueryable' 
>> interface.org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451)org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178)org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:635)org.apache.flink.nyschoolbuses.Query2.main(Query2.java:73)I've
>>  read this guide 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html
>>  (there's an example with Tuple2<String,Double>> which is the same I need) 
>> and I think I have two options:1 - Implement 
>> ResultTypeQueryable<Tuple2<String, Double>> in my Query1FlatMap class. I did 
>> this by adding:@Overridepublic TypeInformation<Tuple2<String, Double>> 
>> getProducedType() {    return TypeInformation.of(new TypeHint<Tuple2<String, 
>> Double>>(){});}2 - Use the returns method right after the flatMap(new 
>> Query1FlatMap()), like this:TypeInformation<Tuple2<String,Double>> tInfo = 
>> TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});        
>> KeyedStream<Tuple2<String, Double>, String> keyedDelays = delays             
>>    .flatMap(new Query1FlatMap()).returns(tInfo)                .keyBy(item 
>> -> item.f0);Actually I've also tried with:TypeHint<Tuple2<String,Double>> 
>> tHint = new TypeHint<Tuple2<String, Double>>(){};KeyedStream<Tuple2<String, 
>> Double>, String> keyedDelays = delays                .flatMap(new 
>> Query1FlatMap()).returns(tHint)                .keyBy(item -> item.f0);The 
>> problem is none of all these things works and the error message is always 
>> the same as above. Does any of you know how I can fix this?Also I'm having 
>> the same issue with another code where the keyed stream has two Tuple2 (i.e. 
>> Tuple2<Tuple2<String, String>, Integer>, Tuple>). Would the solution work 
>> even in this last case? Or, do I need to change something because of the 
>> double Tuple2?Thank you for your attention.Best regards,Vincenzo
>>
>>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to