Hi Sergio,

You can find the explanation of why we haven't added the close method
in the corresponding JIRA ticket [1]:

    When adding a close() method to both DeserializationSchema and
    SerializationSchema with a default implementation, it breaks source
    compatibility if a user's class implements both interfaces at the
    same time. The problem is that Java does not know which default
    implementation to use from those two interfaces, as the close()
    signature is the same in both. In the Flink code base we have three
    such implementations: SimpleStringSchema,
    TypeInformationSerializationSchema, and one in tests. It is not a
    problem for open(), as we have a parameter there that differentiates
    the two methods.

    We decided to skip close() for now until we have a first use case
    for it. We do not need to close the schema registry client because
    it communicates over REST. Moreover, no other schema needs a close()
    for now. For the Table API we also need only open() for generating
    the code of the serializer.
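
To make the clash described in the ticket concrete, here is a minimal
sketch. It uses simplified stand-in interfaces rather than Flink's real
ones: DeserSchema, SerSchema, DeserContext, SerContext and StringSchema
are hypothetical names chosen for illustration only. It shows that two
open() defaults with different parameter types coexist as overloads,
while two identical close() defaults force the implementing class to
override close() itself.

```java
// Hypothetical stand-ins for the two schema interfaces (not Flink's actual types).
class DeserContext {}
class SerContext {}

interface DeserSchema {
    // Different parameter types make the two open() methods plain overloads.
    default void open(DeserContext context) {}
    default void close() {}
}

interface SerSchema {
    default void open(SerContext context) {}
    default void close() {}
}

// If the close() override below is removed, javac rejects this class because
// it inherits unrelated defaults for close() from both interfaces.
class StringSchema implements DeserSchema, SerSchema {
    boolean closed = false;

    @Override
    public void close() {
        // Explicitly pick both inherited defaults, then record the state.
        DeserSchema.super.close();
        SerSchema.super.close();
        closed = true;
    }
}

public class DefaultMethodConflict {
    public static void main(String[] args) {
        StringSchema schema = new StringSchema();
        schema.open(new DeserContext()); // resolves to DeserSchema.open
        schema.open(new SerContext());   // resolves to SerSchema.open
        schema.close();
        System.out.println("closed = " + schema.closed);
    }
}
```

Existing user classes that implement both interfaces have no such
override, which is why adding the defaults would break source
compatibility for them.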

Now that you're reaching out with such a requirement, we might revisit
it. WDYT, Arvid?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-17306

On 02/09/2021 10:37, Sergio Morales wrote:
>
> Thank you for the answer. I’m using the (De)SerializationSchema in
> such a way that it holds a reference to a custom class that manages
> some resources. In the open() method I’m able to initialize the
> resources accordingly, but it is really strange that, despite
> providing an “open()”, there is no counterpart “close()” to properly
> release the same resources. We have a memory leak in the project that
> could be resolved with a close() method, and for now we have to take
> an extra hop by subclassing the SinkFunction to `@Override` its
> close() method so that it closes both itself and the resources
> managed by the De/SerializationSchema.
>
>  
>
> I was considering adding the close() API method by following a
> previous PR (https://github.com/apache/flink/pull/12006). Would that
> be something the team is willing to accept, or should I avoid any
> effort on that part because the previous design document is no longer
> valid?
>
>  
>
> Regards,
>
> Sergio.
>
>  
>
> *From: *Caizhi Weng <tsreape...@gmail.com>
> *Date: *Thursday, 2 September 2021 at 04:18
> *To: *Sergio Morales <sdmorale...@gmail.com>
> *Cc: *user <user@flink.apache.org>
> *Subject: *Re: De/Serialization API to tear-down user code
>
>  
>
> Hi!
>
>  
>
> The (De)serializationSchema is only a helper for changing the data
> object to another format. What's your use case? If you're creating a
> (De)serializationSchema for a source / sink you might want to open and
> close the resources in the open / close methods of the source / sink,
> not in the (De)serializationSchema.
>
>  
>
> Sergio Morales <sdmorale...@gmail.com> wrote on Wednesday,
> 1 September 2021 at 18:44:
>
>     Hi,
>
>     I’m currently working to close some resources while using the
>     SerializationSchema and DeserializationSchema (Flink-core
>     v1.12.1). However, after reviewing the document outlining the API
>     methods
>     (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988),
>     the close() operations are missing, and in the master branch I
>     could not find any new version that includes them:
>
>     * https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
>
>     * https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
>
>     Thank you for any help.
>
>      
>
>     Regards,
>
>     Sergio.
>
>      
>
