Hi Jan,

To generalize the per-stage parallelism configuration, we should have an
FR proposing the capability to explicitly set an autoscaling policy (in
this case, a fixed size per stage) in Beam pipelines.
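[For illustration, a minimal sketch of the shape such an explicit scaling
policy could take. None of these types exist in Beam today; the names are
purely hypothetical:]

    // Purely hypothetical: a possible shape for an explicitly configured
    // per-stage scaling policy. None of these types exist in Beam.
    public interface ScalingPolicy {
      int minWorkers();
      int maxWorkers();

      // Fixed size per stage: min == max, so a runner would neither scale
      // the stage up nor down.
      static ScalingPolicy fixedSize(int workers) {
        return new ScalingPolicy() {
          @Override public int minWorkers() { return workers; }
          @Override public int maxWorkers() { return workers; }
        };
      }
    }

[Each runner could then translate the policy into its own mechanism
(task-slot allocation on Flink, autoscaling bounds on Dataflow) without
runner-specific details leaking into the Beam model.]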
Per-step or per-stage parallelism and fusion/optimization are not part of
the Beam model. They are [Flink] runner implementation details and should
be configured for each runner. Also, when building the pipeline, it's not
clear what the fusion will look like until the pipeline is submitted to a
runner, which makes configuring parallelism or workers per stage not
straightforward. Flink's parallelism settings are documented here:
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>.
Even so, it's still something of a black box, since you don't really know
how many tasks are actually spawned until you run the pipeline.

That being said, if we had a general interface controlling how a pipeline
scales, each runner could adapt [auto]scaling in its own way. For example,
in a Flink job, each operator/stage's task slots could be prorated by its
number of keys, with the maximum parallelism throttled by task-slot
utilization. In a Dataflow job, each stage could scale horizontally based
on CPU utilization and vertically based on memory/disk utilization.

+d...@beam.apache.org <d...@beam.apache.org>
Let's use this thread to discuss how to configure a pipeline for runners
so that they can scale workers appropriately without exposing
runner-specific details to the Beam model.

Ning.

On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Ning,
>
> I might have missed it in the discussion, but we are talking about batch
> execution, am I right? In streaming, all operators (PTransforms) of a
> Pipeline run in the same slots, so the downsides are limited. You can
> enforce streaming mode using the --streaming command-line argument. But
> yes, this might have other implications. For batch it obviously makes
> sense to limit the parallelism of a (fused) 'stage', which is not a
> transform-level concept but rather a more complex union of transforms
> divided by shuffle barriers. Would you be willing to start a follow-up
> thread on the @dev mailing list for a deeper discussion?
>
> Jan
>
> On 4/20/23 19:18, Ning Kang via user wrote:
>
> Hi Jan,
>
> The approach works when your pipeline doesn't have too many operators.
> Even then, the operator that needs the highest parallelism can only use
> at most #total_task_slots / #operators of the resources available in the
> cluster.
>
> Another downside is wasted resources for the smaller operators that
> cannot make full use of the task slots assigned to them. For an operator
> with parallelism 10, you might see only 1 of its 10 tasks running while
> the other 9 sit idle, especially when it's doing an aggregation such as
> a SUM.
>
> One mitigation: for operators following an operator with high fanout, we
> can explicitly add a Reshuffle to allow higher parallelism. But this
> circles back to the first downside: if your pipeline has high fanout
> throughout, setting a single parallelism for the whole pipeline is not
> ideal, because it significantly limits the pipeline's scalability.
>
> Ning.
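[A minimal Beam Java sketch of the Reshuffle mitigation described above.
Reshuffle.viaRandomKey() is a real Beam transform; the surrounding
pipeline is illustrative:]

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.FlatMapElements;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class ReshuffleAfterFanout {
      public static void main(String[] args) {
        Pipeline p =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply(Create.of("seed"))
            // High-fanout step: one input element expands into many outputs.
            .apply(FlatMapElements.into(TypeDescriptors.integers())
                .via((String x) -> {
                  List<Integer> out = new ArrayList<>();
                  for (int i = 0; i < 100_000; i++) {
                    out.add(i);
                  }
                  return out;
                }))
            // Breaks fusion, so the downstream step can spread across all
            // available task slots instead of staying fused to the single
            // worker that produced the fanout.
            .apply(Reshuffle.viaRandomKey())
            .apply(MapElements.into(TypeDescriptors.integers())
                .via((Integer i) -> i * 2));

        p.run();
      }
    }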
> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> this topic was discussed many years ago, and the conclusion was that
>> setting the parallelism of individual operators via FlinkPipelineOptions
>> (or ResourceHints) would be possible, but somewhat cumbersome. Although
>> I understand that it "feels" weird to have high parallelism for
>> operators with small inputs, does this actually have any relevant
>> performance impact? I always set the parallelism based on the largest
>> operator in the Pipeline, and this seems to work just fine. Is there any
>> particular need, or measurable impact, behind such an approach?
>>
>> Jan
>>
>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>
>> Same need here, using the Flink runner. We are processing a PCollection
>> (extracting features per element), then combining these into groups of
>> features and running the next operator on those groups.
>>
>> Each group contains ~50 elements, so the parallelism of the operator
>> upstream of the group-by should be higher, to be balanced with the
>> downstream operator.
>>
>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Hi Reuven,
>>>
>>> It would be better to set parallelism per operator. As I mentioned
>>> before, there may be multiple group-by and join operators in one
>>> pipeline, and their parallelism can differ because of different input
>>> data sizes.
>>>
>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Jeff - does setting the global default work for you, or do you need
>>>> per-operator control? Seems like it would be natural to add this to
>>>> ResourceHints.
>>>>
>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>> operators.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>> such a mechanism, so I am looking for a general solution on the
>>>>>>> Beam side.
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To a (small) degree Spark's "new" AQE might be able to help,
>>>>>>>> depending on what kind of operations Beam compiles down to.
>>>>>>>>
>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
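[For illustration, those two settings in a minimal standalone Spark job.
They are real Spark SQL options, but they only affect Dataset/SQL-based
execution, so whether they help a Beam pipeline depends on what the Spark
runner compiles it down to, as noted above:]

    import org.apache.spark.sql.SparkSession;

    public class AqeExample {
      public static void main(String[] args) {
        // Real Spark SQL options; they apply only to Dataset/SQL execution.
        SparkSession spark = SparkSession.builder()
            .appName("aqe-example")
            .master("local[*]")
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
            .getOrCreate();

        // With AQE enabled, Spark can coalesce shuffle partitions at
        // runtime based on the partition sizes actually observed after a
        // shuffle, rather than a statically configured partition count.
        spark.range(0, 1_000_000).groupBy("id").count().show(5);

        spark.stop();
      }
    }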
>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> No, I don't use Dataflow; I use Spark & Flink.
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow -
>>>>>>>>>>> unlike Spark and Flink - dynamically modifies the parallelism
>>>>>>>>>>> as the operator runs, so there is no need for such controls. In
>>>>>>>>>>> fact, these specific controls wouldn't make much sense for the
>>>>>>>>>>> way Dataflow implements these operators.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just for performance tuning, as in Spark and Flink.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Reuven, what I mean is setting the parallelism at the
>>>>>>>>>>>>>> operator level. And the input size of an operator is unknown
>>>>>>>>>>>>>> when the pipeline is built, unless it is a source operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's an example from Flink:
>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see
>>>>>>>>>>>>>> groupByKey and reduceByKey):
>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey, for
>>>>>>>>>>>>>>> example, the number of keys in your data determines the
>>>>>>>>>>>>>>> maximum parallelism.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is
>>>>>>>>>>>>>>> designed to automatically determine the parallelism (e.g.,
>>>>>>>>>>>>>>> work will be dynamically split and moved around between
>>>>>>>>>>>>>>> workers, the number of workers will autoscale, etc.), so
>>>>>>>>>>>>>>> there's no need to explicitly set the parallelism of the
>>>>>>>>>>>>>>> execution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang
>>>>>>>>>>>>>>> <zjf...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any
>>>>>>>>>>>>>>>> way to set the parallelism of individual operators like
>>>>>>>>>>>>>>>> group-by and join? I understand that parallelism settings
>>>>>>>>>>>>>>>> depend on the underlying execution engine, but it is very
>>>>>>>>>>>>>>>> common to set the parallelism of group-by and join in both
>>>>>>>>>>>>>>>> Spark & Flink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>> https://amzn.to/2MaRAG9
>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
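[For reference, the operator-level parallelism control discussed in this
thread, as a minimal Flink DataStream job. setParallelism on a single
operator is the real API from the Flink docs linked above; the job itself
is illustrative:]

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class OperatorParallelism {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // job-wide default

        env.fromElements("a", "b", "c")
            .map(new MapFunction<String, String>() {
              @Override
              public String map(String value) {
                return value.toUpperCase();
              }
            })
            .setParallelism(10) // per-operator override
            .print();

        env.execute("operator-level-parallelism");
      }
    }

[On the Spark side, the equivalent per-operation knob is the numPartitions
argument, e.g. pairs.reduceByKey((a, b) -> a + b, 10).]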