Hi Wouter,
1) The exception looks like a bug. I have filed a ticket for it:
https://issues.apache.org/jira/browse/FLINK-22733
2) Regarding your requirements, I think you should do it as follows:
```
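from pyflink.common.typeinfo import Types

# operator_stream and init_operator come from your job.
# Map both branches to the same tuple type, but do not key them yet.
init_stream = (
    operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)
stateful_operator_stream = (
    operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)
# Union first, then key the combined stream.
init_stream.union(stateful_operator_stream).key_by(lambda x: x[0], Types.STRING())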
```
The reason is that `union` turns a `KeyedStream` into a plain `DataStream`, and
you cannot perform stateful operations on a `DataStream` that is not keyed. So
the `key_by` has to come after the `union`, not before it.
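
To make the ordering concrete, here is a small plain-Python sketch (not PyFlink) of the same split, map, union, then key-by flow. Lists stand in for streams, and `init_operator` here is a made-up placeholder that derives a key from the payload; your real function will differ.

```python
from collections import defaultdict


def init_operator(record):
    # Hypothetical stand-in: give a key to a record that arrived without one.
    _, payload = record
    return ("key-for-" + payload, payload)


def run_pipeline(operator_stream):
    # Split on whether the key (r[0]) is set, like the two filter() calls.
    init_stream = [init_operator(r) for r in operator_stream if r[0] is None]
    stateful_stream = [(r[0], r[1]) for r in operator_stream if r[0] is not None]

    # "union": plain concatenation; nothing is partitioned by key yet.
    unioned = init_stream + stateful_stream

    # "key_by": applied once, after the union, so every record is grouped.
    keyed = defaultdict(list)
    for key, payload in unioned:
        keyed[key].append(payload)
    return dict(keyed)


result = run_pipeline([(None, "a"), ("k1", "b"), (None, "c"), ("k1", "d")])
print(result)
```

The point is only the order of the steps: both branches produce the same record shape, they are merged, and the grouping by key happens on the merged result.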
Regards,
Dian
> On May 21, 2021, at 12:38 AM, Wouter Zorgdrager <[email protected]> wrote:
>
> Dear all,
>
> I'm having trouble unifying two data streams using the union operator in
> PyFlink. My code basically looks like this:
>
> init_stream = (operator_stream
>     .filter(lambda r: r[0] is None)
>     .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>     .key_by(lambda x: x[0], Types.STRING())
> )
>
> stateful_operator_stream = (operator_stream
>     .filter(lambda r: r[0] is not None)
>     .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>     .key_by(lambda x: x[0], Types.STRING())
> )
>
> print(init_stream)
> print(init_stream.get_type())
>
> print(stateful_operator_stream.get_type())
> print(stateful_operator_stream)
>
> final_operator_stream = (init_stream
>     .union(stateful_operator_stream)
>     .process(stateful_operator)
> )
>
>
> In short, I have a data stream (operator_stream) of type Tuple[str, Event]
> which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY().
> When calling the union operator, I get an error which shows a type mismatch
> between both streams:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
> : java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2<String, PickledByteArrayTypeInfo> and Row(f0: String, f1: Java Tuple2<String, PickledByteArrayTypeInfo>)
> at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:834)
>
> However, when I print the types of both datastreams they seem similar:
>
> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
>
> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>
>
> Any thoughts? Thanks in advance!
>
> Regards,
> Wouter