Wow thank you so much! Good to know it's not just me. At the end of my day yesterday, I started sniffing this out too. I think I effectively did the same thing as setting _j_typeinfo to None by manually recreating the _j_typeinfo in a new ('cloned') OutputTag:
```python
from pyflink.common.typeinfo import TypeInformation, _from_java_type
from pyflink.datastream import OutputTag


def clone_type_info(type_info: TypeInformation) -> TypeInformation:
    return _from_java_type(type_info.get_java_type_info())


def clone_output_tag(tag: OutputTag) -> OutputTag:
    return OutputTag(tag.tag_id, clone_type_info(tag.type_info))
```

Then, every time I need to use an OutputTag (or any function that will enclose a _j_typeinfo), I make sure that object is 'cloned'.

Thanks so much for the bugfix! Looking forward to it!

On Wed, Feb 15, 2023 at 4:41 AM Juntao Hu <maybach...@gmail.com> wrote:

> Hi Andrew,
>
> I've found out that this is a bug introduced by another bugfix, FLINK-29681
> <https://issues.apache.org/jira/browse/FLINK-29681>. I've created an issue
> FLINK-31083 <https://issues.apache.org/jira/browse/FLINK-31083> for this
> problem. You could temporarily set the inner Java type_info to None before
> reusing the ProcessFunction to work around it in your code, e.g.
> ```python
> side_output_ds1 = processed_ds1.get_side_output(output_tag1)
> output_tag1.type_info._j_typeinfo = None
> processed_ds2 = processed_ds1.process(LinesWithAndToSideOutput())
> ```
>
> Thanks for reporting!
>
> On Wed, Feb 15, 2023 at 14:03, David Anderson <dander...@apache.org> wrote:
>
>> I can't respond to the python-specific aspects of this situation, but
>> I don't believe you need to use the same OutputTag instance. It should
>> be enough that the various tag instances involved all have the same
>> String id. (That's why the id exists.)
>>
>> David
>>
>> On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto <o...@wikimedia.org> wrote:
>> >
>> > Hi,
>> >
>> > I'm attempting to implement a generic error-handling ProcessFunction in
>> > pyflink. Given a user-provided function, I want to invoke that function
>> > for each element in the DataStream, catch any errors thrown by the
>> > function, convert those errors into events, and then emit those error
>> > events to a different DataStream sink.
>> >
>> > I'm trying to do this by reusing the same OutputTag in each of my
>> > ProcessFunctions. However, this does not work, I believe because I am
>> > using the same error_output_tag in two different functions, which causes
>> > it to hold a reference(?) to _thread.RLock, which makes the
>> > ProcessFunction instance unpicklable.
>> >
>> > Here's a standalone example of the failure using the canonical
>> > word_count example.
>> >
>> > My questions are:
>> > 1. Does Flink support re-use of the same OutputTag instance in multiple
>> > ProcessFunctions?
>> > 2. If so, is my problem pyflink / python / pickle specific?
>> >
>> > Thanks!
>> > -Andrew Otto
>> >  Wikimedia Foundation
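[Editor's note] A minimal sketch of the pattern discussed in this thread: a wrapper ProcessFunction that catches exceptions from a user-supplied function and routes them to an error side output, building a fresh OutputTag (sharing only the string tag_id) inside each function instance so no cached _j_typeinfo is ever shared across pickled functions. The names ErrorHandlingProcessFunction, make_error_tag, parse_line, and lines are hypothetical, and it is assumed (per David's suggestion, not verified here) that matching side outputs by tag_id alone is sufficient.

```python
from pyflink.common import Types
from pyflink.datastream import OutputTag, ProcessFunction

ERROR_TAG_ID = "errors"


def make_error_tag() -> OutputTag:
    # Build a brand-new OutputTag (and thus fresh TypeInformation) every time,
    # so no cached _j_typeinfo / RLock is shared between ProcessFunctions.
    return OutputTag(ERROR_TAG_ID, Types.STRING())


class ErrorHandlingProcessFunction(ProcessFunction):
    """Invokes user_fn per element; failures are emitted as error-event
    strings to the error side output instead of failing the job."""

    def __init__(self, user_fn):
        self.user_fn = user_fn
        self.error_tag = make_error_tag()  # owned by this instance only

    def process_element(self, value, ctx):
        try:
            yield self.user_fn(value)
        except Exception as e:
            # Emit to the side output by yielding a (tag, value) pair.
            yield self.error_tag, f"error processing {value!r}: {e}"


# Usage sketch: each .process() call gets its own function (and tag) instance;
# the side output is fetched with another tag carrying the same tag_id.
# parsed = lines.process(ErrorHandlingProcessFunction(parse_line),
#                        output_type=Types.STRING())
# errors = parsed.get_side_output(make_error_tag())
```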