This all sounds good. I will add my standard comment that this hint is a
property of the data, not the pipeline logic. So it is a different type of
hint than key invariance and fanout ratio.

This is not a problem for the proposed approach, in my opinion. Obviously,
almost always there will be some pipeline code that is written specifically
with the data in mind.

A couple of other examples of "hints" that you can keep in mind are
Combine.withHotKeyFanout, Redistribute (both variants), and
GroupByKey.fewKeys. These were chosen to be expressed as transforms, even
though they are more like hints. I bring up these examples to say that we
don't have to be too pedantic here, because it is already too late :-)

And anyhow a runner is always allowed to implement any piece of a pipeline
with anything that has the same "behavior", whether it is expressed as a
hint or in some other way (that's the whole point of Beam, and how we have
fusion, combiner lifting, flatten unzipping, multiple runners, etc.).

This kind of hint is probably less appropriate for reusable transforms that
are expected to be used in more than one context; that can be left up to
transform/pipeline authors.

And to bring it back around and connect to the above: an API like
GroupByKey.biggerThanMemory() is just as fine with me as
GroupByKey.fewKeys(), and it can just be a composite that adds the
hint/annotation to the primitive node. There is no need to conflate API
design with model design, and no need to force users to express things in
terms of the lowest-level parts of the model.
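
To make that concrete, here is a rough sketch of what such a composite could
look like. Everything in it is hypothetical except the existing
PTransform.addAnnotation(String, byte[]) method; the biggerThanMemory()
factory, the class, and the annotation URN are made up for illustration:

  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  /** Hypothetical composite: just a GBK whose primitive node carries the hint. */
  class HugeGroupByKey<K, V>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

    // Placeholder annotation key a runner could look for; URN chosen arbitrarily.
    static final String HUGE_URN = "beam:annotation:group_by_key:huge:v1";

    static <K, V> HugeGroupByKey<K, V> biggerThanMemory() {
      return new HugeGroupByKey<>();
    }

    @Override
    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
      GroupByKey<K, V> gbk = GroupByKey.create();
      // The hint rides on the primitive node; the nicer API stays a thin wrapper.
      gbk.addAnnotation(HUGE_URN, new byte[0]);
      return input.apply(gbk);
    }
  }

Usage would then read input.apply(HugeGroupByKey.biggerThanMemory()), and a
runner that doesn't understand the annotation simply ignores it.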

Kenn

On Wed, Jan 15, 2025 at 12:36 PM Robert Bradshaw via dev <
dev@beam.apache.org> wrote:

> On Wed, Jan 15, 2025 at 7:20 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > There is already a method on PTransform in the Java SDK [1], which is
> > apparently used only to attach a "FeatureMetric" annotation in the error
> > handler. But it seems that annotations are populated in
> > PTransformTranslation and can then be used by the runner. So far so
> > good. However, there seems to be no "specification" of what the key
> > should be, and the value is raw bytes, which is not well suited for use
> > in user-facing code. There is Annotations.Enum in the proto
> > specification, which defines some internal constants as keys; these seem
> > to be arbitrary strings with no structure (e.g. we use URNs in other
> > places). Could we refactor this a bit so that:
> >
> >   1) the keys of annotations would follow a URN structure that would
> > help prevent collisions?
>
> +1, for sure.
>
> >   2) change the addAnnotation method on PTransform to accept a
> > TransformAnnotation interface (which would then provide the key and
> > value of the annotation)?
> >
> > That way, we could add a GroupByKey.HUGE constant of type
> > TransformAnnotation with a well-defined, scoped key that would be used as
> >
> >   GroupByKey.create().addAnnotation(GroupByKey.HUGE)
> >
> > which seems understandable to pipeline authors?
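> >
> > For illustration only, a minimal sketch of what that could look like (the
> > interface, the factory, and the URN are all placeholders, not existing
> > SDK code):
> >
> >   /** Hypothetical sketch only; nothing like this exists in the SDK today. */
> >   public interface TransformAnnotation {
> >     String getKey();    // URN-structured key, to prevent collisions
> >     byte[] getValue();  // payload; may be empty for boolean-style hints
> >
> >     /** Convenience factory so constants like GroupByKey.HUGE stay one-liners. */
> >     static TransformAnnotation of(String key, byte[] value) {
> >       return new TransformAnnotation() {
> >         @Override public String getKey() { return key; }
> >         @Override public byte[] getValue() { return value; }
> >       };
> >     }
> >   }
> >
> >   // On GroupByKey (or wherever such constants end up), roughly:
> >   //   public static final TransformAnnotation HUGE =
> >   //       TransformAnnotation.of("beam:annotation:group_by_key:huge:v1", new byte[0]);
> >
> > An addAnnotation(TransformAnnotation) overload could then simply delegate
> > to the existing addAnnotation(getKey(), getValue()).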
>
> This sounds like a good API (though we can't remove the old variant
> for backwards compatibility reasons).
>
> > [1]
> >
> https://beam.apache.org/releases/javadoc/2.61.0/org/apache/beam/sdk/transforms/PTransform.html#addAnnotation-java.lang.String-byte:A-
> >
> > On 1/14/25 18:23, Robert Bradshaw via dev wrote:
> > > Resource hints were originally conceived as ways to customize the
> > > runtime environment (RAM, accelerators; I could imagine things like
> > > custom containers, data dependencies, etc. being useful here too).
> > > They also apply recursively through composite transforms, and get
> > > reflected in the attached environment (which, it might be noted, a GBK
> > > doesn't have).
> > >
> > > I would say this case, as well as things like fanout ratio or
> > > key-preserving properties, would be most naturally attached to
> > > transforms as annotations, which are arbitrary key-value pairs that can
> > > be attached to any transform and that runners are free to inspect (and
> > > act on) or ignore. For example, they are used in ManagedIO (on Dataflow)
> > > to understand the semantic meaning of otherwise opaque IOs.
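> > >
> > > As a rough sketch of the runner side (the URN is a placeholder, and I'm
> > > assuming the standard generated accessor for the annotations map on the
> > > portable PTransform proto):
> > >
> > >   import org.apache.beam.model.pipeline.v1.RunnerApi;
> > >
> > >   class AnnotationChecks {
> > >     // Placeholder URN; a runner that doesn't recognize it just ignores it.
> > >     static final String HUGE_URN = "beam:annotation:group_by_key:huge:v1";
> > >
> > >     static boolean wantsHugeGrouping(RunnerApi.PTransform transform) {
> > >       return transform.getAnnotationsMap().containsKey(HUGE_URN);
> > >     }
> > >   }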
> > >
> > > Right now the only way to populate annotations is to override the
> > > annotations() method when subclassing PTransform, but this is a
> > > limitation that would be very nice to remove (e.g. a
> > > with_annotations(key=value) in Python, or a withAnnotation(key, value)
> > > in Java).
> > >
> > > - Robert
> > >
> > >
> > > On Tue, Jan 14, 2025 at 8:45 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >> Generally, these points are aligned with my line of thinking as well.
> > >> There are a few points that made me start this thread; I didn't stress
> > >> them explicitly, so I'll rephrase them:
> > >>
> > >>   a) looking at the current usage of ResourceHints, there are hints
> > >> that actually affect how a runner (and we can say Dataflow, because
> > >> apparently no other runner currently works with these hints) allocates
> > >> computation nodes to tasks. There are two hints that actually cannot be
> > >> (simply) ignored, as they can result in failures (accelerator, minRam).
> > >>   b) more hints can be added only by modifying the ResourceHints class
> > >> directly, i.e. adding a new hint to the core
> > >>
> > >>   c) some hints might make sense only in the context of a runner,
> because they can (at least theoretically) affect how a _specific_ runner
> expands a transform
> > >>
> > >>   d) the need to modify the core to get some functionality for a
> > >> specific runner adds more unnecessary tight coupling between core and
> > >> runners; my (long-term) position is that this coupling should be loose
> > >> whenever possible
> > >>
> > >> A last note: I like the mentioned examples of "hints" (FanoutRatio,
> > >> KeyInvariance), but they do not seem to fit well into my definition of
> > >> "resource". The question is whether we could create a more dynamic
> > >> approach, because if "hints" can be ignored (they affect only
> > >> execution, not semantics), this can lead to separating the Pipeline
> > >> from some "hint configuration" that could be provided at pipeline
> > >> submission time rather than at compile time. ResourceHints could then
> > >> be a special case of those.
> > >>
> > >> On 1/14/25 17:04, Robert Burke wrote:
> > >>
> > >> +1 to Danny's comments.
> > >>
> > >> Technically, the place to document these from a broader runner
> > >> perspective should be the Runner Capability matrix.
> > >>
> > >> A similar hint would be "FanoutRatio", which can mark transforms that
> > >> have a high fanout per element and lead the runner to make different
> > >> optimization decisions.
> > >>
> > >> Another is KeyInvariance, which can also affect optimization (e.g. if
> > >> the key is known not to change).
> > >>
> > >> The only requirement is that the hint doesn't affect correctness,
> just performance, should the hint be ignored.
> > >>
> > >> (I'm aware that in principle there are likely some areas where they
> > >> may overlap. To that I say that the runner must always prefer
> > >> correctness when it's ambiguous.)
> > >>
> > >> On Tue, Jan 14, 2025, 7:36 AM Danny McCormick via dev <
> dev@beam.apache.org> wrote:
> > >>> In my opinion, what you are describing fits the intention/current
> behavior of resource hints. Resource hints are just hints which allow the
> runner to optimize the execution environment where possible, so it should
> be legal for any runner to ignore any hints; as long as we're maintaining
> that behavior, I think it is ok.
> > >>>
> > >>>> Should we introduce some runner-specific way of creating hints
> applicable only to a specific runner?
> > >>> IMO this just makes the pipeline less portable and doesn't really do
> > >>> anything to make switching runners easier. Ideally I could have a
> > >>> pipeline with a set of hints, some of which apply only to Spark, some
> > >>> of which apply only to Flink, and some of which apply only to
> > >>> Dataflow, and the pipeline should be fully portable across those
> > >>> environments without making modifications. Your use case fits this
> > >>> paradigm well, since running
> > >>> input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
> > >>> on any non-Spark runner should still work fine (assuming the runner's
> > >>> default GBK implementation does not require the data to fit in memory).
> > >>>
> > >>> It would, however, be nice to at least have a matrix where we
> document which resource hints impact which runners.
> > >>>
> > >>> Thanks,
> > >>> Danny
> > >>>
> > >>> On Tue, Jan 14, 2025 at 6:02 AM Jan Lukavský <je...@seznam.cz>
> wrote:
> > >>>> Hi,
> > >>>>
> > >>>> as part of reviewing [1], I came across a question that might be
> > >>>> solved using resource hints. It seems the usage of these hints is
> > >>>> currently limited, though. I'll explain the case in a few points:
> > >>>>
> > >>>>    a) a generic implementation of GBK on Spark assumes that all
> values
> > >>>> fit into memory
> > >>>>
> > >>>>    b) this can be changed to an implementation that uses Spark's
> > >>>> internal sorting mechanism to group by key and window, without the
> > >>>> need for the data to fit into memory
> > >>>>
> > >>>>    c) this optimization can be more expensive for cases where a) is
> > >>>> sufficient
> > >>>>
> > >>>> There is currently no simple way of knowing whether a GBK fits into
> > >>>> memory or not. This could be solved using ResourceHints, e.g.:
> > >>>>
> > >>>> input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
> > >>>>
> > >>>> The expansion could then pick only the appropriate transforms, but it
> > >>>> requires changing the generic ResourceHints class. Is this intentional
> > >>>> and the expected approach? We can create pipeline-level hints, but
> > >>>> this does not seem correct in this situation. Should we introduce some
> > >>>> runner-specific way of creating hints applicable only to a specific
> > >>>> runner?
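> > >>>>
> > >>>> Purely as an illustration of what the expansion could do with such a
> > >>>> hint (none of these names are real Spark runner code; the two
> > >>>> suppliers stand in for the in-memory and sort-based groupings from
> > >>>> points a) and b)):
> > >>>>
> > >>>>   import java.util.Map;
> > >>>>   import java.util.function.Supplier;
> > >>>>
> > >>>>   class GbkExpansionChooser {
> > >>>>     static final String HUGE_URN = "beam:annotation:group_by_key:huge:v1";
> > >>>>
> > >>>>     /** Use the sort-based grouping only when the transform is marked huge. */
> > >>>>     static <T> T choose(
> > >>>>         Map<String, byte[]> annotations,
> > >>>>         Supplier<T> sortBased,
> > >>>>         Supplier<T> inMemory) {
> > >>>>       return annotations.containsKey(HUGE_URN)
> > >>>>           ? sortBased.get()
> > >>>>           : inMemory.get();
> > >>>>     }
> > >>>>   }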
> > >>>>
> > >>>> An alternative option seems to be the somewhat similar concept of
> > >>>> "annotations", which appears to have been introduced and is currently
> > >>>> used only for error handlers.
> > >>>>
> > >>>> Thanks for any opinions!
> > >>>>    Jan
> > >>>>
> > >>>> [1] https://github.com/apache/beam/pull/33521
> > >>>>
>
