Thanks for bringing this up, Matthias.

One thing a user can do with an accumulator but not with a metric group is
fetch the job execution result programmatically in attached mode, rather
than writing the results to an external sink. This can also be achieved
with CollectSink, which is still @Experimental and internally uses
accumulators. So I guess it depends on 1) how stable we think CollectSink
is now, and 2) how many users use accumulators directly rather than
CollectSink, and whether their requirements can be fully covered by
CollectSink. For 2), we probably also need to involve the user@ ML.
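
To illustrate the accumulator pattern I mean, here is a minimal, Flink-free
sketch in plain Java. Each parallel subtask keeps a local value, the partial
values are merged, and the client reads the merged value from the job result
instead of from an external sink. All names here (SketchAccumulator,
SketchIntCounter, AccumulatorSketch, runJob, the "empty-fields" key) are made
up for illustration and are not Flink classes. The interface only loosely
mirrors the add/getLocalValue/merge methods of Flink's Accumulator; in Flink
itself the merged value would be read via
JobExecutionResult#getAccumulatorResult after execute() returns.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the accumulator contract (not Flink's interface).
interface SketchAccumulator<V, R> {
    void add(V value);                           // called by user code per record
    R getLocalValue();                           // partial result of one subtask
    void merge(SketchAccumulator<V, R> other);   // combine partial results
}

// Simple integer counter, analogous in spirit to a counting accumulator.
final class SketchIntCounter implements SketchAccumulator<Integer, Integer> {
    private int count;

    @Override
    public void add(Integer value) {
        count += value;
    }

    @Override
    public Integer getLocalValue() {
        return count;
    }

    @Override
    public void merge(SketchAccumulator<Integer, Integer> other) {
        count += other.getLocalValue();
    }
}

final class AccumulatorSketch {

    // Simulates a job: each inner list plays the role of one parallel subtask.
    // Returns the merged accumulator values keyed by name, i.e. what the
    // client would receive as part of the job result in attached mode.
    static Map<String, Integer> runJob(List<List<String>> partitions) {
        SketchIntCounter merged = new SketchIntCounter();
        for (List<String> partition : partitions) {
            SketchIntCounter local = new SketchIntCounter();
            for (String record : partition) {
                if (record.isEmpty()) {
                    local.add(1); // count empty fields, no sink involved
                }
            }
            merged.merge(local); // done once per finished subtask
        }
        Map<String, Integer> results = new HashMap<>();
        results.put("empty-fields", merged.getLocalValue());
        return results;
    }

    public static void main(String[] args) {
        Map<String, Integer> result = runJob(Arrays.asList(
                Arrays.asList("a", "", "b"),
                Arrays.asList("", "")));
        System.out.println(result.get("empty-fields")); // prints 3
    }
}
```

A metric group would expose the same count through reporters, but it would
not hand the value back to the submitting program as part of the result,
which is the gap I am referring to.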

Best,

Xintong



On Wed, Aug 23, 2023 at 11:00 PM Matthias Pohl
<matthias.p...@aiven.io.invalid> wrote:

> Hi everyone,
> I was looking into serializing the ArchivedExecutionGraph for another FLIP
> and came across Accumulators [1] (don't mix that one up with the window
> accumulators of the Table/SQL API). Accumulators were introduced in Flink
> quite a while ago in Stratosphere PR #340 [2].
>
> I had a brief chat about it with Chesnay, who pointed out that there was
> an intention in the past to use this for collecting metrics. The
> Accumulator JavaDoc hints that it was inspired by Hadoop's Counter concept
> [3], which also sounds more or less equivalent to Flink's metrics.
>
> The Accumulator is currently accessible through the RuntimeContext
> interface, which provides addAccumulator [4] and getAccumulator [5]. Usages
> of these methods appear in the following classes:
> - CollectSinkFunction [6]: Here it's used to collect the final data when
> closing the function. This feels like a misuse of the feature. Instead, the
> CollectSink could block the close call until all data has been fetched by
> the client program.
> - DataSet.collect() [7]: Uses CollectHelper, which utilizes
> SerializedListAccumulator to collect the final data, similarly to
> CollectSinkFunction.
> - EmptyFieldsCountAccumulator [8] is an example program that counts empty
> fields. This could be migrated to MetricGroups
> - ChecksumHashCodeHelper [9] is used in DataSetUtils where the calling
> method is marked as deprecated for 2.0 already
> - CollectOutputFormat [10] uses SerializedListAccumulator analogously to
> DataSet.collect(). This class will be removed with the removal of the Scala
> API in 2.0.
>
> The initial investigation brings me to the conclusion that we can remove
> the Accumulator feature in favor of Metrics and proper collect
> implementations. That would also help clean up the
> (Archived)ExecutionGraph: IMHO, we should have a clear separation between
> Metrics (which are part of the ExecutionGraph) and processed data (which
> shouldn't be part of the ExecutionGraph).
>
> I'm curious what others think about this. Did I miss a scenario where
> Accumulators are actually needed? Or is this already part of some other 2.0
> effort [11] which I missed? I would suggest that removing it could be a
> nice-to-have item for 2.0.
>
> Best,
> Matthias
>
>
>
> [1]
>
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java#L40
> [2] https://github.com/stratosphere/stratosphere/pull/340
> [3]
>
> https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Counters.html
> [4]
>
> https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L156
> [5]
>
> https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L165
>
> [6]
>
> https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L304
> [7]
>
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L145
> [8]
>
> https://github.com/apache/flink/blob/aa98c18d2ba975479fcfa4930b0139fa575d303e/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java#L156
> [9]
>
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L256
> [10]
>
> https://github.com/apache/flink/blob/91d81c427aa6312841ca868d54e8ce6ea721cd60/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala#L70
>
> [11] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release
>
