Have you looked at org.apache.flink.types.Either? If you wrap all
elements of both streams in an Either before the union, you should be
able to join them properly.
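A minimal sketch of the wrapping idea in plain Java, with no Flink dependency so that it runs on its own. The nested Either class is a hypothetical stand-in for org.apache.flink.types.Either, and wrapAndUnion and describe are illustrative names, not Flink API. For brevity the two inputs carry String and Integer directly rather than the generic container from the question. In a real job the wrapping would presumably happen in a map() on each stream before the union, most likely with explicit type information for the Either type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EitherUnionSketch {

    // Hypothetical stand-in for org.apache.flink.types.Either<L, R>:
    // a value that holds exactly one of two differently typed alternatives.
    static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> Left(L value) {
            return new Either<>(value, null, true);
        }

        static <L, R> Either<L, R> Right(R value) {
            return new Either<>(null, value, false);
        }

        boolean isLeft() { return isLeft; }
        L left() { return left; }
        R right() { return right; }
    }

    // Models "map each stream into the common Either type, then union them".
    static List<Either<String, Integer>> wrapAndUnion(List<String> stream1, List<Integer> stream2) {
        List<Either<String, Integer>> merged = new ArrayList<>();
        for (String s : stream1) {
            merged.add(Either.<String, Integer>Left(s));
        }
        for (Integer i : stream2) {
            merged.add(Either.<String, Integer>Right(i));
        }
        return merged;
    }

    // Models the downstream function: branch on the side, no unchecked cast needed.
    static String describe(Either<String, Integer> element) {
        if (element.isLeft()) {
            return "string:" + element.left();
        }
        return "int:" + (element.right() + 1);
    }

    public static void main(String[] args) {
        for (Either<String, Integer> e : wrapAndUnion(Arrays.asList("a", "b"), Arrays.asList(41))) {
            System.out.println(describe(e));
        }
    }
}
```

The point of the wrapper is that the merged stream has one fully specified element type, so nothing has to be recovered from an erased type variable.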
On 17/07/2019 14:18, John Tipper wrote:
Hi All,
Can I union/join 2 streams containing generic classes, where each
stream has a different parameterised type? I'd like to process the
combined stream of values as a single raw type, casting to a specific
type for detailed processing, based on some information in the type
that will allow me to safely cast to the specific type.
I can't share my exact code, but the below example shows the sort of
thing I want to do.
So, as an example, given the following generic type:
class MyGenericContainer<IN> extends Tuple3<String, IN, SomeOtherClass> {
...
private final String myString;
private final IN value;
private final Class<IN> clazz; // created by constructor
private SomeOtherClass someOtherClass;
...
}
and 2 streams, I'd like to be able to do something like:
DataStream<MyGenericContainer<String>> stream1 = ...
DataStream<MyGenericContainer<Integer>> stream2 = ...
DataStream<...> merged = stream1.union(stream2).process(new MyProcessFunction());
// within an operator, such as a MyProcessFunction:
MyGenericContainer container = ...; // raw generic container passed to the function
Object rawValue = container.getValue();
performProcessing(container.getClazz().cast(rawValue)); // safely cast rawValue
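A small sketch of the runtime cast described above, assuming a simplified container. A Java cast expression needs a type known at compile time, so a Class object cannot appear inside the parentheses; Class.cast performs the equivalent checked cast at runtime. The Container class and the process method are hypothetical and only loosely follow MyGenericContainer.

```java
public class SafeCastSketch {

    // Hypothetical, simplified version of the container: a value plus its runtime class.
    static final class Container<IN> {
        private final IN value;
        private final Class<IN> clazz;

        Container(IN value, Class<IN> clazz) {
            this.value = value;
            this.clazz = clazz;
        }

        IN getValue() { return value; }
        Class<IN> getClazz() { return clazz; }
    }

    // Accepts a container of any parameter type and dispatches on the stored class.
    static String process(Container<?> container) {
        Object rawValue = container.getValue();
        Class<?> clazz = container.getClazz();
        if (clazz == String.class) {
            // Class.cast throws ClassCastException if rawValue is not a String.
            String s = String.class.cast(rawValue);
            return "String of length " + s.length();
        } else if (clazz == Integer.class) {
            Integer i = Integer.class.cast(rawValue);
            return "Integer doubled: " + (i * 2);
        }
        throw new IllegalArgumentException("Unsupported type: " + clazz.getName());
    }

    public static void main(String[] args) {
        System.out.println(process(new Container<>("abc", String.class)));
        System.out.println(process(new Container<>(21, Integer.class)));
    }
}
```

Using Container<?> rather than the raw type keeps the compiler's checks in place while still accepting both parameterisations.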
However, I get an error when I do this:
Caused by:
org.apache.flink.api.common.functions.InvalidTypesException: Type of
TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be
determined. This is most likely a type erasure problem. The type
extraction currently supports types with generic variables only in
cases where all variables in the return type can be deduced from the
input type(s). Otherwise the type has to be specified explicitly using
type information.
at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)
If I try to add a returns() to the code, like this:
DataStream<...> merged = stream1.union(stream2)
.process(...)
.returns(new TypeHint<MyGenericContainer>() {});
then I get a different exception:
Exception in thread "main"
org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a
generic variable.This is not supported, generic types must be fully
specified for the TypeHint.
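To illustrate why a raw type in the hint is rejected, here is a plain-Java sketch of the "super type token" pattern that hints of this kind rely on. It is not Flink's implementation: Token and isFullySpecified are hypothetical names. An anonymous subclass records its type argument in class metadata, but with a raw type the recorded argument is just the bare class, so its own type variable is still unknown.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeTokenSketch {

    // Hypothetical stand-in for a type hint: captures T from the anonymous subclass.
    abstract static class Token<T> {
        final Type captured;

        protected Token() {
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            captured = superType.getActualTypeArguments()[0];
        }
    }

    // True only if every generic parameter in the captured type has a concrete argument.
    static boolean isFullySpecified(Type type) {
        if (type instanceof ParameterizedType) {
            for (Type arg : ((ParameterizedType) type).getActualTypeArguments()) {
                if (!isFullySpecified(arg)) {
                    return false;
                }
            }
            return true;
        }
        if (type instanceof Class) {
            // A bare Class that declares type parameters is a raw type: its arguments are unknown.
            return ((Class<?>) type).getTypeParameters().length == 0;
        }
        return false; // type variables, wildcards and generic arrays
    }

    static final Token<List<String>> FULL = new Token<List<String>>() {};

    @SuppressWarnings("rawtypes")
    static final Token<List> RAW = new Token<List>() {};

    public static void main(String[] args) {
        System.out.println(FULL.captured + " fully specified: " + isFullySpecified(FULL.captured));
        System.out.println(RAW.captured + " fully specified: " + isFullySpecified(RAW.captured));
    }
}
```

By analogy, a hint such as TypeHint<MyGenericContainer<String>> would be fully specified, but it could describe only one of the two streams, which is why a single common element type is needed for the merged stream.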
Is this sort of thing supported or is there another way of joining
multiple streams into a single stream, where each stream object will
have a specific type of a common generic type?
Many thanks,
John