Hi Paul,

sure, what I mean is basically what this comment in the blogpost says: „For 
Flink’s DataStream API, this setting does in fact not even result in reusing of 
objects, but only in *avoiding additional object copying* on the way, which 
happens by default as an additional safety net for users.“. In the context of 
my previous response, this avoiding of copies happens in the case where Flink 
chains operators together. Operator chaining runs operators in the same TM 
(JVM) and not going through network connections for the chain but instead just 
passes the output object of one operator as the input object to the next. 
Without object reuse, Flink will create a deep copy of the output object before 
passing the copy down to the next operator as input to avoid problems that come 
from two operators sharing the same object. With object reuse, the same output 
object instance becomes the input, no deep copy happens. Deep copies, at least 
for mutable objects, go through de/serialization and this introduces latency. 
That is what my comment was about.

Best,
Stefan 

> Am 26.09.2018 um 11:01 schrieb Paul Lam <paullin3...@gmail.com>:
> 
> Hi Stefan,
> 
> Sorry for jumping in the discussion. 
> 
> I’ve seen a blog post [1] of dataArtisans which says that object reuse has 
> not much influence on data streams.
> 
> >  For Flink’s DataStream API, this setting does in fact not even result in 
> > reusing of objects, but only in avoiding additional object copying on the 
> > way, which happens by default as an additional safety net for users.
> 
> So I’m a bit confused here. Could you please give more details about the 
> object reuse in data streams? Thanks a lot!
> 
> [1] 
> https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime
>  
> <https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>
> 
> Best,
> Paul Lam
> 
> 
>> 在 2018年9月20日,23:52,Stefan Richter <s.rich...@data-artisans.com 
>> <mailto:s.rich...@data-artisans.com>> 写道:
>> 
>> Oh yes exactly, enable is right.
>> 
>>> Am 20.09.2018 um 17:48 schrieb Hequn Cheng <chenghe...@gmail.com 
>>> <mailto:chenghe...@gmail.com>>:
>>> 
>>> Hi Stefan,
>>> 
>>> Do you mean enable object reuse?
>>> If you want to reduce latency between chained operators, you can also try 
>>> to disable object-reuse:
>>> 
>>> On Thu, Sep 20, 2018 at 10:37 PM Stefan Richter 
>>> <s.rich...@data-artisans.com <mailto:s.rich...@data-artisans.com>> wrote:
>>> Sorry, forgot the link for reference [1], which is 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency>
>>> 
>>>> Am 20.09.2018 um 16:36 schrieb Stefan Richter <s.rich...@data-artisans.com 
>>>> <mailto:s.rich...@data-artisans.com>>:
>>>> 
>>>> Hi,
>>>> 
>>>> you provide not very much information for this question, e.g. what and how 
>>>> exactly your measure, if this is a local or distributed setting etc. I 
>>>> assume that it is distributed and that the cause for your observation is 
>>>> the buffer timeout, i.e. the maximum time that Flink waits until sending a 
>>>> buffer with just one element, which happens to be 100ms by default. You 
>>>> can decrease this value to some extend, at to cost of potential loss in 
>>>> throughput, but I think even values around 5-10ms are ok-ish. See [1] for 
>>>> more details. If you want to reduce latency between chained operators, you 
>>>> can also try to disable object-reuse:
>>>> 
>>>> StreamExecutionEnvironment env = 
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.getConfig().enableObjectReuse();
>>>> 
>>>> Best,
>>>> Stefan
>>>> 
>>>>> Am 20.09.2018 um 16:03 schrieb James Yu <cyu...@gmail.com 
>>>>> <mailto:cyu...@gmail.com>>:
>>>>> 
>>>>> The previous email seems unable to display embedded images, let me put on 
>>>>> the links.
>>>>> Hi,
>>>>> 
>>>>> My team and I try to measure total time spent on our flink job and found 
>>>>> out that Flink takes 40ms ~ 100ms to proceed from one operator to 
>>>>> another. I wonder how can we reduce this transition time.
>>>>> 
>>>>> Following DAG represents our job:
>>>>> https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV 
>>>>> <https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV> 
>>>>> 
>>>>> 
>>>>> and here is the screenshot of our log:
>>>>> https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi 
>>>>> <https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi> 
>>>>> 
>>>>> at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
>>>>> at 19:37:04.605, the job is entering "Co-Flat Map"
>>>>> at 19:37:04.605, the job is leaving "Co-Flat Map"
>>>>> at 19:37:04.705, the job is entering "Co-Flat Map -> ...."
>>>>> at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."
>>>>> 
>>>>> both "Co-Flat Map" finishes merely instantly, while most of the execution 
>>>>> time is spent on the transition. Any idea?
>>>>> 
>>>>> 
>>>>> This is a UTF-8 formatted mail
>>>>> -----------------------------------------------
>>>>> James C.-C.Yu
>>>>> +886988713275
>>>>> +8615618429976
>>>>> 
>>>> 
>>> 
>> 
> 

Reply via email to