Thanks for the input. David is right: Beam also uses accumulators [1]. In
that sense you're right that this requires a wider discussion about whether
other users would be affected as well. I will give it a bit more thought.

[1]
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java#L167
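
To make the JobResult point concrete, here is a rough sketch of the
accumulator life cycle as I understand it from this thread: each subtask adds
to a local instance, the partial values are merged, and the merged value
travels back to the client as part of the job result. It is plain Java without
any Flink dependency; SimpleAccumulator, CountAccumulator, SimpleJobResult and
runJob are hypothetical names for illustration and are not Flink's actual
classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, Flink-free model of the accumulator life cycle. All names are
// hypothetical and only loosely mirror the concepts discussed in this thread.
class AccumulatorSketch {

    /** Hypothetical stand-in for an accumulator: add locally, merge globally. */
    interface SimpleAccumulator<V, R> {
        void add(V value);
        R getLocalValue();
        void merge(SimpleAccumulator<V, R> other);
    }

    /** Sums up the values it is given, e.g. to count events. */
    static final class CountAccumulator implements SimpleAccumulator<Integer, Integer> {
        private int count;

        @Override public void add(Integer value) { count += value; }
        @Override public Integer getLocalValue() { return count; }
        @Override public void merge(SimpleAccumulator<Integer, Integer> other) {
            count += other.getLocalValue();
        }
    }

    /** Hypothetical job result that carries the merged accumulator values. */
    static final class SimpleJobResult {
        private final Map<String, Integer> accumulatorResults;

        SimpleJobResult(Map<String, Integer> accumulatorResults) {
            this.accumulatorResults = accumulatorResults;
        }

        Integer getAccumulatorResult(String name) {
            return accumulatorResults.get(name);
        }
    }

    /** Simulates one subtask per partition and merges the partial counts. */
    static SimpleJobResult runJob(List<List<String>> partitions) {
        CountAccumulator merged = new CountAccumulator();
        for (List<String> partition : partitions) {
            CountAccumulator local = new CountAccumulator(); // one instance per subtask
            for (String field : partition) {
                if (field.isEmpty()) {
                    local.add(1);
                }
            }
            merged.merge(local); // partial result folded into the global value
        }
        Map<String, Integer> results = new HashMap<>();
        results.put("empty-fields", merged.getLocalValue());
        return new SimpleJobResult(results);
    }

    public static void main(String[] args) {
        SimpleJobResult result = runJob(List.of(List.of("a", "", "b"), List.of("", "c")));
        // The client reads the value from the result; no external sink is involved.
        System.out.println("empty fields: " + result.getAccumulatorResult("empty-fields"));
    }
}
```

The last step, reading the merged value from the result object on the client,
is the part that a metric reported through a MetricGroup would not cover
according to the replies below.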

On Tue, Aug 29, 2023 at 7:58 AM David Morávek <d...@apache.org> wrote:

> AFAIK Apache Beam also used accumulators for metric collection, which is
> indeed a major use case.
>
> I’m not convinced that MetricGroup fully replaces what accumulators have
> to offer, though. OperatorCoordinators might be able to replace the
> remaining capabilities, but this needs a bit more thought. The missing
> part there would be that accumulators are part of the JobResult.
>
> On Tue 29. 8. 2023 at 6:12, Xintong Song <tonysong...@gmail.com> wrote:
>
> > Thanks for bringing this up, Matthias.
> >
> > One thing that a user may achieve with an accumulator but not with a
> > metric group is to programmatically fetch the job execution result,
> > rather than outputting the results to an external sink, in attached
> > mode. This can also be achieved by using CollectSink, which is still
> > @Experimental and internally uses accumulators. So I guess it depends on
> > 1) how stable we think CollectSink is now, and 2) how many users directly
> > use accumulators rather than CollectSink and whether their requirements
> > can be fully covered by CollectSink. For 2), we probably also need to
> > involve the user@ ML.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Wed, Aug 23, 2023 at 11:00 PM Matthias Pohl
> > <matthias.p...@aiven.io.invalid> wrote:
> >
> > > Hi everyone,
> > > I was looking into serializing the ArchivedExecutionGraph for another
> > > FLIP and came across Accumulators [1] (don't mix that one up with the
> > > window accumulators of the Table/SQL API). Accumulators were introduced
> > > in Flink quite a while ago in Stratosphere PR #340 [2].
> > >
> > > I had a brief chat with Chesnay about it, who pointed out that there
> > > was an intention in the past to use this for collecting metrics. The
> > > Accumulator JavaDoc hints that it was inspired by Hadoop's Counter
> > > concept [3], which also sounds like it is more or less equivalent to
> > > Flink's metrics.
> > >
> > > The Accumulator is currently accessible through the RuntimeContext
> > > interface, which provides addAccumulator [4] and getAccumulator [5].
> > > Usages of these methods appear in the following classes:
> > > - CollectSinkFunction [6]: Here it's used to collect the final data
> > > when closing the function. This feels like a misuse of the feature.
> > > Instead, the CollectSink could block the close call until all data was
> > > fetched by the client program.
> > > - DataSet.collect() [7]: CollectHelper uses SerializedListAccumulator
> > > to collect the final data, similarly to CollectSinkFunction.
> > > - EmptyFieldsCountAccumulator [8] is an example program that counts
> > > empty fields. This could be migrated to MetricGroups.
> > > - ChecksumHashCodeHelper [9] is used in DataSetUtils, where the calling
> > > method is already marked as deprecated for 2.0.
> > > - CollectOutputFormat [10] uses SerializedListAccumulator analogously
> > > to DataSet.collect(). This class will be removed with the removal of
> > > the Scala API in 2.0.
> > >
> > > The initial investigation brings me to the conclusion that we can
> > > remove the Accumulator feature in favor of Metrics and proper collect
> > > implementations. That would also help clean up the
> > > (Archived)ExecutionGraph: IMHO, we should have a clear separation
> > > between Metrics (which are part of the ExecutionGraph) and processed
> > > data (which shouldn't be part of the ExecutionGraph).
> > >
> > > I'm curious what others think about this. Did I miss a scenario where
> > > Accumulators are actually needed? Or is this already part of some
> > > other 2.0 effort [11] which I missed? I would suggest that removing it
> > > could be a nice-to-have item for 2.0.
> > >
> > > Best,
> > > Matthias
> > >
> > >
> > >
> > > [1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java#L40
> > > [2] https://github.com/stratosphere/stratosphere/pull/340
> > > [3] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Counters.html
> > > [4] https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L156
> > > [5] https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L165
> > > [6] https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L304
> > > [7] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L145
> > > [8] https://github.com/apache/flink/blob/aa98c18d2ba975479fcfa4930b0139fa575d303e/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java#L156
> > > [9] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L256
> > > [10] https://github.com/apache/flink/blob/91d81c427aa6312841ca868d54e8ce6ea721cd60/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala#L70
> > > [11] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release
> > >
> >
>
