Thanks for the input. David is right: Beam also uses accumulators [1]. In this sense, you're right that this would require a more widespread discussion of whether other users would be affected as well. I will give it a bit more thought.
[1] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java#L167

On Tue, Aug 29, 2023 at 7:58 AM David Morávek <d...@apache.org> wrote:
> AFAIK Apache Beam also used accumulators for metric collection, which is indeed a major use case.
>
> I'm not convinced that MetricGroup fully replaces what accumulators have to offer, though. OperatorCoordinators might be able to replace the remaining capabilities, but this needs a bit more thought. The missing part there would be that accumulators are part of the JobResult.
>
> On Tue 29. 8. 2023 at 6:12, Xintong Song <tonysong...@gmail.com> wrote:
>
> > Thanks for bringing this up, Matthias.
> >
> > One thing that a user may achieve with an accumulator but not with a metric group is to programmatically fetch the job execution result in attached mode, rather than writing the results to an external sink. This can also be achieved by using CollectSink, which is still @Experimental and internally uses accumulators. So I guess it depends on 1) how stable we think CollectSink is now, and 2) how many users use accumulators directly rather than CollectSink, and whether their requirements can be fully covered by CollectSink. For 2), we probably also need to involve the user@ mailing list.
> >
> > Best,
> >
> > Xintong
> >
> > On Wed, Aug 23, 2023 at 11:00 PM Matthias Pohl <matthias.p...@aiven.io.invalid> wrote:
> >
> > > Hi everyone,
> > > I was looking into serializing the ArchivedExecutionGraph for another FLIP and came across Accumulators [1] (not to be confused with the window accumulators of the Table/SQL API). Accumulators were introduced in Flink quite a while ago in Stratosphere PR #340 [2].
> > >
> > > I had a brief chat with Chesnay about it, who pointed out that there was an intention to use this for collecting metrics in the past.
> > > The Accumulator JavaDoc hints that it was inspired by Hadoop's Counter concept [3], which also sounds more or less equivalent to Flink's metrics.
> > >
> > > The Accumulator is currently accessible through the RuntimeContext interface, which provides addAccumulator [4] and getAccumulator [5]. These methods are used in the following classes:
> > > - CollectSinkFunction [6]: Here it's used to collect the final data when closing the function. This feels like a misuse of the feature. Instead, the CollectSink could block the close call until all data has been fetched by the client program.
> > > - DataSet.collect() [7]: CollectHelper uses SerializedListAccumulator to collect the final data, similarly to CollectSinkFunction.
> > > - EmptyFieldsCountAccumulator [8] is an example program that counts empty fields. This could be migrated to MetricGroups.
> > > - ChecksumHashCodeHelper [9] is used in DataSetUtils, where the calling method is already marked as deprecated for 2.0.
> > > - CollectOutputFormat [10] uses SerializedListAccumulator analogously to DataSet.collect(). This class will be removed with the removal of the Scala API in 2.0.
> > >
> > > The initial investigation leads me to the conclusion that we can remove the Accumulator feature in favor of Metrics and proper collect implementations. That would also help clean up the (Archived)ExecutionGraph: IMHO, we should have a clear separation between Metrics (which are part of the ExecutionGraph) and processed data (which shouldn't be part of the ExecutionGraph).
> > >
> > > I'm curious what others think about this. Did I miss a scenario where Accumulators are actually needed? Or is this already part of some other 2.0 effort [11] that I missed? I would suggest that removing it could be a nice-to-have item for 2.0.
> > >
> > > Best,
> > > Matthias
> > >
> > > [1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java#L40
> > > [2] https://github.com/stratosphere/stratosphere/pull/340
> > > [3] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Counters.html
> > > [4] https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L156
> > > [5] https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L165
> > > [6] https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L304
> > > [7] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L145
> > > [8] https://github.com/apache/flink/blob/aa98c18d2ba975479fcfa4930b0139fa575d303e/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java#L156
> > > [9] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L256
> > > [10] https://github.com/apache/flink/blob/91d81c427aa6312841ca868d54e8ce6ea721cd60/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala#L70
> > > [11] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release
> > >
> > > --
> > > Matthias Pohl
> > > Opensource Software Engineer, Aiven
> > > matthias.p...@aiven.io | aiven.io
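David and Xintong both point to a distinction that can be shown concretely. Each parallel subtask's accumulator values are merged by name into a result that the client reads after the job finishes. Metrics, by contrast, are exported while the job runs and are not part of that result. The following is a minimal plain-Java sketch of that merge step under stated assumptions. It deliberately does not use Flink. `SimpleAccumulator`, `SumAccumulator`, `SubtaskContext`, and `mergeIntoJobResult` are hypothetical names. Only the `addAccumulator`/`getAccumulator` method names echo the RuntimeContext methods mentioned in the thread, and none of this is Flink's actual interface or implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of accumulator semantics; this is NOT Flink's API.
public class AccumulatorModel {

    // Minimal contract: add values locally, expose the local value, merge with a peer.
    interface SimpleAccumulator<V, R> {
        void add(V value);
        R getLocalValue();
        void merge(SimpleAccumulator<V, R> other);
    }

    // Hypothetical long-sum accumulator: like a counter, but mergeable into a final result.
    static final class SumAccumulator implements SimpleAccumulator<Long, Long> {
        private long sum;

        @Override
        public void add(Long value) { sum += value; }

        @Override
        public Long getLocalValue() { return sum; }

        @Override
        public void merge(SimpleAccumulator<Long, Long> other) { sum += other.getLocalValue(); }
    }

    // Hypothetical per-subtask context; only the method names echo RuntimeContext.
    static final class SubtaskContext {
        private final Map<String, SimpleAccumulator<Long, Long>> accumulators = new HashMap<>();

        void addAccumulator(String name, SimpleAccumulator<Long, Long> accumulator) {
            // Reject duplicate names so each name maps to exactly one accumulator per subtask.
            if (accumulators.putIfAbsent(name, accumulator) != null) {
                throw new IllegalStateException("Accumulator already registered: " + name);
            }
        }

        SimpleAccumulator<Long, Long> getAccumulator(String name) {
            return accumulators.get(name);
        }
    }

    // Merges every subtask's accumulators by name into the map a client would read after the job ends.
    static Map<String, Long> mergeIntoJobResult(List<SubtaskContext> subtasks) {
        Map<String, SumAccumulator> merged = new HashMap<>();
        for (SubtaskContext ctx : subtasks) {
            for (Map.Entry<String, SimpleAccumulator<Long, Long>> entry : ctx.accumulators.entrySet()) {
                merged.computeIfAbsent(entry.getKey(), name -> new SumAccumulator()).merge(entry.getValue());
            }
        }
        Map<String, Long> result = new HashMap<>();
        merged.forEach((name, acc) -> result.put(name, acc.getLocalValue()));
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical input: number of empty fields found per record, for two parallel subtasks.
        long[][] emptyFieldsPerSubtask = {{1, 0, 2}, {0, 3}};
        List<SubtaskContext> subtasks = new ArrayList<>();
        for (long[] records : emptyFieldsPerSubtask) {
            SubtaskContext ctx = new SubtaskContext();
            ctx.addAccumulator("empty-fields", new SumAccumulator());
            for (long emptyFields : records) {
                ctx.getAccumulator("empty-fields").add(emptyFields);
            }
            subtasks.add(ctx);
        }
        Map<String, Long> jobResult = mergeIntoJobResult(subtasks);
        System.out.println(jobResult.get("empty-fields")); // prints 6
    }
}
```

In this model the merge happens per accumulator name, once all subtasks are done, and the merged map is what a client program would receive. A counter registered on a MetricGroup could track the same running total, but it would be exported to a metrics reporter rather than handed back with the result. That is the gap raised in the thread about replacing accumulators with metrics alone.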