On 1/20/25 18:18, Kenneth Knowles wrote:
This all sounds good. I will add my standard comment that this hint is a property of the data, not the pipeline logic, so it is a different type of hint than key invariance and fanout ratio.

This is not a problem for the proposed approach, in my opinion. Obviously, there will almost always be some pipeline code that is written specifically with the data in mind.

A couple of other examples of "hints" that you can keep in mind are Combine.withHotKeyFanout, Redistribute (both variants), and GroupByKey.fewKeys. These were chosen to be expressed as transforms, even though they are more like hints. I bring up these examples to say that we don't have to be too pedantic here, because it is already too late :-)
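To make the point concrete, here is a minimal plain-Java sketch of the general two-stage idea behind a hot-key fanout for an associative, commutative combine (a sum). It is not Beam's implementation of Combine.withHotKeyFanout; the class and method names are hypothetical, and round-robin assignment stands in for random sub-keys. Spreading a key over several intermediate sub-keys changes only how the work is distributed, not the per-key result, which is what lets such a transform behave like a hint.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a hot-key fanout for an associative, commutative
// combine (here: summing longs). Not Beam's Combine implementation.
final class HotKeyFanoutSketch {
  private HotKeyFanoutSketch() {}

  // Direct per-key sum: every value for a key is combined in one place.
  static Map<String, Long> directSum(List<Map.Entry<String, Long>> input) {
    Map<String, Long> out = new HashMap<>();
    for (Map.Entry<String, Long> e : input) {
      out.merge(e.getKey(), e.getValue(), Long::sum);
    }
    return out;
  }

  // Two-stage sum: stage 1 pre-combines values under (key, shard) sub-keys,
  // stage 2 combines the partial sums per original key.
  static Map<String, Long> fanoutSum(List<Map.Entry<String, Long>> input, int fanout) {
    if (fanout <= 0) {
      throw new IllegalArgumentException("fanout must be positive");
    }
    Map<String, Map<Integer, Long>> partials = new HashMap<>();
    int i = 0;
    for (Map.Entry<String, Long> e : input) {
      int shard = i++ % fanout; // round-robin stands in for a random sub-key
      partials.computeIfAbsent(e.getKey(), k -> new HashMap<>())
          .merge(shard, e.getValue(), Long::sum);
    }
    Map<String, Long> out = new HashMap<>();
    for (Map.Entry<String, Map<Integer, Long>> e : partials.entrySet()) {
      long total = 0;
      for (long partial : e.getValue().values()) {
        total += partial;
      }
      out.put(e.getKey(), total);
    }
    return out;
  }
}
```

For any input, `fanoutSum(input, n)` and `directSum(input)` return equal maps; only the intermediate partitioning differs.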

And anyhow, a runner is always allowed to implement any piece of a pipeline with anything that has the same "behavior", whether that is expressed as a hint or in some other way (that's the whole point of Beam, and how we have fusion, combiner lifting, flatten unzipping, multiple runners, etc.).

Such data-specific hints are probably less appropriate for reusable transforms that are expected to be used in more than one context. That can be left up to transform/pipeline authors.

And to bring it back around and connect to the above: having an API like GroupByKey.biggerThanMemory() as an API choice is just as fine with me as GroupByKey.fewKeys() and it can just be a composite that adds the hint/annotation to the primitive node. No need to combine API design with model design / no need to force users to express things in terms of the lowest level parts of the model.
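A minimal sketch of the API-sugar idea, assuming a simplified stand-in for a transform node. PrimitiveNode, GroupByKeySugar and the annotation key are hypothetical, not Beam classes; only the GBK URN string is taken from Beam's standard transform URNs. The convenience factory produces the same primitive as the plain factory plus an annotation, so the model still contains exactly one GBK primitive.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a primitive transform node (not Beam's PTransform).
final class PrimitiveNode {
  final String urn;
  private final Map<String, byte[]> annotations = new LinkedHashMap<>();

  PrimitiveNode(String urn) {
    this.urn = urn;
  }

  PrimitiveNode addAnnotation(String key, byte[] value) {
    annotations.put(key, value);
    return this;
  }

  Map<String, byte[]> getAnnotations() {
    return Collections.unmodifiableMap(annotations);
  }
}

// Hypothetical API sugar: biggerThanMemory() is only the plain GBK primitive plus
// an annotation, so the model itself still has a single GBK primitive.
final class GroupByKeySugar {
  static final String GBK_URN = "beam:transform:group_by_key:v1";
  // Hypothetical annotation key, for illustration only.
  static final String BIGGER_THAN_MEMORY = "example:annotation:gbk_bigger_than_memory:v1";

  private GroupByKeySugar() {}

  static PrimitiveNode create() {
    return new PrimitiveNode(GBK_URN);
  }

  static PrimitiveNode biggerThanMemory() {
    return create().addAnnotation(BIGGER_THAN_MEMORY, "true".getBytes(StandardCharsets.UTF_8));
  }
}
```

A runner that does not know the annotation key simply sees an ordinary GBK.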

I was thinking about that as well, but there is a problem. The GBK is often part of some other transform (e.g. FileIO, but it can be any other). We need a way to (optionally) change the behavior of a transform that is nested inside some outer composite. Therefore, this should work for

FileIO.write(...).addAnnotation(GroupByKey.HUGE)

as well.
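One possible way to let an annotation on an outer composite such as FileIO.write(...) reach the GBK nested inside it is for the runner to resolve annotations by walking up the transform hierarchy. The sketch below assumes nearest-enclosing-node-wins precedence, which is a design assumption rather than anything settled in this thread; HierarchyNode, the node names and the key string are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of a transform hierarchy (not Beam's TransformHierarchy).
// An annotation set on an outer composite is visible to everything nested in it,
// unless a closer enclosing node sets the same key (nearest node wins).
final class HierarchyNode {
  final String name;
  final HierarchyNode parent; // null for a top-level transform
  private final Map<String, String> annotations = new HashMap<>();

  HierarchyNode(String name, HierarchyNode parent) {
    this.name = name;
    this.parent = parent;
  }

  HierarchyNode annotate(String key, String value) {
    annotations.put(key, value);
    return this;
  }

  // Looks at this node first, then at each enclosing composite in turn.
  Optional<String> effectiveAnnotation(String key) {
    for (HierarchyNode n = this; n != null; n = n.parent) {
      String value = n.annotations.get(key);
      if (value != null) {
        return Optional.of(value);
      }
    }
    return Optional.empty();
  }
}
```

With this rule, a GroupByKey nested two levels inside an annotated "FileIO.write" node resolves the outer value, while an intermediate composite could still override it.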


Kenn

On Wed, Jan 15, 2025 at 12:36 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:

    On Wed, Jan 15, 2025 at 7:20 AM Jan Lukavský <je...@seznam.cz> wrote:
    >
    > There is already a method on PTransform in the Java SDK [1], which is
    > apparently used only to attach a "FeatureMetric" annotation in the error
    > handler. But it seems that annotations are populated in
    > PTransformTranslation and can then be used by the runner. So far so
    > good. But there seems to be no "specification" of what the key should
    > be, and the value is pure bytes, which is not well suited for use in
    > user-facing code. There is Annotations.Enum in the proto specification,
    > which defines some internal constants as keys; these seem to be
    > arbitrary strings with no structure (whereas we use URNs in other
    > places). Could we refactor this a bit so that:
    >
    >   1) the keys of annotations would follow a URN structure that would
    > help prevent collisions?

    +1, for sure.

    >   2) the addAnnotation method on PTransform would be changed to accept a
    > TransformAnnotation interface (which would then provide the key-value of
    > the annotation)?
    >
    > That way, we could add a GroupByKey.HUGE constant of type
    > TransformAnnotation with a defined, scoped key that would be used as
    >   GroupByKey.create().addAnnotation(GroupByKey.HUGE)
    >
    > which seems to be understandable for pipeline authors?

    This sounds like a good API (though we can't remove the old variant
    for backwards compatibility reasons).
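A minimal sketch of the proposed shape, using a simplified stand-in instead of Beam's PTransform. TransformAnnotation, AnnotatedTransform, GbkAnnotations and the URN string are all hypothetical. The new overload only delegates to the existing raw key/bytes method, which keeps the old variant available for backwards compatibility.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical interface from the proposal: an annotation knows its own
// URN-scoped key and how to encode its value.
interface TransformAnnotation {
  String urn();

  byte[] payload();
}

// Hypothetical stand-in for a transform; NOT Beam's PTransform.
class AnnotatedTransform {
  private final Map<String, byte[]> annotations = new LinkedHashMap<>();

  // Existing-style method, kept for backwards compatibility: raw key and bytes.
  AnnotatedTransform addAnnotation(String key, byte[] value) {
    annotations.put(key, value);
    return this;
  }

  // Proposed overload: delegates to the raw method using the annotation's URN.
  AnnotatedTransform addAnnotation(TransformAnnotation annotation) {
    return addAnnotation(annotation.urn(), annotation.payload());
  }

  Map<String, byte[]> getAnnotations() {
    return Collections.unmodifiableMap(annotations);
  }
}

// Hypothetical constant holder playing the role of GroupByKey.HUGE.
final class GbkAnnotations {
  private GbkAnnotations() {}

  static final TransformAnnotation HUGE =
      new TransformAnnotation() {
        @Override
        public String urn() {
          return "example:annotation:group_by_key:huge:v1"; // hypothetical URN
        }

        @Override
        public byte[] payload() {
          return "true".getBytes(StandardCharsets.UTF_8);
        }
      };
}
```

With this shape, `new AnnotatedTransform().addAnnotation(GbkAnnotations.HUGE)` reads much like the proposed `GroupByKey.create().addAnnotation(GroupByKey.HUGE)`, and the URN-structured key keeps annotations from different owners apart.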

    > [1]
    > https://beam.apache.org/releases/javadoc/2.61.0/org/apache/beam/sdk/transforms/PTransform.html#addAnnotation-java.lang.String-byte:A-
    >
    > On 1/14/25 18:23, Robert Bradshaw via dev wrote:
    > > Resource hints were originally conceived as ways to customize the
    > > runtime environment (RAM, accelerators; I could imagine things like
    > > custom containers, data dependencies, etc. being useful here too).
    > > They also apply recursively through composite transforms, and get
    > > reflected in the attached environment (which, it might be noted, a
    > > GBK doesn't have).
    > >
    > > I would say this case, as well as things like fanout ratio or
    > > key-preserving properties, would be most naturally attached to
    > > transforms as annotations, which are arbitrary key-value pairs that
    > > can be attached to any transform, and which runners are free to
    > > inspect (and act on) or ignore. For example, they are used in
    > > ManagedIO (on Dataflow) to understand the semantic meaning of
    > > otherwise opaque IOs.
    > >
    > > Right now the only way to populate annotations is to override the
    > > annotations() method when subclassing PTransform, but this is a
    > > limitation that would be very nice to remove (e.g. a
    > > with_annotations(key=value) in Python, or withAnnotation(key, value)
    > > in Java).
    > >
    > > - Robert
    > >
    > >
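On the runner side, "inspect (and act on) or ignore" could look roughly like the following sketch: the translator looks only for the annotation keys it understands and treats everything else as absent. The class, enum and URN are hypothetical and do not correspond to any existing runner code.

```java
import java.util.Map;

// Hypothetical runner-side choice of GBK translation: act on the annotations this
// runner understands, silently ignore all others. Names and URN are illustrative.
final class GbkTranslationChooser {
  static final String HUGE_URN = "example:annotation:group_by_key:huge:v1"; // hypothetical

  enum Strategy { IN_MEMORY, SORT_BASED }

  private GbkTranslationChooser() {}

  static Strategy choose(Map<String, byte[]> annotations) {
    // Unknown keys (for example, hints aimed at another runner) are never an error.
    return annotations.containsKey(HUGE_URN) ? Strategy.SORT_BASED : Strategy.IN_MEMORY;
  }
}
```

Because unknown keys are ignored, one pipeline can carry annotations aimed at several runners without becoming less portable.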
    > > On Tue, Jan 14, 2025 at 8:45 AM Jan Lukavský <je...@seznam.cz> wrote:
    > >> Generally, these points are aligned with my line of thinking as
    > >> well. There are a few points that made me start this thread; I
    > >> didn't stress them explicitly, so I'll rephrase them:
    > >>
    > >>   a) looking at the current usage of ResourceHints, there are hints
    > >> that actually affect how the runner (and we can say Dataflow, because
    > >> apparently no other runner currently works with these hints)
    > >> allocates computation nodes to tasks. There are two hints that
    > >> actually cannot be (simply) ignored, as they can result in failures
    > >> (accelerator, minRam)
    > >>
    > >>   b) more hints can be added only by modifying the ResourceHints class
    > >> directly, adding a new hint to the core
    > >>
    > >>   c) some hints might make sense only in the context of a runner,
    > >> because they can (at least theoretically) affect how a _specific_
    > >> runner expands a transform
    > >>
    > >>   d) the need to modify core to get some functionality for a specific
    > >> runner adds more unnecessary tight coupling between core and runners;
    > >> my (long-term) position is that this coupling should be loose
    > >> whenever possible
    > >>
    > >> A last note: I like the mentioned examples of "hints" (FanoutRatio,
    > >> KeyInvariance), but these do not seem to fit well into my definition
    > >> of a "resource". The question is whether we could create a more
    > >> dynamic approach: if "hints" can be ignored (affecting only execution,
    > >> not semantics), this could lead to separating the Pipeline from some
    > >> "hint configuration" that could be provided at pipeline submission
    > >> time (runtime), not compile time. The ResourceHints could then be a
    > >> special case of those.
    > >>
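A rough sketch of what a separate, submission-time "hint configuration" might look like. The line format (`<transform path>|<key>=<value>`), the class name, and the rule that submission-time values override values compiled into the pipeline are all assumptions made for illustration; nothing in Beam defines them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical hint configuration supplied at submission time, kept separate from
// pipeline code. Assumed format: one "<transform path>|<key>=<value>" per line,
// with blank lines and lines starting with '#' ignored.
final class HintConfig {
  private final Map<String, Map<String, String>> byPath = new HashMap<>();

  static HintConfig parse(String text) {
    HintConfig config = new HintConfig();
    for (String raw : text.split("\n")) {
      String line = raw.trim();
      if (line.isEmpty() || line.startsWith("#")) {
        continue;
      }
      int bar = line.indexOf('|');
      int eq = line.indexOf('=', bar + 1);
      if (bar <= 0 || eq <= bar + 1) {
        throw new IllegalArgumentException("malformed hint line: " + line);
      }
      config.byPath
          .computeIfAbsent(line.substring(0, bar), k -> new HashMap<>())
          .put(line.substring(bar + 1, eq), line.substring(eq + 1));
    }
    return config;
  }

  // Assumed precedence: submission-time hints override compiled-in hints per key.
  Map<String, String> effectiveHints(String transformPath, Map<String, String> compiledHints) {
    Map<String, String> merged = new HashMap<>(compiledHints);
    merged.putAll(byPath.getOrDefault(transformPath, Map.of()));
    return merged;
  }
}
```

Keeping this outside the pipeline means the same compiled pipeline could be resubmitted with different hints without code changes.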
    > >> On 1/14/25 17:04, Robert Burke wrote:
    > >>
    > >> +1 to Danny's comments.
    > >>
    > >> Technically, the place to document these from a broader runner
    > >> perspective should be the Runner Capability Matrix.
    > >>
    > >> A similar hint would be "FanoutRatio", which can mark transforms
    > >> that have a high fanout per element and lead the runner to make
    > >> different optimization decisions.
    > >>
    > >> Another is KeyInvariance, which can also affect optimization (e.g. if
    > >> the key is known not to change).
    > >>
    > >> The only requirement is that the hint must not affect correctness,
    > >> only performance, should it be ignored.
    > >>
    > >> (I'm aware that in principle there are likely some areas where they
    > >> may overlap. To that I say that the runner must always prefer
    > >> correctness when it's ambiguous.)
    > >>
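To illustrate why ignoring such a hint must be safe, here is a sketch of the two GBK strategies from the original Spark question, expressed in plain Java: a hash-map grouping that holds every group in memory, and a sort-based grouping that emits each group as soon as its key ends. The names are hypothetical, and the in-memory List.sort stands in for the external, spilling sort a runner such as Spark would use. Both produce the same groups, so a runner may pick either one whether or not the hint is present.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Two hypothetical GBK strategies that must be observably equivalent.
final class GroupingStrategies {
  private GroupingStrategies() {}

  // Strategy (a): build every group in a hash map; assumes all values fit in memory.
  static Map<String, List<Integer>> inMemory(List<Map.Entry<String, Integer>> input) {
    Map<String, List<Integer>> groups = new HashMap<>();
    for (Map.Entry<String, Integer> e : input) {
      groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return groups;
  }

  // Strategy (b): sort by key, then emit each group as soon as the key changes,
  // so only one key's values are buffered at a time (keys assumed non-null).
  static void sortBased(
      List<Map.Entry<String, Integer>> input, BiConsumer<String, List<Integer>> emit) {
    List<Map.Entry<String, Integer>> sorted = new ArrayList<>(input);
    sorted.sort(Map.Entry.comparingByKey()); // stable: keeps value order within a key
    String currentKey = null;
    List<Integer> buffer = new ArrayList<>();
    for (Map.Entry<String, Integer> e : sorted) {
      if (currentKey != null && !currentKey.equals(e.getKey())) {
        emit.accept(currentKey, buffer);
        buffer = new ArrayList<>();
      }
      currentKey = e.getKey();
      buffer.add(e.getValue());
    }
    if (currentKey != null) {
      emit.accept(currentKey, buffer);
    }
  }
}
```

The sort-based variant costs an extra sort, which matches the concern that it can be more expensive when everything fits in memory anyway.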
    > >> On Tue, Jan 14, 2025, 7:36 AM Danny McCormick via dev <dev@beam.apache.org> wrote:
    > >>> In my opinion, what you are describing fits the intention/current
    > >>> behavior of resource hints. Resource hints are just hints that allow
    > >>> the runner to optimize the execution environment where possible, so
    > >>> it should be legal for any runner to ignore any hints; as long as
    > >>> we're maintaining that behavior, I think it is ok.
    > >>>
    > >>>> Should we introduce some runner-specific way of creating hints
    > >>>> applicable only to a specific runner?
    > >>> IMO this just makes the pipeline less portable and doesn't really
    > >>> do anything to make switching runners easier. Ideally, I could have
    > >>> a pipeline with a set of hints, some of which apply only to Spark,
    > >>> some only to Flink, and some only to Dataflow, and the pipeline
    > >>> should be fully portable across those environments without
    > >>> modifications. Your use case fits this paradigm well, since running
    > >>> input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
    > >>> on any non-Spark runner should still work fine (assuming the runner's
    > >>> default GBK implementation does not require the data to fit in
    > >>> memory).
    > >>>
    > >>> It would, however, be nice to at least have a matrix where we
    > >>> document which resource hints impact which runners.
    > >>>
    > >>> Thanks,
    > >>> Danny
    > >>>
    > >>> On Tue, Jan 14, 2025 at 6:02 AM Jan Lukavský <je...@seznam.cz> wrote:
    > >>>> Hi,
    > >>>>
    > >>>> as part of reviewing [1], I came across a question which might be
    > >>>> solved using resource hints. It seems the usage of these hints is
    > >>>> currently limited, though. I'll explain the case in a few points:
    > >>>>
    > >>>>    a) a generic implementation of GBK on Spark assumes that all
    > >>>> values fit into memory
    > >>>>
    > >>>>    b) this can be changed to an implementation which uses Spark's
    > >>>> internal sorting mechanism to group by key and window without the
    > >>>> need for the data to fit into memory
    > >>>>
    > >>>>    c) this optimization can be more expensive for cases where a) is
    > >>>> sufficient
    > >>>>
    > >>>> There is currently no simple way of knowing whether a GBK fits in
    > >>>> memory or not. This could be solved using ResourceHints, e.g.:
    > >>>>
    > >>>>
    > >>>>   input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
    > >>>>
    > >>>> The expansion could then pick only the appropriate transforms, but
    > >>>> it requires changing the generic ResourceHints class. Is this
    > >>>> intentional and the expected approach? We can create pipeline-level
    > >>>> hints, but this does not seem correct in this situation. Should we
    > >>>> introduce some runner-specific way of creating hints applicable only
    > >>>> to a specific runner?
    > >>>>
    > >>>> An alternative option seems to be the somewhat similar concept of
    > >>>> "annotations", which seems to have been introduced, and is currently
    > >>>> used, only for error handlers.
    > >>>>
    > >>>> Thanks for any opinions!
    > >>>>    Jan
    > >>>>
    > >>>> [1] https://github.com/apache/beam/pull/33521
    > >>>>
