Hi Wouter,

1. These configurations work for both the Python Table API and the Python 
DataStream API. The documentation reads as if they were Table-API-only because 
only the Python Table API was supported when that page was written. I will 
follow up to improve the paragraph.
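If latency matters more than throughput, the bundling thresholds can be tuned 
down. A rough sketch (the values below are illustrative, not recommendations), 
e.g. in flink-conf.yaml or the job configuration:

```yaml
# Bundling thresholds for Python function execution; smaller values
# reduce latency at the cost of throughput (the defaults are much larger).
python.fn-execution.bundle.size: 1000   # max elements per bundle
python.fn-execution.bundle.time: 100    # max wait time in milliseconds
```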

2. The recommended way is to use `stream_execution_environment.add_jars`. Could 
you share some code snippets? 
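One thing worth checking: `add_jars` expects URLs rather than bare filesystem 
paths, so a local jar needs the `file://` scheme. A minimal sketch (the jar 
path below is a placeholder for wherever you downloaded the connector):

```python
from pathlib import Path

# Hypothetical location of the downloaded Kafka SQL connector jar.
jar_path = Path("/opt/flink/jars/flink-sql-connector-kafka_2.11-1.13.0.jar")
jar_url = jar_path.as_uri()  # bare paths will not be picked up by add_jars

# With PyFlink available, the jar would then be registered like this:
# from pyflink.datastream import StreamExecutionEnvironment
# env = StreamExecutionEnvironment.get_execution_environment()
# env.add_jars(jar_url)
```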

Regards,
Dian

> On 23 May 2021, at 19:26, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
> 
> Hi Dian, all,
> 
> Thanks, that indeed solved my problem. I have two more questions, I'm not 
> sure if it is better practice to send a new email to the mailing list or to 
> re-use a thread:
> 
> 1. I noticed very high latency (multiple seconds per message) for a job with 
> multiple operators and very low throughput. I suspect this is because messages 
> are bundled until a size or time threshold is met (and in a low-throughput 
> scenario, only the time threshold fires).
> This is also the impression I get from the configuration page [1]. However, 
> these configuration values seem to be targeted at the Table API, and it is 
> unclear to me how to configure this for the DataStream API. To be clear, this 
> is in PyFlink.
> 
> 2. I'm using the JVM Kafka Consumer and Producer for my Python job. Therefore 
> I had to add the flink-connector-sql-kafka jar to my Flink environment. I did 
> this by downloading the jar file from Maven and putting it under 
> 'venv/pyflink/lib'. Is there an easier way? I'm not particularly a fan of 
> manually changing my venv. 
> I tried to use stream_execution_environment.add_jars, but that was 
> unsuccessful; I still got a ClassNotFoundException.  
> 
> Hope you can help. As always, thanks a lot!
> 
> Regards,
> Wouter
> 
> 
> [1] - 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/python_config/
> On Fri, 21 May 2021 at 05:25, Dian Fu <dian0511...@gmail.com> wrote:
> Hi Wouter,
> 
> 1) For the exception, it seems to be a bug. I have filed a ticket for it: 
> https://issues.apache.org/jira/browse/FLINK-22733
> 
> 2) Regarding your requirements, I guess you should do it as follows:
> ```
> init_stream = (operator_stream
>     .filter(lambda r: r[0] is None)
>     .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
> )
> 
> stateful_operator_stream = (operator_stream
>     .filter(lambda r: r[0] is not None)
>     .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
> )
> 
> init_stream.union(stateful_operator_stream).key_by(lambda x: x[0], Types.STRING())
> 
> ```
> 
> The reason is that `union` turns a `KeyedStream` into a `DataStream`, and you 
> can no longer perform stateful operations on a `DataStream`.
> 
> Regards,
> Dian
> 
>> On 21 May 2021, at 00:38, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>> 
>> Dear all,
>> 
>> I'm having trouble unifying two data streams using the union operator in 
>> PyFlink. My code basically looks like this:
>> 
>> init_stream = (operator_stream
>>     .filter(lambda r: r[0] is None)
>>     .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>>     .key_by(lambda x: x[0], Types.STRING())
>> )
>> 
>> stateful_operator_stream = (operator_stream
>>     .filter(lambda r: r[0] is not None)
>>     .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>>     .key_by(lambda x: x[0], Types.STRING())
>> )
>> 
>> print(init_stream)
>> print(init_stream.get_type())
>> 
>> print(stateful_operator_stream.get_type())
>> print(stateful_operator_stream)
>> 
>> final_operator_stream = (init_stream
>>     .union(stateful_operator_stream)
>>     .process(stateful_operator)
>> )
>> 
>> 
>> In short, I have a datastream (operator_stream) of type Tuple[str, Event], 
>> which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY().
>> When calling the union operator, I get an error which shows a type mismatch 
>> between both streams:
>> 
>> py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
>> : java.lang.IllegalArgumentException: Cannot union streams of different 
>> types: Java Tuple2<String, PickledByteArrayTypeInfo> and Row(f0: String, f1: 
>> Java Tuple2<String, PickledByteArrayTypeInfo>)
>>      at 
>> org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
>>      at 
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
>> Method)
>>      at 
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at 
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>      at 
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>      at java.base/java.lang.Thread.run(Thread.java:834)
>> 
>> However, when I print the types of both datastreams, they seem identical:
>> 
>> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
>> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
>> 
>> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
>> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>
>> 
>> Any thoughts? Thanks in advance!
>> 
>> Regards,
>> Wouter
> 
