FWIW I think parallelism is close enough to a resource. If you phrase it as "how many CPUs can work independently", it is more closely related to resources. Just like how many bits it takes to encode something is a semantic property, but "RAM" is a resource.
I think a big role of resource hints is to be a bridge between the Beam Model, which tries hard to include only essential information, and a particular implementation, which may not be able to autotune various inessential/implementation details. Specifying parallelism to a runner that still requires manual tuning of it seems like a fine use of this.

Kenn

On Fri, Apr 21, 2023 at 11:30 AM Jan Lukavský <je...@seznam.cz> wrote:

> Absolutely agree this is not something that should be part of the model. ResourceHints is a good place for it, but given how a Pipeline might get fused (and though this might be under the control of a runner, basically all runners use the same code, because there is currently no reason why this should be runner-specific), there is a problem with how to resolve conflicting settings. Also, it is somewhat questionable whether parallelism is a "resource" at all. It feels more like a runtime property. I tend to think that FlinkPipelineOptions could be a good place for it, because this seems to apply (mostly) to the Flink batch runner.
>
> On 4/21/23 19:43, Robert Bradshaw via dev wrote:
>
>> +1 to not requiring details like this in the Beam model. There is, however, the question of how to pass such implementation-detail-specific hints to a runner that requires them. Generally that's done via ResourceHints or annotations, and while the former seems a good fit, it's primarily focused on setting up the right context for user code (which GBK is not).
>>
>> A complete hack is to add an experiment like flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something cleaner.
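A minimal sketch (Beam Python SDK) of how the two mechanisms above would look from the user's side. The experiment is the hypothetical one from Robert's message and is not implemented anywhere; its name, the MyStage label, and the 4GB value are all illustrative. The with_resource_hints() call is the existing API, and note it describes the user-code environment, not a scheduling knob:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical experiment from the message above; the name and the
# STAGE_NAME:value format are illustrative only, not implemented.
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--experiments=flink_parallelism_for_stage=MyStage:8',
])

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | beam.Create([('a', 1), ('b', 2), ('a', 3)])
        # Existing ResourceHints API: shapes the context user code runs
        # in (here, memory), rather than runner parallelism.
        | 'MyStage' >> beam.Map(lambda kv: kv).with_resource_hints(min_ram='4GB')
        | beam.GroupByKey())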
>>
>> On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <u...@beam.apache.org> wrote:
>>
>> Hi Jan,
>>
>> To generalize the per-stage parallelism configuration, we should have a FR proposing the capability to explicitly set an autoscaling policy (in this case, a fixed size per stage) in Beam pipelines.
>>
>> Per-step or per-stage parallelism, and fusion/optimization, are not part of the Beam model. They are [Flink] runner implementation details and should be configured for each runner.
>>
>> Also, when building the pipeline, it's not clear what the fusion looks like until the pipeline is submitted to a runner, which makes configuring per-stage parallelism/workers not straightforward. Flink's parallelism settings can be found here <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>; it's still kind of a black box, since you don't really know how many tasks are actually spawned until you run a pipeline.
>>
>> That being said, if we have a general interface controlling how a pipeline scales, each runner could adapt [auto]scaling in its own way. For example, in a Flink job, each operator/stage's task slots are prorated by their key counts, and the maximum parallelism is throttled by task slot utilization. Another example: in a Dataflow job, each stage scales horizontally by CPU utilization and vertically by memory/disk utilization.
>>
>> +dev@beam.apache.org <dev@beam.apache.org>
>> Let's use this thread to discuss how to configure a pipeline for runners so that they can scale workers appropriately without exposing runner-specific details to the Beam model.
>>
>> Ning.
>>
>> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Ning,
>>>
>>> I might have missed that in the discussion, but are we talking about batch execution? In streaming, all operators (PTransforms) of a Pipeline run in the same slots, so the downsides are limited. You can enforce streaming mode using the --streaming command-line argument, though this might have other implications. For batch it obviously makes sense to limit the parallelism of a (fused) 'stage', which is not a transform-level concept, but rather a more complex union of transforms divided by shuffle barriers. Would you be willing to start a follow-up thread on the @dev mailing list for deeper discussion?
>>>
>>> Jan
>>>
>>> On 4/20/23 19:18, Ning Kang via user wrote:
>>>
>>> Hi Jan,
>>>
>>> The approach works when your pipeline doesn't have too many operators, and the operator that needs the highest parallelism can only use at most #total_task_slots / #operators of the resources available in the cluster.
>>>
>>> Another downside is wasted resources for the other, smaller operators that cannot make full use of the task slots assigned to them. You might see only 1/10 of the tasks running while the other 9/10 sit idle for an operator with parallelism 10, especially when it's doing some aggregation like a SUM.
>>>
>>> One redeeming method: for operators following another operator with high fanout, we can explicitly add a Reshuffle to allow a higher parallelism. But this circles back to the first downside: if your pipeline has exponentially high fanout through it, setting a single parallelism for the whole pipeline is not ideal, because it significantly limits the scalability of your pipeline.
>>>
>>> Ning.
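Ning's Reshuffle workaround, as a minimal Beam Python sketch. The --parallelism value, the choice of FlinkRunner, and the expand() fan-out step are all illustrative:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# --parallelism sets the pipeline-wide default for the Flink runner;
# the value 10 is arbitrary.
options = PipelineOptions(['--runner=FlinkRunner', '--parallelism=10'])

def expand(line):
    # Illustrative high-fanout step: one input element becomes many.
    for token in line.split():
        yield token

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | beam.Create(['a few input lines', 'with many tokens each'])
        | beam.FlatMap(expand)
        # Reshuffle breaks fusion here, so the downstream step is no
        # longer glued to the low-fanout upstream step and can spread
        # across the full parallelism.
        | beam.Reshuffle()
        | beam.Map(lambda token: token.upper()))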
>>>
>>> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> this topic was discussed many years ago, and the conclusion was that setting the parallelism of individual operators via FlinkPipelineOptions (or ResourceHints) would be possible, but somewhat cumbersome. Although I understand that it "feels" weird to have high parallelism for operators with small inputs, does this actually have any relevant performance impact? I always use parallelism based on the largest operator in the Pipeline, and this seems to work just fine. Is there any particular need for, or measurable impact of, such an approach?
>>>>
>>>> Jan
>>>>
>>>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>>
>>>> Same need here, using the Flink runner. We are processing a PCollection (extracting features per element), then combining these into groups of features and running the next operator on those groups.
>>>>
>>>> Each group contains ~50 elements, so the parallelism of the operator upstream of the groupby should be higher, to be balanced with the downstream operator.
>>>>
>>>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> Hi Reuven,
>>>>>
>>>>> It would be better to be able to set parallelism per operator. As I mentioned before, there may be multiple groupby and join operators in one pipeline, and their parallelism can differ due to different input data sizes.
>>>>>
>>>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Jeff - does setting the global default work for you, or do you need per-operator control? Seems like the way to do it would be to add this to ResourceHints.
>>>>>>
>>>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>
>>>>>>> Yeah, I don't think we have a good per-operator API for this. If we were to add it, it probably belongs in ResourceHints.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you can set. I believe this sets the default parallelism for all Flink operators.
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have such a mechanism, so I am looking for a general solution on the Beam side.
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> To a (small) degree Spark's "new" AQE might be able to help, depending on what kind of operations Beam is compiling it down to.
>>>>>>>>>>
>>>>>>>>>> Have you tried setting spark.sql.adaptive.enabled & spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <u...@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> I see. Robert - what is the story for parallelism controls on GBK with the Spark or Flink runners?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> No, I don't use Dataflow; I use Spark & Flink.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike Spark and Flink - dynamically modifies the parallelism as the operator runs, so there is no need for such controls. In fact, these specific controls wouldn't make much sense for the way Dataflow implements these operators.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just for performance tuning, like in Spark and Flink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <u...@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Reuven, what I mean is setting the parallelism at the operator level. The input size of an operator is unknown at compile time unless it is a source operator.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here's an example from Flink: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see groupByKey and reduceByKey): https://spark.apache.org/docs/latest/rdd-programming-guide.html
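The Spark operators Jeff cites do take a per-operator numPartitions argument; a minimal PySpark sketch (the data and partition counts are illustrative):

from pyspark import SparkContext

sc = SparkContext(appName='per-operator-parallelism')  # illustrative setup

pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3)] * 1000)

# Operator-level parallelism in Spark: numPartitions is set per shuffle,
# so different groupby/join steps in one job can use different values.
sums = pairs.reduceByKey(lambda x, y: x + y, numPartitions=64)
groups = pairs.groupByKey(numPartitions=16)

print(sums.getNumPartitions(), groups.getNumPartitions())  # 64 16

Flink's DataStream API offers the analogous per-operator setParallelism(), per the link above.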
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <u...@beam.apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The maximum parallelism is always determined by the parallelism of your data. If you do a GroupByKey, for example, the number of keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your execution engine. If you're using Dataflow, Dataflow is designed to automatically determine the parallelism (e.g. work will be dynamically split and moved around between workers, the number of workers will autoscale, etc.), so there's no need to explicitly set the parallelism of the execution.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any way to set the parallelism of individual operators like group-by and join? I understand that the parallelism setting depends on the underlying execution engine, but it is very common to set the parallelism of operators like group-by and join in both Spark & Flink.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>> Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
>>>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
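A small sketch of Reuven's point that the data itself bounds the parallelism (Beam Python; the key function is illustrative). With only two distinct keys, at most two workers can ever process the grouped output in parallel, regardless of any runner setting:

import apache_beam as beam

with beam.Pipeline() as p:  # DirectRunner by default
    _ = (
        p
        | beam.Create(range(1000))
        # Only two distinct keys (0 and 1): no matter how high the
        # runner's configured parallelism, at most two workers can
        # process the grouped output in parallel.
        | beam.Map(lambda x: (x % 2, x))
        | beam.GroupByKey()
        | beam.MapTuple(lambda k, values: (k, sum(values))))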