There is already a method on PTransform in Java SDK [1], which is
apparently used only to attach "FeatureMetric" annotation in error
handler. But seems that annotations are populated in
PTransformTranslation and can then be used by the runner. So far so
good. But there seems to be no "specification" about what should be the
key and value is pure bytes, which is not well suited for usage in
user-facing code. There is Annotations.Enum in proto specification which
defines some internal constants as keys, these seem to be arbitrary
strings with no strcture (e.g. we use URNs on other places). Could we
refactor this a bit so that:
1) the keys of annotations would follow URN structure that would help
prevent collisions?
2) change the addAnnotation method on PTransform to accept
TransformAnnotation interface (which would then provide key-value of the
annotation)?
That way, we could add GroupByKey.HUGE constant of type
TransformAnnotation with defined scoped key that would used as
GroupByKey.create().addAnnotation(GroupByKey.HUGE)
which seems to be understandable for pipeline authors?
Jan
[1]
https://beam.apache.org/releases/javadoc/2.61.0/org/apache/beam/sdk/transforms/PTransform.html#addAnnotation-java.lang.String-byte:A-
On 1/14/25 18:23, Robert Bradshaw via dev wrote:
Resource hints were originally conceived as ways to customize the
runtime environment (ram, accelerators, I could imagine things like
custom containers, data dependencies, etc. being useful here too).
They also apply recursively through composite transforms, and get
reflected in the attached environment (which, it might be noted, a GBK
doesn't have).
I would say this case, as well as think like fanout ratio or key
preserving properties, would be most naturally attached to transforms
as annotations, which are arbitrary key-value pairs that can be
attached to any transform, and runners may be free to inspect (and act
on) or ignore. For example, they are used in ManagedIO (on Dataflow)
to understand the semantic meaning of otherwise opaque IOs.
Right now the only way to populate annotations is to override the
annotations() method when subclassing PTransform, but this is a
limitation that would be very nice to remove (e.g. a
with_annotations(key=value) in Python, or withAnnotation(key, value)
in java.
- Robert
On Tue, Jan 14, 2025 at 8:45 AM Jan Lukavský <je...@seznam.cz> wrote:
Generally, these points are aligned with my line of thinking as well. There are
two points that made me start this thread, I didn't stress these explicitly, so
I'll rephrase these:
a) looking at the current usage of ResourceHints, there are hints that
actually affect how runner (and we can say Dataflow, because apparently no
other runner works with these hints, currently) allocate computation nodes to
tasks. There are two hints that actually cannot be (simply) ignored, as they
can result in failures (accelerator, minRam)
b) more hints can be added only by modifying ResourceHints class directly,
adding new hint to the core
c) some hints might make sense only in the context of a runner, because they
can (at least theoretically) affect how a _specific_ runner expands a transform
d) the need to modify core to get some functionality for a specific runner
adds more unnecessary tight coupling between core and runners, my (long-term)
position is that this coupling should be loose, whenever it is possible
Last note is that I like the mentioned examples of "hints" (FanoutRatio, KeyInvariance), but these seem not
to fit well into my definition of "resource". The question is if we could create some more dynamic approach,
because if "hints" can be ignored (not affecting semantics, only execution), this can lead to separating
Pipeline and some "hint configuration", that could be provided at pipeline submission runtime, not compile
time. The ResourceHints could then be special case of those.
On 1/14/25 17:04, Robert Burke wrote:
+1 to Danny's comments.
Technically the place to document these on a broader runner perspective should
be the Runner Capability matrix.
A similar hint would be "FanoutRatio" which can mark transforms that have a
high fanout per element and lead the runner to make different optimization decisions.
Another is KeyInvariance which can also affect optimization (eg. If the key is
known to not change
The only requirement is that the hint doesn't affect correctness, just
performance, should the hint be ignored.
(I'm aware that in principle there are likely some areas they may overlap. To
that I say, that the runner must always prefer correctness when it's ambiguous.)
On Tue, Jan 14, 2025, 7:36 AM Danny McCormick via dev <dev@beam.apache.org>
wrote:
In my opinion, what you are describing fits the intention/current behavior of
resource hints. Resource hints are just hints which allow the runner to
optimize the execution environment where possible, so it should be legal for
any runner to ignore any hints; as long as we're maintaining that behavior, I
think it is ok.
Should we introduce some runner-specific way of creating hints applicable only
to a specific runner?
IMO this just makes the pipeline less portable and doesn't really do anything
to make switching runners easier. Ideally I could have a pipeline with a set of
hints, some of which apply to only Spark, some of which apply to only Flink,
and some of which apply only to Dataflow, and the pipeline should be fully
portable across those environments without making modifications. Your use case
fits this paradigm well since running
input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge())) on any
non-Spark runner should still work fine (assuming the runner has an
out-of-memory GBK implementation by default.
It would, however, be nice to at least have a matrix where we document which
resource hints impact which runners.
Thanks,
Danny
On Tue, Jan 14, 2025 at 6:02 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
as part of reviewing [1], I came across a question, which might be
solved using resource hints. Seems the usage of these hints is currently
limited, though. I'll explain the case in a few points:
a) a generic implementation of GBK on Spark assumes that all values
fit into memory
b) this can be changed to implementation which uses Spark's internal
sorting mechanism to group by key and window without the need for the
data to fit into memory
c) this optimization can be more expensive for cases where a) is
sufficient
There is currently no simple way of knowing if a GBK fits to memory or
not. This could be solved using ResourceHints, e.g.:
input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
The expansion could then pick only the appropriate transforms, but it
requires changing the generic ResourceHints class. Is this intentional
and the expected approach? We can create pipeline-level hints, but this
seems not correct in this situation. Should we introduce some
runner-specific way of creating hints applicable only to a specific runner?
Alternative option seems to be somewhat similar concept of
"annotations", which seems to be introduced and currently used only for
error handlers.
Thanks for any opinions!
Jan
[1] https://github.com/apache/beam/pull/33521