On Wed, Jan 15, 2025 at 7:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> There is already a method on PTransform in the Java SDK [1], which is
> apparently used only to attach the "FeatureMetric" annotation in the error
> handler. But it seems that annotations are populated in
> PTransformTranslation and can then be used by the runner. So far so
> good. But there seems to be no "specification" of what the key should
> be, and the value is pure bytes, which is not well suited for usage in
> user-facing code. There is Annotations.Enum in the proto specification, which
> defines some internal constants as keys; these seem to be arbitrary
> strings with no structure (e.g. we use URNs in other places). Could we
> refactor this a bit so that:
>
> 1) the keys of annotations would follow a URN structure, which would help
> prevent collisions?

+1, for sure.

> 2) change the addAnnotation method on PTransform to accept a
> TransformAnnotation interface (which would then provide the key and value of the
> annotation)?
>
> That way, we could add a GroupByKey.HUGE constant of type
> TransformAnnotation with a defined scoped key that would be used as
>
> GroupByKey.create().addAnnotation(GroupByKey.HUGE)
>
> which seems to be understandable for pipeline authors?

This sounds like a good API (though we can't remove the old variant for backwards compatibility reasons).

> [1]
> https://beam.apache.org/releases/javadoc/2.61.0/org/apache/beam/sdk/transforms/PTransform.html#addAnnotation-java.lang.String-byte:A-
>
> On 1/14/25 18:23, Robert Bradshaw via dev wrote:
> > Resource hints were originally conceived as ways to customize the
> > runtime environment (RAM, accelerators; I could imagine things like
> > custom containers, data dependencies, etc. being useful here too).
> > They also apply recursively through composite transforms, and get
> > reflected in the attached environment (which, it might be noted, a GBK
> > doesn't have).
> >
> > I would say this case, as well as things like fanout ratio or key
> > preserving properties, would be most naturally attached to transforms
> > as annotations, which are arbitrary key-value pairs that can be
> > attached to any transform, and runners may be free to inspect (and act
> > on) or ignore. For example, they are used in ManagedIO (on Dataflow)
> > to understand the semantic meaning of otherwise opaque IOs.
> >
> > Right now the only way to populate annotations is to override the
> > annotations() method when subclassing PTransform, but this is a
> > limitation that would be very nice to remove (e.g. a
> > with_annotations(key=value) in Python, or withAnnotation(key, value)
> > in Java).
> >
> > - Robert
> >
> >
> > On Tue, Jan 14, 2025 at 8:45 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> Generally, these points are aligned with my line of thinking as well.
> >> There are two points that made me start this thread. I didn't stress these
> >> explicitly, so I'll rephrase them:
> >>
> >> a) looking at the current usage of ResourceHints, there are hints that
> >> actually affect how the runner (and we can say Dataflow, because apparently no
> >> other runner works with these hints, currently) allocates computation nodes
> >> to tasks. There are two hints that actually cannot be (simply) ignored, as
> >> they can result in failures (accelerator, minRam)
> >>
> >> b) more hints can be added only by modifying the ResourceHints class
> >> directly, adding a new hint to the core
> >>
> >> c) some hints might make sense only in the context of a runner, because
> >> they can (at least theoretically) affect how a _specific_ runner expands a
> >> transform
> >>
> >> d) the need to modify core to get some functionality for a specific
> >> runner adds more unnecessary tight coupling between core and runners; my
> >> (long-term) position is that this coupling should be loose whenever
> >> possible
> >>
> >> A last note is that I like the mentioned examples of "hints" (FanoutRatio,
> >> KeyInvariance), but these seem not to fit well into my definition of
> >> "resource". The question is whether we could create some more dynamic approach,
> >> because if "hints" can be ignored (not affecting semantics, only
> >> execution), this can lead to separating the Pipeline from some "hint
> >> configuration" that could be provided at pipeline submission time, not
> >> compile time. The ResourceHints could then be a special case of those.
> >>
> >> On 1/14/25 17:04, Robert Burke wrote:
> >>
> >> +1 to Danny's comments.
> >>
> >> Technically the place to document these from a broader runner perspective
> >> should be the Runner Capability matrix.
> >>
> >> A similar hint would be "FanoutRatio", which can mark transforms that have
> >> a high fanout per element and lead the runner to make different
> >> optimization decisions.
> >>
> >> Another is KeyInvariance which can also affect optimization (eg. If the
> >> key is known to not change
> >>
> >> The only requirement is that the hint doesn't affect correctness, just
> >> performance, should the hint be ignored.
> >>
> >> (I'm aware that in principle there are likely some areas where they may overlap.
> >> To that I say that the runner must always prefer correctness when it's
> >> ambiguous.)
> >>
> >> On Tue, Jan 14, 2025, 7:36 AM Danny McCormick via dev
> >> <dev@beam.apache.org> wrote:
> >>> In my opinion, what you are describing fits the intention/current
> >>> behavior of resource hints. Resource hints are just hints which allow the
> >>> runner to optimize the execution environment where possible, so it should
> >>> be legal for any runner to ignore any hints; as long as we're maintaining
> >>> that behavior, I think it is ok.
> >>>
> >>>> Should we introduce some runner-specific way of creating hints
> >>>> applicable only to a specific runner?
> >>> IMO this just makes the pipeline less portable and doesn't really do
> >>> anything to make switching runners easier. Ideally I could have a
> >>> pipeline with a set of hints, some of which apply to only Spark, some of
> >>> which apply to only Flink, and some of which apply only to Dataflow, and
> >>> the pipeline should be fully portable across those environments without
> >>> making modifications. Your use case fits this paradigm well since running
> >>> input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
> >>> on any non-Spark runner should still work fine (assuming the runner has
> >>> an out-of-memory GBK implementation by default).
> >>>
> >>> It would, however, be nice to at least have a matrix where we document
> >>> which resource hints impact which runners.
> >>>
> >>> Thanks,
> >>> Danny
> >>>
> >>> On Tue, Jan 14, 2025 at 6:02 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> Hi,
> >>>>
> >>>> as part of reviewing [1], I came across a question which might be
> >>>> solved using resource hints. It seems the usage of these hints is currently
> >>>> limited, though. I'll explain the case in a few points:
> >>>>
> >>>> a) the generic implementation of GBK on Spark assumes that all values
> >>>> fit into memory
> >>>>
> >>>> b) this can be changed to an implementation which uses Spark's internal
> >>>> sorting mechanism to group by key and window without the need for the
> >>>> data to fit into memory
> >>>>
> >>>> c) this optimization can be more expensive for cases where a) is
> >>>> sufficient
> >>>>
> >>>> There is currently no simple way of knowing whether a GBK fits into memory or
> >>>> not. This could be solved using ResourceHints, e.g.:
> >>>>
> >>>> input.apply(GroupByKey.create().setResourceHints(ResourceHints.huge()))
> >>>>
> >>>> The expansion could then pick only the appropriate transforms, but it
> >>>> requires changing the generic ResourceHints class. Is this intentional
> >>>> and the expected approach? We can create pipeline-level hints, but this
> >>>> does not seem correct in this situation. Should we introduce some
> >>>> runner-specific way of creating hints applicable only to a specific
> >>>> runner?
> >>>>
> >>>> An alternative option seems to be the somewhat similar concept of
> >>>> "annotations", which seem to have been introduced and are currently used only for
> >>>> error handlers.
> >>>>
> >>>> Thanks for any opinions!
> >>>> Jan
> >>>>
> >>>> [1] https://github.com/apache/beam/pull/33521
> >>>>
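
For readers following the thread, the TransformAnnotation idea discussed above could be sketched roughly as below. This is only an illustration under stated assumptions, not Beam code: SketchTransform is a stand-in for PTransform, the TransformAnnotation interface and the HUGE constant are the hypothetical API from the proposal, the URN string is made up, and the strategy names "in-memory" and "sort-based" are placeholders for the two Spark GBK implementations mentioned in the original message. The runner-side default is also an assumption; the only property taken from the thread is that ignoring the annotation must not affect correctness.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class AnnotationSketch {

  /** Hypothetical interface from the proposal: an annotation that supplies its own key and value. */
  interface TransformAnnotation {
    String key();   // assumed to be URN-structured so that keys do not collide
    byte[] value(); // annotation values are plain bytes
  }

  /** Stand-in for PTransform (NOT the Beam class); it models only the annotation methods. */
  static class SketchTransform {
    private final Map<String, byte[]> annotations = new LinkedHashMap<>();

    // Mirrors the existing (String, byte[]) variant, which has to stay for backwards compatibility.
    SketchTransform addAnnotation(String key, byte[] value) {
      annotations.put(key, value);
      return this;
    }

    // Proposed overload: delegates to the existing variant.
    SketchTransform addAnnotation(TransformAnnotation annotation) {
      return addAnnotation(annotation.key(), annotation.value());
    }

    Map<String, byte[]> getAnnotations() {
      return annotations;
    }
  }

  /** Hypothetical constant analogous to GroupByKey.HUGE; the URN below is invented for this sketch. */
  static final TransformAnnotation HUGE = new TransformAnnotation() {
    @Override
    public String key() {
      return "beam:annotation:group_by_key:huge:v1";
    }

    @Override
    public byte[] value() {
      return "true".getBytes(StandardCharsets.UTF_8);
    }
  };

  /** Runner-side choice: a missing or unrecognized annotation falls back to the default strategy. */
  static String chooseGroupByKeyStrategy(SketchTransform transform) {
    byte[] value = transform.getAnnotations().get(HUGE.key());
    boolean huge = value != null && "true".equals(new String(value, StandardCharsets.UTF_8));
    return huge ? "sort-based" : "in-memory";
  }

  public static void main(String[] args) {
    SketchTransform plain = new SketchTransform();
    SketchTransform annotated = new SketchTransform().addAnnotation(HUGE);
    System.out.println(chooseGroupByKeyStrategy(plain));     // prints "in-memory"
    System.out.println(chooseGroupByKeyStrategy(annotated)); // prints "sort-based"
  }
}
```

Because the typed overload delegates to the (key, bytes) variant, both can coexist, and a runner that does not know the key simply never looks it up.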