Hi Stefan,

Thanks for your detailed explanation! It helps a lot!

I think I misunderstood the sentence. I thought “avoiding additional object 
copying” was the default behavior. 

Best,
Paul Lam

> 在 2018年9月26日,17:22,Stefan Richter <s.rich...@data-artisans.com> 写道:
> 
> Hi Paul,
> 
> sure, what I mean is basically what this comment in the blogpost says: „For 
> Flink’s DataStream API, this setting does in fact not even result in reusing 
> of objects, but only in *avoiding additional object copying* on the way, 
> which happens by default as an additional safety net for users.“. In the 
> context of my previous response, this avoiding of copies happens in the case 
> where Flink chains operators together. Operator chaining runs operators in 
> the same TM (JVM) and not going through network connections for the chain but 
> instead just passes the output object of one operator as the input object to 
> the next. Without object reuse, Flink will create a deep copy of the output 
> object before passing the copy down to the next operator as input to avoid 
> problems that come from two operators sharing the same object. With object 
> reuse, the same output object instance becomes the input, no deep copy 
> happens. Deep copies, at least for mutable objects, go through 
> de/serialization and this introduces latency. That is what my comment was 
> about.
> 
> Best,
> Stefan 
> 
>> Am 26.09.2018 um 11:01 schrieb Paul Lam <paullin3...@gmail.com 
>> <mailto:paullin3...@gmail.com>>:
>> 
>> Hi Stefan,
>> 
>> Sorry for jumping in the discussion. 
>> 
>> I’ve seen a blog post [1] of dataArtisans which says that object reuse has 
>> not much influence on data streams.
>> 
>> >  For Flink’s DataStream API, this setting does in fact not even result in 
>> > reusing of objects, but only in avoiding additional object copying on the 
>> > way, which happens by default as an additional safety net for users.
>> 
>> So I’m a bit confused here. Could you please give more details about the 
>> object reuse in data streams? Thanks a lot!
>> 
>> [1] 
>> https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime
>>  
>> <https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime>
>> 
>> Best,
>> Paul Lam
>> 
>> 
>>> 在 2018年9月20日,23:52,Stefan Richter <s.rich...@data-artisans.com 
>>> <mailto:s.rich...@data-artisans.com>> 写道:
>>> 
>>> Oh yes exactly, enable is right.
>>> 
>>>> Am 20.09.2018 um 17:48 schrieb Hequn Cheng <chenghe...@gmail.com 
>>>> <mailto:chenghe...@gmail.com>>:
>>>> 
>>>> Hi Stefan,
>>>> 
>>>> Do you mean enable object reuse?
>>>> If you want to reduce latency between chained operators, you can also try 
>>>> to disable object-reuse:
>>>> 
>>>> On Thu, Sep 20, 2018 at 10:37 PM Stefan Richter 
>>>> <s.rich...@data-artisans.com <mailto:s.rich...@data-artisans.com>> wrote:
>>>> Sorry, forgot the link for reference [1], which is 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency
>>>>  
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency>
>>>> 
>>>>> Am 20.09.2018 um 16:36 schrieb Stefan Richter 
>>>>> <s.rich...@data-artisans.com <mailto:s.rich...@data-artisans.com>>:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> you provide not very much information for this question, e.g. what and 
>>>>> how exactly your measure, if this is a local or distributed setting etc. 
>>>>> I assume that it is distributed and that the cause for your observation 
>>>>> is the buffer timeout, i.e. the maximum time that Flink waits until 
>>>>> sending a buffer with just one element, which happens to be 100ms by 
>>>>> default. You can decrease this value to some extend, at to cost of 
>>>>> potential loss in throughput, but I think even values around 5-10ms are 
>>>>> ok-ish. See [1] for more details. If you want to reduce latency between 
>>>>> chained operators, you can also try to disable object-reuse:
>>>>> 
>>>>> StreamExecutionEnvironment env = 
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> env.getConfig().enableObjectReuse();
>>>>> 
>>>>> Best,
>>>>> Stefan
>>>>> 
>>>>>> Am 20.09.2018 um 16:03 schrieb James Yu <cyu...@gmail.com 
>>>>>> <mailto:cyu...@gmail.com>>:
>>>>>> 
>>>>>> The previous email seems unable to display embedded images, let me put 
>>>>>> on the links.
>>>>>> Hi,
>>>>>> 
>>>>>> My team and I try to measure total time spent on our flink job and found 
>>>>>> out that Flink takes 40ms ~ 100ms to proceed from one operator to 
>>>>>> another. I wonder how can we reduce this transition time.
>>>>>> 
>>>>>> Following DAG represents our job:
>>>>>> https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV 
>>>>>> <https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV> 
>>>>>> 
>>>>>> 
>>>>>> and here is the screenshot of our log:
>>>>>> https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi 
>>>>>> <https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi> 
>>>>>> 
>>>>>> at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
>>>>>> at 19:37:04.605, the job is entering "Co-Flat Map"
>>>>>> at 19:37:04.605, the job is leaving "Co-Flat Map"
>>>>>> at 19:37:04.705, the job is entering "Co-Flat Map -> ...."
>>>>>> at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."
>>>>>> 
>>>>>> both "Co-Flat Map" finishes merely instantly, while most of the 
>>>>>> execution time is spent on the transition. Any idea?
>>>>>> 
>>>>>> 
>>>>>> This is a UTF-8 formatted mail
>>>>>> -----------------------------------------------
>>>>>> James C.-C.Yu
>>>>>> +886988713275
>>>>>> +8615618429976
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 

Reply via email to