Dear all,
I'm having trouble unifying two data streams using the union operator in
PyFlink. My code basically looks like this:
from pyflink.common.typeinfo import Types

# Records without a key (r[0] is None) go through init_operator.
init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

# Records that already carry a key are passed through unchanged.
stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)
print(init_stream)
print(init_stream.get_type())
print(stateful_operator_stream.get_type())
print(stateful_operator_stream)
final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .process(stateful_operator)
)
In short, I have a data stream (operator_stream) of type Tuple[str, Event],
whose type information I declare as a tuple of Types.STRING() and
Types.PICKLED_BYTE_ARRAY().
When I call union, I get an error reporting a type mismatch between the
two streams:
py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
: java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2<String, PickledByteArrayTypeInfo> and Row(f0: String, f1: Java Tuple2<String, PickledByteArrayTypeInfo>)
	at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
However, when I print the two streams and their types, the types look identical:
<pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
<pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>
Any thoughts? Thanks in advance!
Regards,
Wouter