You will get into trouble if some Op caches the data locally while a downstream
Op in the same operator chain modifies that data in-place. AFAIK, all Flink SQL
operators handle this properly (they make a deep copy when caching is needed),
but you should be careful with UDFs and custom connectors.

Example: suppose we have two chained operators A->B with object reuse enabled,
where A caches the data in a local hash map and B modifies the data in-place.
When a single row arrives, Op A first caches it in the map, then the same
object is passed directly to Op B, which modifies it in-place. Note that the
entry cached in Op A is also updated, because it is the very same object.
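To make the A->B scenario concrete, here is a plain-Java simulation with no Flink dependency. `MutableRow`, `CachingOp` and `ModifyingOp` are hypothetical stand-ins, not Flink classes, and the "chained call" is just a direct method call, which is roughly what a chained output does when object reuse skips the per-record copy. It shows the cached entry being corrupted, and the usual fix of deep-copying before caching.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java simulation (not Flink code) of chained operators A -> B with object reuse. */
public class ObjectReuseHazard {

    /** Hypothetical mutable record standing in for a row. */
    static final class MutableRow {
        final String key;
        long value;

        MutableRow(String key, long value) {
            this.key = key;
            this.value = value;
        }

        MutableRow copy() {
            return new MutableRow(key, value);
        }
    }

    /** Op B (hypothetical): modifies the incoming row in-place. */
    static final class ModifyingOp {
        void processElement(MutableRow row) {
            row.value *= 10;
        }
    }

    /** Op A (hypothetical): caches each row by key, then forwards it downstream. */
    static final class CachingOp {
        final Map<String, MutableRow> cache = new HashMap<>();
        final boolean copyBeforeCaching;
        final ModifyingOp downstream;

        CachingOp(boolean copyBeforeCaching, ModifyingOp downstream) {
            this.copyBeforeCaching = copyBeforeCaching;
            this.downstream = downstream;
        }

        void processElement(MutableRow row) {
            // Safe variant: cache a private deep copy so in-place edits made
            // downstream cannot reach the cached entry.
            cache.put(row.key, copyBeforeCaching ? row.copy() : row);
            // With object reuse, the same reference is handed to the next operator.
            downstream.processElement(row);
        }
    }

    /** Sends one row with value 1 through A -> B and returns the value A has cached. */
    public static long cachedValueAfterOneRow(boolean copyBeforeCaching) {
        CachingOp a = new CachingOp(copyBeforeCaching, new ModifyingOp());
        a.processElement(new MutableRow("k", 1));
        return a.cache.get("k").value;
    }

    public static void main(String[] args) {
        System.out.println("no copy:   " + cachedValueAfterOneRow(false)); // 10: cache was mutated by B
        System.out.println("deep copy: " + cachedValueAfterOneRow(true));  // 1: cache is isolated
    }
}
```

The copy in `CachingOp` is the UDF-side counterpart of what the framework's copying output does for every record; paying it only where something is actually cached is what makes object reuse safe.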

Best,
Zhanghao Chen
________________________________
From: Winterchill <809025...@qq.com.INVALID>
Sent: Thursday, April 24, 2025 19:01
To: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] FlinkSQL support enableObjectReuse to optimize 
CopyingChainingOutput

Thanks. I read about it in the docs and have some questions:


https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/execution_configuration/


The Flink documentation indicates that enabling `enableObjectReuse` may cause
bugs, but the description is not detailed enough.
In what situations would it lead to bugs? Can we use this parameter in a
production environment?



---Original---
From: "Zhanghao Chen"<zhanghao.c...@outlook.com&gt;
Date: Thu, Apr 24, 2025 11:12 AM
To: "dev"<dev@flink.apache.org&gt;;
Subject: Re: [DISCUSS] FlinkSQL support enableObjectReuse to optimize 
CopyingChainingOutput


Simply setting the config option pipeline.object-reuse=true should work for that.

Best,
Zhanghao Chen
________________________________
From: Winterchill <809025...@qq.com.INVALID>
Sent: Wednesday, April 23, 2025 20:12
To: dev
Subject: [DISCUSS] FlinkSQL support enableObjectReuse to optimize 
CopyingChainingOutput

During our analysis of FlinkSQL, we found that the FlinkSQL data stream heavily 
uses `CopyingChainingOutput` for operator-to-operator data transmission.


In the process, we observed that some operators, such as `WatermarkAssigner`, 
do not necessarily require the deep copy logic of `CopyingChainingOutput`. 
Additionally, we noticed that FlinkSQL does not support the `enableObjectReuse` 
parameter in `DataStream`.
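For context, when object reuse is disabled Flink chains operators through `CopyingChainingOutput`, which (roughly) deep-copies each record via its type serializer before handing it to the next chained operator; with object reuse enabled, `ChainingOutput` passes the reference through unchanged. The sketch below is a simplified plain-Java model of that difference, not Flink's real classes: `Rec`, `COPIER` and `chainedOutput` are hypothetical names. It counts the copies made for a pass-through operator (in the spirit of `WatermarkAssigner`) that only reads the timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Simplified model (hypothetical, not Flink's classes) of copying vs. direct chained outputs. */
public class ChainingOutputSketch {

    /** Hypothetical mutable record carrying an event timestamp. */
    static final class Rec {
        long ts;
        Rec(long ts) { this.ts = ts; }
    }

    /** Stand-in for a serializer's copy(): one deep copy per call. */
    static final UnaryOperator<Rec> COPIER = r -> new Rec(r.ts);

    /** Wraps the next operator; copies each record first when copying is enabled. */
    static Consumer<Rec> chainedOutput(Consumer<Rec> next, boolean copying, int[] copyCounter) {
        return rec -> {
            if (copying) {
                copyCounter[0]++;
                next.accept(COPIER.apply(rec));
            } else {
                next.accept(rec); // object reuse: same reference goes downstream
            }
        };
    }

    /** Pushes n records through source -> assigner -> sink and returns the number of copies made. */
    public static int copiesFor(int n, boolean copying) {
        int[] copies = {0};
        long[] maxTs = {Long.MIN_VALUE};
        List<Rec> sink = new ArrayList<>();
        Consumer<Rec> toSink = chainedOutput(sink::add, copying, copies);
        // Watermark-assigner-like op: reads the timestamp, never mutates or caches the record.
        Consumer<Rec> assigner = rec -> {
            maxTs[0] = Math.max(maxTs[0], rec.ts);
            toSink.accept(rec);
        };
        Consumer<Rec> fromSource = chainedOutput(assigner, copying, copies);
        for (int i = 0; i < n; i++) {
            fromSource.accept(new Rec(i));
        }
        return copies[0];
    }

    public static void main(String[] args) {
        System.out.println("copying output: " + copiesFor(3, true) + " copies");  // 6
        System.out.println("direct output:  " + copiesFor(3, false) + " copies"); // 0
    }
}
```

With copying, every chained hop costs one copy per record, even though the pass-through operator never needs one; this is the overhead the question below is about.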


Based on this, we have the following questions:
1. Will FlinkSQL support `enableObjectReuse` in the future?
2. We would like to try modifying FlinkSQL's internal logic to eliminate
unnecessary deep copies. How can we do this safely? Do you have any suggestions?
