Hi Wouter,

1. These configurations work for both the Python Table API and the Python DataStream API. They read as if they were targeted at the Python Table API only because the Python Table API was the only API supported when that documentation was added. I will follow up to improve the paragraph.
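The relevant options are `python.fn-execution.bundle.size` and `python.fn-execution.bundle.time`. The latency effect you describe can be sketched in plain Python; the `Bundler` class below is a hypothetical, simplified model for illustration, not PyFlink's actual implementation:

```python
import time


class Bundler:
    """Simplified model of how records are buffered between the Python worker
    and the JVM: flush when either the size threshold or the time threshold
    is reached, whichever comes first."""

    def __init__(self, bundle_size, bundle_time_ms):
        self.bundle_size = bundle_size
        self.bundle_time = bundle_time_ms / 1000.0
        self.buffer = []
        self.started = None
        self.flushed = []  # list of bundles handed downstream

    def add(self, record, now=None):
        now = time.monotonic() if now is None else now
        if self.started is None:
            self.started = now
        self.buffer.append(record)
        # Flush on whichever threshold is crossed first.
        if len(self.buffer) >= self.bundle_size or now - self.started >= self.bundle_time:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flushed.append(self.buffer)
        self.buffer = []
        self.started = None


# At low throughput the size threshold is never reached, so every record
# waits up to bundle.time before it is flushed downstream -- which is why
# lowering bundle.time (or bundle.size) reduces per-message latency.
b = Bundler(bundle_size=1000, bundle_time_ms=1000)
b.add("event-1", now=0.0)  # buffered; neither threshold reached yet
b.add("event-2", now=1.5)  # 1.5 s elapsed >= 1 s -> bundle is flushed
```

In this model, shrinking `bundle_time_ms` bounds how long a record can sit in the buffer, at the cost of more frequent (smaller) bundles.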
2. The recommended way is using `stream_execution_environment.add_jars`. Could you share some code snippets?

Regards,
Dian

> On 23 May 2021, at 7:26 PM, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
> Hi Dian, all,
>
> Thanks, that indeed solved my problem. I have two more questions; I'm not sure whether it is better practice to send a new email to the mailing list or to reuse a thread:
>
> 1. I noticed very high latency (multiple seconds per message) for a job with multiple operators and very low throughput. I suspect this is because messages are bundled until a size threshold or a time threshold is met (and in a low-throughput scenario, only the time threshold is ever triggered). This is also the impression I get when reading the configuration page [1]. However, these configuration values seem to be targeted at the Table API, and it is unclear to me how to configure this for the DataStream API. To be clear, this is in PyFlink.
>
> 2. I'm using the JVM Kafka consumer and producer for my Python job, so I had to add the flink-connector-sql-kafka jar to my Flink environment. I did this by downloading the jar file from Maven and putting it under 'venv/pyflink/lib'. Is there an easier way? I'm not particularly a fan of manually changing my venv. I tried to use `stream_execution_environment.add_jars`, but that was unsuccessful; I still got a ClassNotFoundException.
>
> Hope you can help. As always, thanks a lot!
>
> Regards,
> Wouter
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/python_config/
>
> On Fri, 21 May 2021 at 05:25, Dian Fu <dian0511...@gmail.com> wrote:
>
> Hi Wouter,
>
> 1) As for the exception, it seems to be a bug.
> I have filed a ticket for it: https://issues.apache.org/jira/browse/FLINK-22733
>
> 2) Regarding your requirements, I guess you should do it as follows:
>
> ```
> init_stream = (operator_stream
>     .filter(lambda r: r[0] is None)
>     .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
> )
>
> stateful_operator_stream = (operator_stream
>     .filter(lambda r: r[0] is not None)
>     .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
> )
>
> init_stream.union(stateful_operator_stream).key_by(lambda x: x[0], Types.STRING())
> ```
>
> The reason is that `union` turns a `KeyedStream` into a `DataStream`, on which you can no longer perform stateful operations.
>
> Regards,
> Dian
>
>> On 21 May 2021, at 12:38 AM, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>>
>> Dear all,
>>
>> I'm having trouble unifying two data streams using the union operator in PyFlink. My code basically looks like this:
>>
>> init_stream = (operator_stream
>>     .filter(lambda r: r[0] is None)
>>     .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>>     .key_by(lambda x: x[0], Types.STRING())
>> )
>>
>> stateful_operator_stream = (operator_stream
>>     .filter(lambda r: r[0] is not None)
>>     .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>>     .key_by(lambda x: x[0], Types.STRING())
>> )
>>
>> print(init_stream)
>> print(init_stream.get_type())
>>
>> print(stateful_operator_stream.get_type())
>> print(stateful_operator_stream)
>>
>> final_operator_stream = (init_stream
>>     .union(stateful_operator_stream)
>>     .process(stateful_operator))
>>
>> In short, I have a datastream (operator_stream) of type Tuple[str, Event], which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY().
>> When calling the union operator, I get an error that shows a type mismatch between the two streams:
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
>> : java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2<String, PickledByteArrayTypeInfo> and Row(f0: String, f1: Java Tuple2<String, PickledByteArrayTypeInfo>)
>>     at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>     at java.base/java.lang.Thread.run(Thread.java:834)
>>
>> However, when I print the types of both datastreams, they look identical:
>>
>> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
>> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
>>
>> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
>> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>
>>
>> Any thoughts? Thanks in advance!
>>
>> Regards,
>> Wouter
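Returning to Dian's point 2 at the top of the thread, a minimal sketch of the recommended `stream_execution_environment.add_jars` usage might look like the following. The jar path and filename are placeholders; note that `add_jars` expects `file://` URLs to local jars, so a bare filesystem path (or a jar dropped into the venv) may not be picked up, which can surface as a ClassNotFoundException:

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# add_jars takes file:// URLs pointing at local jar files; the path below
# is a placeholder for wherever the Kafka connector jar was downloaded.
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
```

This keeps the connector jar out of the virtualenv entirely, so the environment does not need to be modified by hand.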