There is already a method on PTransform in the Java SDK [1], which is apparently used only to attach the "FeatureMetric" annotation in the error handler. It seems that annotations are populated in PTransformTranslation and can then be used by the runner. So far so good. But there seems to be no "specification" of what the key should be, and the value is raw bytes, which is not well suited for use in user-facing code. There is Annotations.Enum in the proto specification, which defines some internal constants as keys, but these seem to be arbitrary strings with no structure (e.g., we use URNs in other places). Could we refactor this a bit so that:

 1) the keys of annotations would follow a URN structure, which would help prevent collisions?

 2) the addAnnotation method on PTransform would accept a TransformAnnotation interface (which would then provide the key and value of the annotation)?

That way, we could add a GroupByKey.HUGE constant of type TransformAnnotation with a well-defined, scoped key that would be used as

 GroupByKey.create().addAnnotation(GroupByKey.HUGE)

which seems understandable enough for pipeline authors?
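
For illustration, the proposed interface could look something like this (purely a sketch; none of these names exist in the SDK today):

    // Hypothetical sketch of the proposed interface; neither TransformAnnotation
    // nor GroupByKey.HUGE exists in the SDK today.
    public interface TransformAnnotation {

      // URN-style key, e.g. "beam:annotation:group_by_key:huge:v1", scoped so
      // that keys from different components cannot collide.
      String key();

      // Serialized value of the annotation.
      byte[] value();
    }

An addAnnotation(TransformAnnotation) overload could then simply delegate to the existing addAnnotation(String, byte[]) from [1].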

 Jan

[1] https://beam.apache.org/releases/javadoc/2.61.0/org/apache/beam/sdk/transforms/PTransform.html#addAnnotation-java.lang.String-byte:A-

On 1/14/25 18:23, Robert Bradshaw via dev wrote:
Resource hints were originally conceived as ways to customize the
runtime environment (RAM, accelerators; I could imagine things like
custom containers, data dependencies, etc. being useful here too).
They also apply recursively through composite transforms, and get
reflected in the attached environment (which, it might be noted, a GBK
doesn't have).

I would say this case, as well as things like fanout ratio or key-preserving
properties, would be most naturally attached to transforms
as annotations, which are arbitrary key-value pairs that can be
attached to any transform, and runners may be free to inspect (and act
on) or ignore. For example, they are used in ManagedIO (on Dataflow)
to understand the semantic meaning of otherwise opaque IOs.

Right now the only way to populate annotations is to override the
annotations() method when subclassing PTransform, but this is a
limitation that would be very nice to remove (e.g. a
with_annotations(key=value) in Python, or a withAnnotation(key, value)
in Java).
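
For concreteness, the current workaround looks roughly like this (a sketch; the exact accessor name may differ between the SDKs):

    // Sketch of the current workaround: subclassing PTransform just to attach
    // an annotation; the annotations() override follows the description above.
    PTransform<PCollection<String>, PCollection<String>> annotated =
        new PTransform<PCollection<String>, PCollection<String>>() {
          @Override
          public Map<String, byte[]> annotations() {
            Map<String, byte[]> result = new HashMap<>(super.annotations());
            result.put("my_key", "my_value".getBytes(StandardCharsets.UTF_8));
            return result;
          }

          @Override
          public PCollection<String> expand(PCollection<String> input) {
            // In practice this would delegate to the transform being wrapped.
            return input;
          }
        };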

- Robert


On Tue, Jan 14, 2025 at 8:45 AM Jan Lukavský <je...@seznam.cz> wrote:
Generally, these points are aligned with my line of thinking as well. There are 
a few points that made me start this thread; I didn't stress them explicitly, so 
I'll rephrase them:

  a) looking at the current usage of ResourceHints, there are hints that 
actually affect how a runner (and we can say Dataflow, because apparently no 
other runner currently works with these hints) allocates computation nodes to 
tasks. There are two hints that cannot be (simply) ignored, as dropping them 
can result in failures (accelerator, minRam); see the sketch after this list
  b) more hints can be added only by modifying the ResourceHints class directly, 
i.e., adding a new hint to the core

  c) some hints might make sense only in the context of a runner, because they 
can (at least theoretically) affect how a _specific_ runner expands a transform

  d) the need to modify the core to get some functionality for a specific runner 
adds more unnecessary tight coupling between core and runners; my (long-term) 
position is that this coupling should be loose whenever possible
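
For reference, attaching the two "hard" hints from a) with the current API looks like this (MyDoFn is just a placeholder; the accelerator string follows the Dataflow format):

    // The two hints that cannot simply be dropped: a transform asking for an
    // accelerator or a minimum amount of RAM may fail outright on a runner
    // that ignores the hint. MyDoFn is a hypothetical user DoFn.
    input.apply(
        "ProcessOnGpu",
        ParDo.of(new MyDoFn())
            .setResourceHints(
                ResourceHints.create()
                    .withMinRam("8GiB")
                    .withAccelerator(
                        "type:nvidia-tesla-t4;count:1;install-nvidia-driver")));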

One last note: I like the mentioned examples of "hints" (FanoutRatio, KeyInvariance), but these 
do not seem to fit well into my definition of "resource". The question is whether we could create 
some more dynamic approach: if "hints" can be ignored (they affect only execution, not semantics), 
this points toward separating the Pipeline from some "hint configuration" that could be provided 
at pipeline submission time, not compile time. ResourceHints could then be a special case of this.
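
Purely as an illustration of what I mean by "hint configuration", it could be as simple as a map keyed by the transform's full name, consulted by the runner during translation (all names below are hypothetical):

    import java.util.Map;

    // Hypothetical submission-time hint configuration, keyed by the
    // transform's full name, so the pipeline code itself stays hint-free.
    class SubmissionTimeHints {
      private final Map<String, String> hintsByTransform;

      SubmissionTimeHints(Map<String, String> hintsByTransform) {
        this.hintsByTransform = hintsByTransform;
      }

      // A runner consults this during translation; because hints never change
      // semantics, an unknown or missing hint is simply ignored.
      boolean hasHint(String transformFullName, String hintUrn) {
        return hintUrn.equals(hintsByTransform.get(transformFullName));
      }
    }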

On 1/14/25 17:04, Robert Burke wrote:

+1 to Danny's comments.

Technically the place to document these on a broader runner perspective should 
be the Runner Capability matrix.

A similar hint would be "FanoutRatio", which can mark transforms that have a 
high fanout per element and lead the runner to make different optimization decisions.

Another is KeyInvariance, which can also affect optimization (e.g., if the key is 
known to not change).

The only requirement is that the hint doesn't affect correctness, just 
performance, should the hint be ignored.

(I'm aware that in principle there are likely some areas where they may overlap. To 
that I say that the runner must always prefer correctness when it's ambiguous.)

On Tue, Jan 14, 2025, 7:36 AM Danny McCormick via dev <dev@beam.apache.org> 
wrote:
In my opinion, what you are describing fits the intention/current behavior of 
resource hints. Resource hints are just hints which allow the runner to 
optimize the execution environment where possible, so it should be legal for 
any runner to ignore any hints; as long as we're maintaining that behavior, I 
think it is ok.

Should we introduce some runner-specific way of creating hints applicable only 
to a specific runner?
IMO this just makes the pipeline less portable and doesn't really do anything 
to make switching runners easier. Ideally I could have a pipeline with a set of 
hints, some of which apply to only Spark, some of which apply to only Flink, 
and some of which apply only to Dataflow, and the pipeline should be fully 
portable across those environments without making modifications. Your use case 
fits this paradigm well since running 
input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge())) on any 
non-Spark runner should still work fine (assuming the runner's default GBK 
implementation does not require values to fit into memory).

It would, however, be nice to at least have a matrix where we document which 
resource hints impact which runners.

Thanks,
Danny

On Tue, Jan 14, 2025 at 6:02 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

as part of reviewing [1], I came across a question which might be
solved using resource hints. It seems the usage of these hints is currently
limited, though. I'll explain the case in a few points:

   a) a generic implementation of GBK on Spark assumes that all values
fit into memory

   b) this can be changed to an implementation which uses Spark's internal
sorting mechanism to group by key and window without the need for the
data to fit into memory

   c) this optimization can be more expensive for cases where a) is
sufficient

There is currently no simple way of knowing whether a GBK fits into memory or
not. This could be solved using ResourceHints, e.g.:

input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))

The expansion could then pick the appropriate implementation, but this
requires changing the generic ResourceHints class. Is this intentional
and the expected approach? We could create pipeline-level hints, but that
does not seem correct in this situation. Should we introduce some
runner-specific way of creating hints applicable only to a specific runner?
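
To illustrate, the Spark runner's GBK translation could branch on such a hint roughly as follows (everything here is hypothetical: neither ResourceHints.huge() nor the hint URN exists, and the translate* methods are placeholders):

    // Hypothetical sketch of a runner-side expansion choosing a GBK
    // implementation based on a hint; all names below are illustrative.
    private static final String HUGE_HINT_URN = "beam:resources:huge:v1";

    JavaRDD<?> translateGroupByKey(TranslationContext context) {
      ResourceHints hints = context.getCurrentTransform().getResourceHints();
      if (hints.hints().containsKey(HUGE_HINT_URN)) {
        // Sort-based grouping: spills to disk, values need not fit into memory.
        return translateGbkViaSecondarySort(context);
      }
      // Default grouping: assumes all values for a key fit into memory.
      return translateGbkInMemory(context);
    }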

An alternative option seems to be the somewhat similar concept of
"annotations", which seems to have been introduced and is currently used
only for error handlers.

Thanks for any opinions!
   Jan

[1] https://github.com/apache/beam/pull/33521
