Kenn
On Wed, Jan 15, 2025 at 12:36 PM Robert Bradshaw via dev
<dev@beam.apache.org> wrote:
On Wed, Jan 15, 2025 at 7:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> There is already a method on PTransform in Java SDK [1], which is
> apparently used only to attach "FeatureMetric" annotation in error
> handler. But seems that annotations are populated in
> PTransformTranslation and can then be used by the runner. So far so
> good. But there seems to be no "specification" about what should
be the
> key and value is pure bytes, which is not well suited for usage in
> user-facing code. There is Annotations.Enum in proto
specification which
> defines some internal constants as keys, these seem to be arbitrary
> strings with no strcture (e.g. we use URNs on other places).
Could we
> refactor this a bit so that:
>
> 1) the keys of annotations would follow URN structure that
would help
> prevent collisions?
+1, for sure.
> 2) change the addAnnotation method on PTransform to accept
> TransformAnnotation interface (which would then provide
key-value of the
> annotation)?
>
> That way, we could add GroupByKey.HUGE constant of type
> TransformAnnotation with defined scoped key that would used as
>
> GroupByKey.create().addAnnotation(GroupByKey.HUGE)
>
> which seems to be understandable for pipeline authors?
This sounds like a good API (though we can't remove the old variant
for backwards compatibility reasons).
> [1]
>
https://beam.apache.org/releases/javadoc/2.61.0/org/apache/beam/sdk/transforms/PTransform.html#addAnnotation-java.lang.String-byte:A-
>
> On 1/14/25 18:23, Robert Bradshaw via dev wrote:
> > Resource hints were originally conceived as ways to customize the
> > runtime environment (ram, accelerators, I could imagine things
like
> > custom containers, data dependencies, etc. being useful here too).
> > They also apply recursively through composite transforms, and get
> > reflected in the attached environment (which, it might be
noted, a GBK
> > doesn't have).
> >
> > I would say this case, as well as think like fanout ratio or key
> > preserving properties, would be most naturally attached to
transforms
> > as annotations, which are arbitrary key-value pairs that can be
> > attached to any transform, and runners may be free to inspect
(and act
> > on) or ignore. For example, they are used in ManagedIO (on
Dataflow)
> > to understand the semantic meaning of otherwise opaque IOs.
> >
> > Right now the only way to populate annotations is to override the
> > annotations() method when subclassing PTransform, but this is a
> > limitation that would be very nice to remove (e.g. a
> > with_annotations(key=value) in Python, or withAnnotation(key,
value)
> > in java.
> >
> > - Robert
> >
> >
> > On Tue, Jan 14, 2025 at 8:45 AM Jan Lukavský <je...@seznam.cz>
wrote:
> >> Generally, these points are aligned with my line of thinking
as well. There are two points that made me start this thread, I
didn't stress these explicitly, so I'll rephrase these:
> >>
> >> a) looking at the current usage of ResourceHints, there are
hints that actually affect how runner (and we can say Dataflow,
because apparently no other runner works with these hints,
currently) allocate computation nodes to tasks. There are two
hints that actually cannot be (simply) ignored, as they can result
in failures (accelerator, minRam)
> >> b) more hints can be added only by modifying ResourceHints
class directly, adding new hint to the core
> >>
> >> c) some hints might make sense only in the context of a
runner, because they can (at least theoretically) affect how a
_specific_ runner expands a transform
> >>
> >> d) the need to modify core to get some functionality for a
specific runner adds more unnecessary tight coupling between core
and runners, my (long-term) position is that this coupling should
be loose, whenever it is possible
> >>
> >> Last note is that I like the mentioned examples of "hints"
(FanoutRatio, KeyInvariance), but these seem not to fit well into
my definition of "resource". The question is if we could create
some more dynamic approach, because if "hints" can be ignored (not
affecting semantics, only execution), this can lead to separating
Pipeline and some "hint configuration", that could be provided at
pipeline submission runtime, not compile time. The ResourceHints
could then be special case of those.
> >>
> >> On 1/14/25 17:04, Robert Burke wrote:
> >>
> >> +1 to Danny's comments.
> >>
> >> Technically the place to document these on a broader runner
perspective should be the Runner Capability matrix.
> >>
> >> A similar hint would be "FanoutRatio" which can mark
transforms that have a high fanout per element and lead the runner
to make different optimization decisions.
> >>
> >> Another is KeyInvariance which can also affect optimization
(eg. If the key is known to not change
> >>
> >> The only requirement is that the hint doesn't affect
correctness, just performance, should the hint be ignored.
> >>
> >> (I'm aware that in principle there are likely some areas they
may overlap. To that I say, that the runner must always prefer
correctness when it's ambiguous.)
> >>
> >> On Tue, Jan 14, 2025, 7:36 AM Danny McCormick via dev
<dev@beam.apache.org> wrote:
> >>> In my opinion, what you are describing fits the
intention/current behavior of resource hints. Resource hints are
just hints which allow the runner to optimize the execution
environment where possible, so it should be legal for any runner
to ignore any hints; as long as we're maintaining that behavior, I
think it is ok.
> >>>
> >>>> Should we introduce some runner-specific way of creating
hints applicable only to a specific runner?
> >>> IMO this just makes the pipeline less portable and doesn't
really do anything to make switching runners easier. Ideally I
could have a pipeline with a set of hints, some of which apply to
only Spark, some of which apply to only Flink, and some of which
apply only to Dataflow, and the pipeline should be fully portable
across those environments without making modifications. Your use
case fits this paradigm well since running
input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
on any non-Spark runner should still work fine (assuming the
runner has an out-of-memory GBK implementation by default.
> >>>
> >>> It would, however, be nice to at least have a matrix where
we document which resource hints impact which runners.
> >>>
> >>> Thanks,
> >>> Danny
> >>>
> >>> On Tue, Jan 14, 2025 at 6:02 AM Jan Lukavský
<je...@seznam.cz> wrote:
> >>>> Hi,
> >>>>
> >>>> as part of reviewing [1], I came across a question, which
might be
> >>>> solved using resource hints. Seems the usage of these hints
is currently
> >>>> limited, though. I'll explain the case in a few points:
> >>>>
> >>>> a) a generic implementation of GBK on Spark assumes that
all values
> >>>> fit into memory
> >>>>
> >>>> b) this can be changed to implementation which uses
Spark's internal
> >>>> sorting mechanism to group by key and window without the
need for the
> >>>> data to fit into memory
> >>>>
> >>>> c) this optimization can be more expensive for cases
where a) is
> >>>> sufficient
> >>>>
> >>>> There is currently no simple way of knowing if a GBK fits
to memory or
> >>>> not. This could be solved using ResourceHints, e.g.:
> >>>>
> >>>>
input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
> >>>>
> >>>> The expansion could then pick only the appropriate
transforms, but it
> >>>> requires changing the generic ResourceHints class. Is this
intentional
> >>>> and the expected approach? We can create pipeline-level
hints, but this
> >>>> seems not correct in this situation. Should we introduce some
> >>>> runner-specific way of creating hints applicable only to a
specific runner?
> >>>>
> >>>> Alternative option seems to be somewhat similar concept of
> >>>> "annotations", which seems to be introduced and currently
used only for
> >>>> error handlers.
> >>>>
> >>>> Thanks for any opinions!
> >>>> Jan
> >>>>
> >>>> [1] https://github.com/apache/beam/pull/33521
> >>>>