Hi Stefan,

Thanks for your detailed explanation! It helps a lot!
I think I misunderstood the sentence. I thought “avoiding additional object copying” was the default behavior.

Best,
Paul Lam

> On 26 Sep 2018, at 17:22, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
> Hi Paul,
>
> Sure, what I mean is basically what this comment in the blog post says: "For Flink’s DataStream API, this setting does in fact not even result in reusing of objects, but only in *avoiding additional object copying* on the way, which happens by default as an additional safety net for users." In the context of my previous response, this avoiding of copies happens in the case where Flink chains operators together. Operator chaining runs the chained operators in the same TM (JVM); instead of going through network connections, the chain simply passes the output object of one operator as the input object to the next. Without object reuse, Flink creates a deep copy of the output object and passes the copy down to the next operator as input, to avoid problems that come from two operators sharing the same object. With object reuse, the same output object instance becomes the input and no deep copy happens. Deep copies, at least for mutable objects, go through de/serialization, and this introduces latency. That is what my comment was about.
>
> Best,
> Stefan
>
>> On 26 Sep 2018, at 11:01, Paul Lam <paullin3...@gmail.com> wrote:
>>
>> Hi Stefan,
>>
>> Sorry for jumping into the discussion.
>>
>> I’ve seen a blog post [1] from dataArtisans which says that object reuse does not have much influence on data streams.
>>
>> > For Flink’s DataStream API, this setting does in fact not even result in reusing of objects, but only in avoiding additional object copying on the way, which happens by default as an additional safety net for users.
>>
>> So I’m a bit confused here. Could you please give more details about object reuse in data streams? Thanks a lot!
>>
>> [1] https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime
>>
>> Best,
>> Paul Lam
>>
>>> On 20 Sep 2018, at 23:52, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>
>>> Oh yes exactly, enable is right.
>>>
>>>> On 20 Sep 2018, at 17:48, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>
>>>> Hi Stefan,
>>>>
>>>> Do you mean enable object reuse?
>>>> If you want to reduce latency between chained operators, you can also try to disable object-reuse:
>>>>
>>>> On Thu, Sep 20, 2018 at 10:37 PM Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>> Sorry, forgot the link for reference [1], which is https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/datastream_api.html#controlling-latency
>>>>
>>>>> On 20 Sep 2018, at 16:36, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> You don’t provide very much information for this question, e.g. what and how exactly you measure, whether this is a local or distributed setting, etc. I assume that it is distributed and that the cause for your observation is the buffer timeout, i.e. the maximum time that Flink waits before sending a buffer with just one element, which happens to be 100ms by default. You can decrease this value to some extent, at the cost of a potential loss in throughput, but I think even values around 5-10ms are ok-ish. See [1] for more details. If you want to reduce latency between chained operators, you can also try to disable object-reuse:
>>>>>
>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> env.getConfig().enableObjectReuse();
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>>> On 20 Sep 2018, at 16:03, James Yu <cyu...@gmail.com> wrote:
>>>>>>
>>>>>> The previous email seems unable to display embedded images, so let me put the links here.
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> My team and I tried to measure the total time spent on our Flink job and found that Flink takes 40ms ~ 100ms to proceed from one operator to another. I wonder how we can reduce this transition time.
>>>>>>
>>>>>> The following DAG represents our job:
>>>>>> https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV
>>>>>>
>>>>>> and here is a screenshot of our log:
>>>>>> https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi
>>>>>>
>>>>>> at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
>>>>>> at 19:37:04.605, the job is entering "Co-Flat Map"
>>>>>> at 19:37:04.605, the job is leaving "Co-Flat Map"
>>>>>> at 19:37:04.705, the job is entering "Co-Flat Map -> ...."
>>>>>> at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."
>>>>>>
>>>>>> Both "Co-Flat Map" operators finish almost instantly, while most of the execution time is spent on the transitions. Any idea?
>>>>>>
>>>>>> This is a UTF-8 formatted mail
>>>>>> -----------------------------------------------
>>>>>> James C.-C. Yu
>>>>>> +886988713275
>>>>>> +8615618429976
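P.S. For anyone finding this thread later, here is a minimal, self-contained sketch of the object-reuse setting discussed above, assuming the Flink 1.6 DataStream API from reference [1]; the class name is only illustrative, and the caveat in the comments is the usual warning from the Flink documentation rather than anything specific to this thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip the defensive deep copy between chained operators: the downstream
        // operator receives the same object instance that the upstream operator
        // emitted instead of a copy that went through de/serialization.
        // User functions should therefore not hold on to or mutate input objects
        // across invocations, or results may become incorrect.
        env.getConfig().enableObjectReuse();
    }
}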
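And a similar sketch for the buffer timeout that Stefan points to as the likely cause of James's 40-100ms gaps between operators. Again this assumes Flink 1.6; the 10ms value is only an example from the 5-10ms range suggested above, not a general recommendation.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flush partially filled network buffers after at most 10 ms instead of the
        // 100 ms default, trading some throughput for lower latency between operators
        // that communicate over the network rather than through chaining.
        env.setBufferTimeout(10);

        // For reference: a timeout of 0 flushes after every record (lowest latency,
        // lowest throughput), while -1 only flushes when a buffer is full.
    }
}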