Hi tanjialiang,

Thanks for reporting this; I think it's a reasonable requirement. The problem is probably introduced during optimization, when the mutable state in the source is reused, so DecodingFormat#copy would avoid this situation.
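To make the reuse problem concrete, here is a minimal sketch. All of the types in it (SketchFormat, MutableSketchFormat, SketchSource) are hypothetical stand-ins I made up for illustration, not Flink's actual interfaces; the only assumption is that a format keeps push-down state in a mutable field and that the source is copied during planning.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a format with push-down state (NOT Flink's DecodingFormat).
interface SketchFormat {
    void applyProjection(List<String> fields); // mutates internal state
    List<String> projectedFields();
    SketchFormat copy();                       // deep copy, as proposed in this thread
}

final class MutableSketchFormat implements SketchFormat {
    private List<String> projected = new ArrayList<>();

    @Override
    public void applyProjection(List<String> fields) {
        projected = new ArrayList<>(fields);
    }

    @Override
    public List<String> projectedFields() {
        return new ArrayList<>(projected);
    }

    @Override
    public SketchFormat copy() {
        // Copy the mutable state so the new instance is independent.
        MutableSketchFormat c = new MutableSketchFormat();
        c.projected = new ArrayList<>(projected);
        return c;
    }
}

// Hypothetical source holding a format (NOT Flink's DynamicTableSource).
final class SketchSource {
    private final SketchFormat format;

    SketchSource(SketchFormat format) {
        this.format = format;
    }

    SketchFormat format() {
        return format;
    }

    // Without a format-level copy, the copied source shares the same format instance.
    SketchSource shallowCopy() {
        return new SketchSource(format);
    }

    // With a format-level copy, each source owns its own format state.
    SketchSource deepCopy() {
        return new SketchSource(format.copy());
    }
}

class CopyDemo {
    public static void main(String[] args) {
        SketchSource first = new SketchSource(new MutableSketchFormat());
        SketchSource shared = first.shallowCopy();
        shared.format().applyProjection(List.of("a"));
        // The push-down on the copy leaks into the original.
        System.out.println(first.format().projectedFields()); // prints [a]

        SketchSource second = new SketchSource(new MutableSketchFormat());
        SketchSource isolated = second.deepCopy();
        isolated.format().applyProjection(List.of("a"));
        // The original is untouched.
        System.out.println(second.format().projectedFields()); // prints []
    }
}
```

In the shallow case, a projection applied to the copy shows up in the original as well, which is the kind of leak a format-level deep copy would prevent.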
But after checking the related code for DynamicTableSink, the situation there is a little different. I don't think it relies on the copy interface at the moment; DynamicTableSink#copy is not even invoked by the framework. I guess the reason is that the sink abilities are all static ability specs, which are applied before the optimization, so no copy happens.

Anyway, it is unclear to an implementer at which stage `apply` will happen, so +1 for introducing copy for EncodingFormat and DecodingFormat, to align their support for deep-copy semantics this time.

Best,
Aitozi.

tanjialiang <tanjl_w...@126.com> wrote on Thu, Apr 13, 2023 at 10:05:

> Hi, devs.
>
> I'd like to start a discussion about having EncodingFormat and DecodingFormat provide a copy API, which relates to FLINK-31686 [1].
>
> Currently, DecodingFormat doesn't support copy(), which causes the DecodingFormat to be reused after a filter/projection is pushed down. EncodingFormat has the same problem if a class implements EncodingFormat#applyWritableMetadata(). So I think EncodingFormat and DecodingFormat need to provide a copy function, and it should be a deep copy if the format implements DecodingFormat#applyReadableMetadata/EncodingFormat#applyWritableMetadata/BulkDecodingFormat#applyFilters.
>
> Looking forward to your feedback.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-31686
>
> Best regards,
> tanjialiang