Hi Jacopo,

to prevent type erasure in Java, you need to create a sub-type that
contains only reified types.

Instead of using a generic type with bound variables in

     stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType,
MyRightType>());

you can use

     stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType,
MyRightType>() {
     });

This will create an anonymous sub-type of MyKeyedBroadcastProcessFunction
that has the two types reified.

Another solution is to already create the sub type in your factory method.

     <MyLeftType, MyRightType> KeyedBroadcastProcessFunction<Integer,
Tuple2<Integer, String>, String, Either<MyLeftType, MyRightType>>
createFunction(...) {
          return KeyedBroadcastProcessFunction<Integer, Tuple2<Integer,
String>, String, Either<MyLeftType, MyRightType>> {
              ...
          };
     }


On Wed, Mar 4, 2020 at 4:08 PM <jacopo.go...@ubs.com> wrote:

> Hi all,
>
>
>
> Yes my problem is that I do not create the function inline but create a
> function directly when creating the data stream job.
>
> My code (which I cannot share) is exactly like your example, Yun, are you
> aware if there is a way to prevent code erasure?
>
>
>
> Kind regards,
>
>
>
> Jacopo Gobbi
>
>
>
>
>
> *From:* Yun Gao [mailto:yungao...@aliyun.com]
> *Sent:* Freitag, 21. Februar 2020 16:00
> *To:* Robert Metzger; Gobbi, Jacopo-XT
> *Cc:* user
> *Subject:* [External] Re: Flink's Either type information
>
>
>
>       Hi Jacopo, Robert,
>
>
>
>          Very sorry for missing the previous email and not response in
> time. I think exactly as Robert has pointed out with the example: using
> inline anonymous subclass of *KeyedBroadcastProcessFunction* should not
> cause the problem. As far as I know, the possible reason that cause the
> attached exception might be that the parameter types of *Either get* erased
> due to the way to create *KeyedBroadcastProcessFunction* object. For
> example, if you first implement a generic subclass of
> *KeyedBroadcastProcessFunction* like*:*
>
>
>
>       *public class MyKeyedBroadcastProcessFunction<MyLeftType,
> MyRightType> extends KeyedBroadcastProcessFunction<Integer, Tuple2<Integer,
> String>, String, Either<MyLeftType, MyRightType>> { ... }*
>
>
>
>      and create a function object directly when constructing the
> DataStream job:
>
>
>
>      *stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType,
> MyRightType>());*
>
>
>
>      Then *MyLeftType* and *MyRightType *will be erased and will cause
> the attached exception when Flink tries to inference the output type.
>
>
>
>      And I totally agree with Robert that attaching the corresponding
> codes would help debugging the problem.
>
>
>
>   Yours,
>
>     Yun
>
>
>
>
>
> ------------------------------------------------------------------
>
> From:Robert Metzger <rmetz...@apache.org>
>
> Send Time:2020 Feb. 21 (Fri.) 19:47
>
> To:jacopo.gobbi <jacopo.go...@ubs.com>
>
> Cc:yungao.gy <yungao...@aliyun.com>; user <user@flink.apache.org>
>
> Subject:Re: Flink's Either type information
>
>
>
> Hey Jacopo,
>
> can you post an example to reproduce the issue? I've tried it, but it
> worked in this artificial example:
>
>
>
> MapStateDescriptor<String, String> state = *new 
> *MapStateDescriptor<>(*"test"*, BasicTypeInfo.*STRING_TYPE_INFO*, 
> BasicTypeInfo.*STRING_TYPE_INFO*);
> DataStream<Either<Integer, String>> result = input
>       .map((MapFunction<String, Tuple2<Integer, String>>) value -> 
> Tuple2.*of*(0, 
> value)).returns(TupleTypeInfo.*getBasicTupleTypeInfo*(Integer.*class*, 
> String.*class*))
>       .keyBy(0).connect(input.broadcast(state))
>       .process(*new *KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, 
> String>, String, Either<Integer, String>>() {
>          @Override
>          *public void *processElement(Tuple2<Integer, String> value, 
> ReadOnlyContext ctx, Collector<Either<Integer, String>> out) *throws 
> *Exception {
>             out.collect(Either.*Left*(111));
>          }
>          @Override
>          *public void *processBroadcastElement(String value, Context ctx, 
> Collector<Either<Integer, String>> out) *throws *Exception { }
>       });
> result.print();
>
>
>
> On Wed, Feb 19, 2020 at 6:07 PM <jacopo.go...@ubs.com> wrote:
>
> Yes, I create it the way you mentioned.
>
>
>
> *From:* Yun Gao [mailto:yungao...@aliyun.com]
> *Sent:* Dienstag, 18. Februar 2020 10:12
> *To:* Gobbi, Jacopo-XT; user
> *Subject:* [External] Re: Flink's Either type information
>
>
>
>       Hi Jacopo,
>
>
>
>           Could you also provide how the KeyedBroadcastProcessFunction is
> created when constructing datastream API ? For example, are you using
> something like
>
>
>
>           new KeyedBroadcastProcessFunction<Integer, Integer, Integer,
> Either<MyLeft, MyRight>() {
>
>                        // Function implementation
>
>              }
>
>
>
>              or something else?
>
>
>
>      Best,
>
>       Yun
>
>
>
>
>
> ------------------------------------------------------------------
>
> From:jacopo.gobbi <jacopo.go...@ubs.com>
>
> Send Time:2020 Feb. 17 (Mon.) 18:31
>
> To:user <user@flink.apache.org>
>
> Subject:Flink's Either type information
>
>
>
> Hi all,
>
>
>
> How can an Either value be returned by a KeyedBroadcastProcessFunction?
>
> We keep getting "InvalidTypesException: Type extraction is not possible on
> Either type as it does not contain information about the 'left' type." when
> doing: out.collect(Either.<MyLeftType, MyRightType>Right(myObject));
>
>
>
> Thanks,
>
>
>
> Jacopo Gobbi
>
>
>
>
>
>
>

Reply via email to