Hi Jacopo, Robert,
Very sorry for missing the previous email and not responding in time. I
think it is exactly as Robert pointed out with his example: using an inline
anonymous subclass of KeyedBroadcastProcessFunction should not cause the
problem. As far as I know, a possible cause of the attached exception is
that the type parameters of Either get erased because of the way the
KeyedBroadcastProcessFunction object is created. For example, if you first
implement a generic subclass of KeyedBroadcastProcessFunction like:
    public class MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>
            extends KeyedBroadcastProcessFunction<
                    Integer, Tuple2<Integer, String>, String,
                    Either<MyLeftType, MyRightType>> { ... }
and then create the function object directly when constructing the DataStream job:
    stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());
then MyLeftType and MyRightType will be erased, which causes the attached
exception when Flink tries to infer the output type.
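To illustrate the difference, here is a minimal plain-Java sketch that does not use Flink at all. Fn, Either, GenericFn and leftTypeOf are hypothetical stand-ins I made up for this example, and Flink's real type extraction is more involved than this. It only shows what reflection can see in each case: the anonymous subclass records Integer in its superclass signature, while the generic subclass only records the unresolved type variable.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {
    // Hypothetical stand-in for a function class with an output type parameter.
    static abstract class Fn<OUT> {}

    // Hypothetical stand-in for an Either-like type (not Flink's Either).
    static class Either<L, R> {}

    // Generic subclass: L and R are still type variables in the class signature.
    static class GenericFn<L, R> extends Fn<Either<L, R>> {}

    // Reads the 'left' type argument of Either from the superclass signature.
    static Type leftTypeOf(Fn<?> fn) {
        ParameterizedType superType =
                (ParameterizedType) fn.getClass().getGenericSuperclass();
        ParameterizedType either =
                (ParameterizedType) superType.getActualTypeArguments()[0];
        return either.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // Anonymous subclass: the concrete arguments are part of its signature.
        Type fromAnonymous = leftTypeOf(new Fn<Either<Integer, String>>() {});
        // Direct instantiation: <Integer, String> exists only at compile time.
        Type fromGeneric = leftTypeOf(new GenericFn<Integer, String>());

        System.out.println(fromAnonymous == Integer.class);      // true
        System.out.println(fromGeneric instanceof TypeVariable); // true
    }
}
```

In the second case nothing at runtime says that the 'left' type is Integer, which matches the wording of the exception.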
And I totally agree with Robert that attaching the corresponding code
would help with debugging the problem.
Yours,
Yun
------------------------------------------------------------------
From:Robert Metzger <[email protected]>
Send Time:2020 Feb. 21 (Fri.) 19:47
To:jacopo.gobbi <[email protected]>
Cc:yungao.gy <[email protected]>; user <[email protected]>
Subject:Re: Flink's Either type information
Hey Jacopo,
can you post an example that reproduces the issue? I tried to reproduce it,
but it worked in this artificial example:
    MapStateDescriptor<String, String> state = new MapStateDescriptor<>(
            "test", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    DataStream<Either<Integer, String>> result = input
            .map((MapFunction<String, Tuple2<Integer, String>>) value -> Tuple2.of(0, value))
            .returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class))
            .keyBy(0)
            .connect(input.broadcast(state))
            .process(new KeyedBroadcastProcessFunction<
                    Integer, Tuple2<Integer, String>, String, Either<Integer, String>>() {
                @Override
                public void processElement(Tuple2<Integer, String> value,
                        ReadOnlyContext ctx, Collector<Either<Integer, String>> out)
                        throws Exception {
                    out.collect(Either.Left(111));
                }

                @Override
                public void processBroadcastElement(String value, Context ctx,
                        Collector<Either<Integer, String>> out) throws Exception { }
            });

    result.print();
On Wed, Feb 19, 2020 at 6:07 PM <[email protected]> wrote:
Yes, I create it the way you mentioned.
From: Yun Gao [mailto:[email protected]]
Sent: Tuesday, 18 February 2020 10:12
To: Gobbi, Jacopo-XT; user
Subject: [External] Re: Flink's Either type information
Hi Jacopo,
Could you also share how the KeyedBroadcastProcessFunction is
created when constructing the DataStream job? For example, are you using
something like
    new KeyedBroadcastProcessFunction<Integer, Integer, Integer,
            Either<MyLeft, MyRight>>() {
        // Function implementation
    }
or something else?
Best,
Yun
------------------------------------------------------------------
From:jacopo.gobbi <[email protected]>
Send Time:2020 Feb. 17 (Mon.) 18:31
To:user <[email protected]>
Subject:Flink's Either type information
Hi all,
How can an Either value be returned by a KeyedBroadcastProcessFunction?
We keep getting "InvalidTypesException: Type extraction is not possible on
Either type as it does not contain information about the 'left' type." when
doing: out.collect(Either.<MyLeftType, MyRightType>Right(myObject));
Thanks,
Jacopo Gobbi