Wow, thank you so much!  Good to know it's not just me.

At the end of my day yesterday, I started sniffing this out too.  I think I
effectively did the same thing as setting _j_typeinfo to None by manually
recreating the _j_typeinfo in a new ('cloned') output tag:

from pyflink.common.typeinfo import TypeInformation, _from_java_type
from pyflink.datastream import OutputTag

def clone_type_info(type_info: TypeInformation) -> TypeInformation:
    # Round-trip through the Java type info so we get back a fresh Python
    # TypeInformation object, without the original's cached _j_typeinfo.
    return _from_java_type(type_info.get_java_type_info())

def clone_output_tag(tag: OutputTag) -> OutputTag:
    # New OutputTag with the same tag_id but a cloned TypeInformation.
    return OutputTag(tag.tag_id, clone_type_info(tag.type_info))

Then, every time I need to use an OutputTag (or any function that will
enclose a _j_typeinfo), I make sure that object is 'cloned'.
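
Roughly like this, in case the shape of it helps (a quick sketch;
MyErrorHandlingFunction, input_ds1, and input_ds2 are made-up names just to
illustrate the pattern, building on the clone helpers above):

from pyflink.common import Types

# One shared definition of the error tag; only clones of it get used.
ERROR_OUTPUT_TAG = OutputTag('errors', Types.STRING())

# Give each use its own freshly built tag, so no cached _j_typeinfo
# (with its thread lock) ends up enclosed in a ProcessFunction.
processed_ds1 = input_ds1.process(
    MyErrorHandlingFunction(clone_output_tag(ERROR_OUTPUT_TAG)))
errors_ds1 = processed_ds1.get_side_output(clone_output_tag(ERROR_OUTPUT_TAG))

processed_ds2 = input_ds2.process(
    MyErrorHandlingFunction(clone_output_tag(ERROR_OUTPUT_TAG)))
errors_ds2 = processed_ds2.get_side_output(clone_output_tag(ERROR_OUTPUT_TAG))

Since every clone keeps the same tag_id, the side outputs still match up,
which fits with David's point below about the String id being what matters.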

Thanks so much for the bugfix!  Looking forward to it!


On Wed, Feb 15, 2023 at 4:41 AM Juntao Hu <maybach...@gmail.com> wrote:

> Hi Andrew,
>
> I've found out that this is a bug introduced by another bugfix, FLINK-29681
> <https://issues.apache.org/jira/browse/FLINK-29681>; I've created an
> issue, FLINK-31083 <https://issues.apache.org/jira/browse/FLINK-31083>, for
> this problem. You could temporarily set the inner Java type_info to None before
> reusing the ProcessFunction as a workaround in your code, e.g.
> ```python
> side_output_ds1 = processed_ds1.get_side_output(output_tag1)
> output_tag1.type_info._j_typeinfo = None
> processed_ds2 = processed_ds1.process(LinesWithAndToSideOutput())
> ```
>
> Thanks for reporting!
>
> On Wed, Feb 15, 2023 at 14:03, David Anderson <dander...@apache.org> wrote:
>
>> I can't respond to the python-specific aspects of this situation, but
>> I don't believe you need to use the same OutputTag instance. It should
>> be enough that the various tag instances involved all have the same
>> String id. (That's why the id exists.)
>>
>> David
>>
>> On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto <o...@wikimedia.org> wrote:
>> >
>> > Hi,
>> >
>> > I'm attempting to implement a generic error-handling ProcessFunction in
>> pyflink.  Given a user-provided function, I want to invoke that function
>> for each element in the DataStream, catch any errors thrown by the
>> function, convert those errors into events, and then emit those error
>> events to a different DataStream sink.
>> >
>> > I'm trying to do this by reusing the same OutputTag in each of my
>> ProcessFunctions.
>> > However, this does not work, I believe because using the same
>> error_output_tag in two different functions causes it to hold a
>> reference(?) to a _thread.RLock, which makes the ProcessFunction instance
>> un-pickleable.
>> >
>> > Here's a standalone reproduction of the failure, based on the canonical
>> word_count example.
>> >
>> > My questions are:
>> > 1. Does Flink support re-use of the same OutputTag instance in multiple
>> ProcessFunctions?
>> > 2. If so, is my problem pyflink / python / pickle specific?
>> >
>> > Thanks!
>> > -Andrew Otto
>> >  Wikimedia Foundation
>> >
>> >
>>
>
