Hi Andrew,

I've found that this is a bug introduced by another bugfix, FLINK-29681 <https://issues.apache.org/jira/browse/FLINK-29681>. I've created an issue, FLINK-31083 <https://issues.apache.org/jira/browse/FLINK-31083>, for this problem. As a temporary workaround, you can set the inner Java type_info to None before reusing the ProcessFunction in your code, e.g.

```python
side_output_ds1 = processed_ds1.get_side_output(output_tag1)
# Reset the inner Java type info before the ProcessFunction is reused.
output_tag1.type_info._j_typeinfo = None
processed_ds2 = processed_ds1.process(LinesWithAndToSideOutput())
```
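To show why resetting that attribute can help, here is a minimal pure-Python sketch. It does not use PyFlink at all: the `tag` object and its `tag_id` field are hypothetical stand-ins, and the `threading.RLock` only plays the role of the unpicklable reference mentioned in the original report. It assumes the lock is reached through the inner Java type info, which is what the workaround suggests but which I have not shown here.

```python
import pickle
import threading
from types import SimpleNamespace


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Hypothetical stand-in for an output tag; this is not a PyFlink class.
tag = SimpleNamespace(
    tag_id="errors",
    type_info=SimpleNamespace(_j_typeinfo=None),  # nothing attached yet
)
picklable_when_fresh = is_picklable(tag)

# Simulate the first use attaching a handle that holds a lock.
tag.type_info._j_typeinfo = threading.RLock()
picklable_after_use = is_picklable(tag)

# Same idea as the workaround: drop the handle before the tag is reused.
tag.type_info._j_typeinfo = None
picklable_after_reset = is_picklable(tag)

print(picklable_when_fresh, picklable_after_use, picklable_after_reset)
```

In this sketch the object pickles fine before the lock is attached, fails while it is attached, and pickles again once the attribute is reset to None.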
Thanks for reporting!

On Wed, Feb 15, 2023 at 14:03, David Anderson <dander...@apache.org> wrote:

> I can't respond to the python-specific aspects of this situation, but
> I don't believe you need to use the same OutputTag instance. It should
> be enough that the various tag instances involved all have the same
> String id. (That's why the id exists.)
>
> David
>
> On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto <o...@wikimedia.org> wrote:
> >
> > Hi,
> >
> > I'm attempting to implement a generic error handling ProcessFunction in
> > pyflink. Given a user provided function, I want to invoke that function
> > for each element in the DataStream, catch any errors thrown by the
> > function, convert those errors into events, and then emit those event
> > errors to a different DataStream sink.
> >
> > I'm trying to do this by reusing the same OutputTag in each of my
> > ProcessFunctions.
> >
> > However, this does not work, I believe because I am using the same
> > error_output_tag in two different functions, which causes it to have a
> > reference(?) to _thread.Rlock, which causes the ProcessFunction instance
> > to be un-pickleable.
> >
> > Here's a standalone example of the failure using the canonical
> > word_count example.
> >
> > My question is.
> > 1. Does Flink support re-use of the same OutputTag instance in multiple
> > ProcessFunctions?
> > 2. If so, is my problem pyflink / python / pickle specific?
> >
> > Thanks!
> > -Andrew Otto
> > Wikimedia Foundation