Hi Jacopo, to prevent type erasure in Java, you need to create a sub-type that contains only reified types.
Instead of using a generic type with bound variables in stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>()); you can use stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>() { }); This will create an anonymous sub-type of MyKeyedBroadcastProcessFunction that has the two types reified. Another solution is to already create the sub type in your factory method. <MyLeftType, MyRightType> KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, Either<MyLeftType, MyRightType>> createFunction(...) { return KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, Either<MyLeftType, MyRightType>> { ... }; } On Wed, Mar 4, 2020 at 4:08 PM <jacopo.go...@ubs.com> wrote: > Hi all, > > > > Yes my problem is that I do not create the function inline but create a > function directly when creating the data stream job. > > My code (which I cannot share) is exactly like your example, Yun, are you > aware if there is a way to prevent code erasure? > > > > Kind regards, > > > > Jacopo Gobbi > > > > > > *From:* Yun Gao [mailto:yungao...@aliyun.com] > *Sent:* Freitag, 21. Februar 2020 16:00 > *To:* Robert Metzger; Gobbi, Jacopo-XT > *Cc:* user > *Subject:* [External] Re: Flink's Either type information > > > > Hi Jacopo, Robert, > > > > Very sorry for missing the previous email and not response in > time. I think exactly as Robert has pointed out with the example: using > inline anonymous subclass of *KeyedBroadcastProcessFunction* should not > cause the problem. As far as I know, the possible reason that cause the > attached exception might be that the parameter types of *Either get* erased > due to the way to create *KeyedBroadcastProcessFunction* object. For > example, if you first implement a generic subclass of > *KeyedBroadcastProcessFunction* like*:* > > > > *public class MyKeyedBroadcastProcessFunction<MyLeftType, > MyRightType> extends KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, > String>, String, Either<MyLeftType, MyRightType>> { ... }* > > > > and create a function object directly when constructing the > DataStream job: > > > > *stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, > MyRightType>());* > > > > Then *MyLeftType* and *MyRightType *will be erased and will cause > the attached exception when Flink tries to inference the output type. > > > > And I totally agree with Robert that attaching the corresponding > codes would help debugging the problem. > > > > Yours, > > Yun > > > > > > ------------------------------------------------------------------ > > From:Robert Metzger <rmetz...@apache.org> > > Send Time:2020 Feb. 21 (Fri.) 19:47 > > To:jacopo.gobbi <jacopo.go...@ubs.com> > > Cc:yungao.gy <yungao...@aliyun.com>; user <user@flink.apache.org> > > Subject:Re: Flink's Either type information > > > > Hey Jacopo, > > can you post an example to reproduce the issue? I've tried it, but it > worked in this artificial example: > > > > MapStateDescriptor<String, String> state = *new > *MapStateDescriptor<>(*"test"*, BasicTypeInfo.*STRING_TYPE_INFO*, > BasicTypeInfo.*STRING_TYPE_INFO*); > DataStream<Either<Integer, String>> result = input > .map((MapFunction<String, Tuple2<Integer, String>>) value -> > Tuple2.*of*(0, > value)).returns(TupleTypeInfo.*getBasicTupleTypeInfo*(Integer.*class*, > String.*class*)) > .keyBy(0).connect(input.broadcast(state)) > .process(*new *KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, > String>, String, Either<Integer, String>>() { > @Override > *public void *processElement(Tuple2<Integer, String> value, > ReadOnlyContext ctx, Collector<Either<Integer, String>> out) *throws > *Exception { > out.collect(Either.*Left*(111)); > } > @Override > *public void *processBroadcastElement(String value, Context ctx, > Collector<Either<Integer, String>> out) *throws *Exception { } > }); > result.print(); > > > > On Wed, Feb 19, 2020 at 6:07 PM <jacopo.go...@ubs.com> wrote: > > Yes, I create it the way you mentioned. > > > > *From:* Yun Gao [mailto:yungao...@aliyun.com] > *Sent:* Dienstag, 18. Februar 2020 10:12 > *To:* Gobbi, Jacopo-XT; user > *Subject:* [External] Re: Flink's Either type information > > > > Hi Jacopo, > > > > Could you also provide how the KeyedBroadcastProcessFunction is > created when constructing datastream API ? For example, are you using > something like > > > > new KeyedBroadcastProcessFunction<Integer, Integer, Integer, > Either<MyLeft, MyRight>() { > > // Function implementation > > } > > > > or something else? > > > > Best, > > Yun > > > > > > ------------------------------------------------------------------ > > From:jacopo.gobbi <jacopo.go...@ubs.com> > > Send Time:2020 Feb. 17 (Mon.) 18:31 > > To:user <user@flink.apache.org> > > Subject:Flink's Either type information > > > > Hi all, > > > > How can an Either value be returned by a KeyedBroadcastProcessFunction? > > We keep getting "InvalidTypesException: Type extraction is not possible on > Either type as it does not contain information about the 'left' type." when > doing: out.collect(Either.<MyLeftType, MyRightType>Right(myObject)); > > > > Thanks, > > > > Jacopo Gobbi > > > > > > >