Hi Stephan,

In terms of the performance concern, please see my understanding below.

## Breaking the pipeline vs. adding a sink
If two operators are initially chained, they belong to the same stage
in the DAG and the same task, so the main processing path has just one
task with no serde in the middle. I was trying to compare the overhead
of adding a new sink with that of breaking the pipeline.

  - Adding a new sink introduces serialization cost, and potentially
network IO if the sink writes to remote storage instead of the local file
system.
  - Breaking the pipeline introduces a new stage, a new task, additional
serialization / deserialization cost and potential network IO.

Therefore I thought that adding a new sink would perform better than
breaking the pipeline, because its overall cost is lower.
Please let me know if I missed something.
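
To make the comparison concrete, here is a toy cost model of the two cases
listed above. It only encodes the cost items I named; the unit costs and the
names (UnitCosts, added_cost_sink, added_cost_break) are made-up
placeholders, not measurements or Flink APIs, so it illustrates the reasoning
rather than proving it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UnitCosts:
    """Per-record costs in arbitrary units (illustrative assumptions only)."""
    serialize: float = 1.0
    deserialize: float = 1.0
    network_io: float = 2.0
    new_task: float = 0.5  # amortized overhead of an extra stage and task


def added_cost_sink(c: UnitCosts, remote: bool) -> float:
    """Extra cost when a sink is attached and the operator chain stays intact."""
    return c.serialize + (c.network_io if remote else 0.0)


def added_cost_break(c: UnitCosts, remote: bool) -> float:
    """Extra cost when the chain is split by a blocking intermediate result."""
    io = c.network_io if remote else 0.0
    return c.new_task + c.serialize + c.deserialize + io


costs = UnitCosts()
for remote in (False, True):
    print(remote, added_cost_sink(costs, remote), added_cost_break(costs, remote))
```

Under these assumptions the sink is cheaper in both the local and the remote
case, simply because it pays for a subset of the same cost items. If the sink
has costs that are not listed above, the conclusion may change.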

The above scenarios assume that users want to cache the result in the
middle of an operator chain, but not at the shuffle boundary. If the cache
is at the shuffle boundary, it would duplicate the records unless the
pluggable shuffle service is also the pluggable intermediate result storage
at the same time. In that case, there will be just one copy of the records,
which could be read by either the pluggable shuffle service or the pluggable
intermediate result storage.

## Reading a subset of records
You are right. Any additional indexing / compression / columnizing of the
raw intermediate result introduces overhead, so it only makes sense if the
saving is greater than the overhead. One such example is iteration. In that
case, the cached intermediate results may be read an unknown, possibly large,
number of times, and the initial overhead of columnizing would be worthwhile.
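
The "saving is greater than the overhead" condition can be written as a
break-even calculation. The sketch below assumes a one-time indexing /
columnizing overhead and a constant cost per read; the function name and the
numbers are illustrative only.

```python
import math
from typing import Optional


def break_even_reads(index_overhead: float,
                     full_read_cost: float,
                     indexed_read_cost: float) -> Optional[int]:
    """Smallest number of reads n such that
    index_overhead + n * indexed_read_cost < n * full_read_cost,
    or None if indexing never pays off."""
    saving_per_read = full_read_cost - indexed_read_cost
    if saving_per_read <= 0:
        return None
    return math.floor(index_overhead / saving_per_read) + 1


# One-time overhead of 30 units, a full read costs 10, an indexed read costs 2.
print(break_even_reads(30, 10, 2))
```

With these example numbers, indexing only pays off from the fourth read on,
which matches your point that it is not worthwhile when the data is read just
once or twice.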


In general, I am with you that this could be put in an external library. It
is achievable if we only address the cross-session intermediate result
sharing. However, an external library is not sufficient to provide
optimized in-session intermediate result sharing. This is mainly because
when the job exits, RM needs to clean up the intermediate results. So
basically we are choosing between the following two options.

Option 1: In-session sharing is only served by shuffle service, with no
special performance optimization.
Option 2: In-session sharing is served by shuffle service by default,
performance optimization can be provided by pluggable intermediate result
storage.
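
A rough sketch of what option 2 means structurally is below. All names
(IntermediateResultStorage, ShuffleServiceStorage, select_storage) are
hypothetical and written in Python only for brevity; none of them is an
existing Flink interface. Option 1 corresponds to always passing no plugin.

```python
from typing import Dict, List, Optional, Protocol


class IntermediateResultStorage(Protocol):
    """Hypothetical contract for storing in-session intermediate results."""

    def write(self, result_id: str, records: List[object]) -> None: ...
    def read(self, result_id: str) -> List[object]: ...
    def cleanup(self, result_id: str) -> None: ...


class ShuffleServiceStorage:
    """In-memory stand-in for the default, shuffle-service-backed storage."""

    def __init__(self) -> None:
        self._results: Dict[str, List[object]] = {}

    def write(self, result_id: str, records: List[object]) -> None:
        self._results[result_id] = list(records)

    def read(self, result_id: str) -> List[object]:
        return list(self._results[result_id])

    def cleanup(self, result_id: str) -> None:
        # Invoked when the job exits so that no cached result is left behind.
        self._results.pop(result_id, None)


def select_storage(
    plugin: Optional[IntermediateResultStorage] = None,
) -> IntermediateResultStorage:
    """Use the plugged-in storage if one is configured, else the default."""
    return plugin if plugin is not None else ShuffleServiceStorage()


storage = select_storage()          # option 1, or option 2 without a plugin
storage.write("cached-node", [1, 2, 3])
print(storage.read("cached-node"))
storage.cleanup("cached-node")
```

The cleanup call is the part an external library cannot easily own, since it
has to happen when the job exits.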

It would be helpful for us to first agree on whether we want to have
performance optimization for in-session intermediate result sharing. If
not, option 1 is good enough. Otherwise, we would need something pluggable
for the in-session intermediate result.

Thoughts?

Thanks,

Jiangjie (Becket) Qin




On Sun, Sep 22, 2019 at 8:44 PM Stephan Ewen <se...@apache.org> wrote:

> ## About the improvements you mentioned in (1)
>
>   - I am not sure that this helps to improve performance by avoiding to
> break the pipeline.
>     Attaching an additional sink, would in virtually any case add even more
> overhead than the pipeline breaking.
>     What is your reasoning why it would be faster, all in all?
>
>   - About reading only a subset of the records:
>      - If this is about reading the data once or twice, then
> columnarizing/indexing/compressing the data is more expensive than just
> reading it twice more.
>      - This means turning the mechanism into something like materialized
> view matching, rather than result caching. That should happen in different
> parts of the stack (view matching needs schema, semantics, etc.). I am not
> sure mixing both is even a good idea.
>
>
> ## The way I see the trade-offs are:
>
> Pro in core Flink:
>   - Small improvement to API experience, compared to a library
>
> Contra in core Flink:
>   - added API complexity, maintenance and evolution overhead
>   - not clear what impacts mixing materialized view matching and result
> caching has on the system architecture
>   - Not yet a frequent use case, possibly a frequent use case in the
> future.
>   - Starting as a library allows for merging into the core later when this
> use case becomes major and experience improvement proves big.
>
> Unclear
>   - is breaking the pipeline by introducing a blocking intermediate result
> really worse than duplicating the data into an additional sink?
>
>
> ==> Especially because we can still make it part of Flink later once the
> use case and approach are a bit more fleshed out, this looks like a strong
> case for starting with a library approach here.
>
> Best,
> Stephan
>
>
>
> On Thu, Sep 19, 2019 at 2:41 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Stephan,
> >
> > Sorry for the belated reply. You are right that the functionality
> proposed
> > in this FLIP can be implemented out of the Flink core as an ecosystem
> > project.
> >
> > The main motivation of this FLIP is twofold:
> >
> > 1. Improve the performance of intermediate result sharing in the same
> > session.
> > Using the internal shuffle service to store cached result has two
> potential
> > performance problems.
> >   a) the cached intermediate results may break the operator chaining due
> to
> > the addition of BLOCKING_PERSISTENT edge.
> >   b) the downstream processor must read all the records in intermediate
> > results to process.
> >
> > A pluggable intermediate result storage will help address both
> > problems. Adding a sink will not break chaining, but just ensures the cached
> > logical node will not be optimized away. The pluggable storage can help
> > improve the performance by making the intermediate results filterable /
> > projectable, etc. Alternatively we can make the shuffle service more
> > sophisticated, but it may complicate things and is not necessary for the
> > normal shuffles.
> >
> > This motivation seems difficult to be addressed as an external library on
> > top of Flink core, mainly because the in-session intermediate result
> > cleanup may need participation of RM to achieve fault tolerance. Also,
> > having an external library essentially introduces another way to cache
> the
> > in-session intermediate results.
> >
> > 2. Cross session intermediate result sharing.
> > As you said, this can be implemented as an external library. The only
> > difference is that users may need to deal with another set of API, but
> that
> > seems OK.
> >
> >
> > So for this FLIP, it would be good to see whether we think motivation 1
> is
> > worth addressing or not.
> >
> > What do you think?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Aug 15, 2019 at 11:42 PM Stephan Ewen <se...@apache.org> wrote:
> >
> > > Sorry for the late response. So many FLIPs these days.
> > >
> > > I am a bit unsure about the motivation here, and whether this needs to be a
> > > part of Flink. It sounds like this can be perfectly built around Flink
> > as a
> > > minimal library on top of it, without any change in the core APIs or
> > > runtime.
> > >
> > > The proposal to handle "caching intermediate results" (to make them
> > > reusable across jobs in a session), and "writing them in different
> > formats
> > > / indexing them" doesn't sound like it should be the same mechanism.
> > >
> > >   - The caching part is a transparent low-level primitive. It avoids
> > > re-executing a part of the job graph, but otherwise is completely
> > > transparent to the consumer job.
> > >
> > >   - Writing data out in a sink, compressing/indexing it and then
> reading
> > it
> > > in another job is also a way of reusing a previous result, but on a
> > > completely different abstraction level. It is not the same intermediate
> > > result any more. When the consumer reads from it and applies predicate
> > > pushdown, etc. then the consumer job looks completely different from a
> > job
> > > that consumed the original result. It hence needs to be solved on the
> API
> > > level via a sink and a source.
> > >
> > > I would suggest to keep these concepts separate: Caching (possibly
> > > automatically) for jobs in a session, and long term writing/sharing of
> > data
> > > sets.
> > >
> > > Solving the "long term writing/sharing" in a library rather than in the
> > > runtime also has the advantage of not pushing yet more stuff into
> Flink's
> > > core, which I believe is also an important criterion.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > > On Thu, Jul 25, 2019 at 4:53 AM Xuannan Su <suxuanna...@gmail.com>
> > wrote:
> > >
> > > > Hi folks,
> > > >
> > > > I would like to start the FLIP discussion thread about the pluggable
> > > > intermediate result storage.
> > > >
> > > > This is phase 2 of FLIP-36: Support Interactive Programming in Flink.
> > > > While the FLIP-36 provides a default
> implementation
> > > of
> > > > the intermediate result storage using the shuffle service, we would
> > like
> > > to
> > > > make the intermediate result storage pluggable so that the user can
> > > easily
> > > > swap the storage.
> > > >
> > > > We are looking forward to your thoughts!
> > > >
> > > > The FLIP link is the following:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-48%3A+Pluggable+Intermediate+Result+Storage
> > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-48:+Pluggable+Intermediate+Result+Storage
> > > > >
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > >
> >
>
