Hi Andrew,

I've found that this is a bug introduced by another bugfix, FLINK-29681
<https://issues.apache.org/jira/browse/FLINK-29681>, and I've created an issue,
FLINK-31083 <https://issues.apache.org/jira/browse/FLINK-31083>, for this
problem. As a temporary workaround, you could set the inner Java type info
(`_j_typeinfo`) of the tag's type_info to None before reusing the
ProcessFunction, e.g.:
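```python
# Fetch the side output produced by the first ProcessFunction.
side_output_ds1 = processed_ds1.get_side_output(output_tag1)
# Workaround for FLINK-31083: clear the cached Java type info so the tag,
# and any ProcessFunction holding it, can be pickled again.
output_tag1.type_info._j_typeinfo = None
processed_ds2 = processed_ds1.process(LinesWithAndToSideOutput())
```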
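To illustrate why clearing `_j_typeinfo` helps, here is a plain-Python sketch that models the mechanism without Flink. It assumes, based on the `_thread.RLock` error you saw, that the cached Java type info references a lock-bearing py4j object; `make_tag` and `cache_java_type_info` are hypothetical stand-ins, not PyFlink APIs.

```python
import pickle
import threading
from types import SimpleNamespace


def make_tag(tag_id):
    # Hypothetical stand-in for an OutputTag whose type_info has no
    # cached Java object yet (_j_typeinfo is None).
    return SimpleNamespace(tag_id=tag_id, type_info=SimpleNamespace(_j_typeinfo=None))


def cache_java_type_info(tag):
    # Hypothetical stand-in for the step that converts type_info to Java
    # and caches the result. The real py4j object is assumed to reference
    # a lock, modeled here with an RLock.
    tag.type_info._j_typeinfo = SimpleNamespace(lock=threading.RLock())


def is_picklable(obj):
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


tag = make_tag("errors")
print(is_picklable(tag))  # True: nothing cached yet
cache_java_type_info(tag)
print(is_picklable(tag))  # False: the cached object holds an RLock
tag.type_info._j_typeinfo = None  # the workaround
print(is_picklable(tag))  # True again
```

Once the lock-bearing object is cached on the tag, pickling anything that references the tag fails; dropping the cache restores picklability.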

Thanks for reporting!

On Wed, Feb 15, 2023 at 14:03, David Anderson <dander...@apache.org> wrote:

> I can't respond to the Python-specific aspects of this situation, but
> I don't believe you need to use the same OutputTag instance. It should
> be enough that the various tag instances involved all have the same
> String id. (That's why the id exists.)
>
> David
>
> On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto <o...@wikimedia.org> wrote:
> >
> > Hi,
> >
> > I'm attempting to implement a generic error-handling ProcessFunction in
> > PyFlink. Given a user-provided function, I want to invoke that function
> > for each element in the DataStream, catch any errors thrown by the
> > function, convert those errors into events, and then emit those error
> > events to a different DataStream sink.
> >
> > I'm trying to do this by reusing the same OutputTag in each of my
> > ProcessFunctions.
> > However, this does not work. I believe this is because using the same
> > error_output_tag in two different functions causes it to hold a
> > reference(?) to a _thread.RLock, which makes the ProcessFunction
> > instance unpicklable.
> >
> > Here's a standalone example of the failure, based on the canonical
> > word_count example.
> >
> > My questions are:
> > 1. Does Flink support reusing the same OutputTag instance in multiple
> > ProcessFunctions?
> > 2. If so, is my problem specific to PyFlink, Python, or pickle?
> >
> > Thanks!
> > -Andrew Otto
> >  Wikimedia Foundation
> >
> >
>
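For the generic error handler itself, here is a plain-Python sketch of the pattern you describe, following David's suggestion of giving each function its own tag instance with a shared string id. `Tag`, `ErrorHandlingFunction`, `run`, and the `"errors"` id are hypothetical stand-ins so the logic runs without a cluster; a real version would subclass `pyflink.datastream.ProcessFunction`, create a real `OutputTag`, and emit errors by yielding an `(output_tag, value)` pair from `process_element`, which is how PyFlink emits side outputs.

```python
from typing import Any, Callable, Dict, Iterator, List, Tuple

ERROR_TAG_ID = "errors"  # hypothetical id shared by every wrapper


class Tag:
    """Hypothetical stand-in for an OutputTag; equality is by id only."""

    def __init__(self, tag_id: str):
        self.tag_id = tag_id

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Tag) and self.tag_id == other.tag_id

    def __hash__(self) -> int:
        return hash(self.tag_id)


class ErrorHandlingFunction:
    """Plain-Python sketch of a generic error-handling wrapper."""

    def __init__(self, user_fn: Callable[[Any], Any]):
        self.user_fn = user_fn
        # Each wrapper creates its own tag instance; only the id is shared.
        self.error_tag = Tag(ERROR_TAG_ID)

    def process_element(self, value: Any, ctx: Any = None) -> Iterator[Any]:
        try:
            yield self.user_fn(value)
        except Exception as exc:
            # Turn the failure into an error event and emit it as a
            # (tag, event) pair, the shape used for side outputs.
            yield self.error_tag, {
                "input": value,
                "error_type": type(exc).__name__,
                "message": str(exc),
            }


def run(
    fn: ErrorHandlingFunction, values: List[Any]
) -> Tuple[List[Any], Dict[str, List[Any]]]:
    """Toy driver: split main output from side outputs, keyed by tag id."""
    main: List[Any] = []
    side: Dict[str, List[Any]] = {}
    for value in values:
        for out in fn.process_element(value):
            if isinstance(out, tuple) and len(out) == 2 and isinstance(out[0], Tag):
                side.setdefault(out[0].tag_id, []).append(out[1])
            else:
                main.append(out)
    return main, side


parse = ErrorHandlingFunction(int)
upper = ErrorHandlingFunction(str.upper)
print(parse.error_tag is upper.error_tag)  # False: separate instances
print(parse.error_tag == upper.error_tag)  # True: same id

main1, side1 = run(parse, ["1", "x", "3"])
main2, side2 = run(upper, ["a", 5])
print(main1, main2)  # [1, 3] ['A']
print(len(side1[ERROR_TAG_ID]) + len(side2[ERROR_TAG_ID]))  # 2
```

Because each wrapper builds its own tag, no single tag instance is shared across functions, assuming PyFlink matches side outputs by tag id as David describes.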
