FWIW I think parallelism is close enough to a resource. If you phrased it
like "how many CPUs can work independently" it is more closely related to
resources. Just like how many bits it takes to encode something is a
semantic property, but "RAM" is a resource.

I think a big role of resource hints is to be a bridge between the Beam
Model, which tries hard to only include essential information, to a
particular implementation which may not be able to autotune various
inessential/implementation details. Specifying parallelism to a runner that
still requires manual tuning of that seems like a fine use of this.

Kenn

On Fri, Apr 21, 2023 at 11:30 AM Jan Lukavský <je...@seznam.cz> wrote:

> Absolutely agree this is not something that should be part of the model.
> The ResourceHints is good place, but given how Pipeline might get fused
> (and though this might be under the control of a runner, basically all
> runners use the same code, because there is currently no reason why this
> should be runner-specifiic), there is a problem with how to resolve
> conflicting settings. Also it is somewhat questionable if parallelism is a
> "resource". It feels more like a runtime property. I tend to think that
> FlinkPipelineOptions could be a good place for that, because this seems to
> apply (mostly) to Flink batch runner.
> On 4/21/23 19:43, Robert Bradshaw via dev wrote:
>
> +1 to not requiring details like this in the Beam model. There is,
> however, the question of how to pass such implementation-detail specific
> hints to a runner that requires them. Generally that's done via
> ResourceHints or annotations, and while the former seems a good fit it's
> primarily focused on setting up the right context for user code (which GBK
> is not).
>
> A complete hack is to add an experiment like
> flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something
> cleaner.
>
>
> On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <u...@beam.apache.org>
> wrote:
>
>> Hi Jan,
>>
>> To generalize the per-stage parallelism configuration, we should have a
>> FR proposing the capability to explicitly set autoscaling (in this case,
>> fixed size per stage) policy in Beam pipelines.
>>
>> Per-step or per-stage parallelism, or fusion/optimization is not part of
>> the Beam model. They are [Flink] runner implementation details and should
>> be configured for each runner.
>>
>> Also, when building the pipeline, it's not clear what the fusion looks
>> like until the pipeline is submitted to a runner, thus making configuration
>> of the parallelism/worker-per-stage not straightforward.
>> Flink's parallelism settings can be found here
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
>> it's still kind of a black box since you don't really know how many tasks
>> are actually spawned until you run a pipeline.
>>
>> That being said, if we have a general interface controlling how a
>> pipeline scales, each runner could adapt [auto]scaling in their own way.
>> For example, in a Flink job, each operator/stage's task slot is prorated
>> by their key numbers; the maximum parallelism is throttled by task slot
>> utilization.
>> Another example, in a Dataflow job, each stage horizontally scales by CPU
>> utilization; vertically scales by memory/disk utilization.
>>
>> +dev@beam.apache.org <dev@beam.apache.org>
>> Let's use this thread to discuss how to configure a pipeline for runners
>> so that they can scale workers appropriately without exposing
>> runner-specific details to the Beam model.
>>
>> Ning.
>>
>>
>> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Ning,
>>>
>>> I might have missed that in the discussion, but we talk about batch
>>> execution, am I right? In streaming, all operators (PTransforms) of a
>>> Pipeline are run in the same slots, thus the downsides are limited. You can
>>> enforce streaming mode using --streaming command-line argument. But yes,
>>> this might have other implications. For batch only it obviously makes sense
>>> to limit parallelism of a (fused) 'stage', which is not an transform-level
>>> concept, but rather a more complex union of transforms divided by shuffle
>>> barrier. Would you be willing to start a follow-up thread in @dev mailing
>>> list for this for deeper discussion?
>>>
>>>  Jan
>>> On 4/20/23 19:18, Ning Kang via user wrote:
>>>
>>> Hi Jan,
>>>
>>> The approach works when your pipeline doesn't have too many operators.
>>> And the operator that needs the highest parallelism can only use at most
>>> #total_task_slots / #operators resources available in the cluster.
>>>
>>> Another downside is wasted resources for other smaller operators who
>>> cannot make full use of task slots assigned to them. You might see only
>>> 1/10 tasks running while the other 9/10 tasks idle for an operator with
>>> parallelism 10, especially when it's doing some aggregation like a SUM.
>>>
>>> One redeeming method is that, for operators following another operator
>>> with high fanout, we can explicitly add a Reshuffle to allow a higher
>>> parallelism. But this circles back to the first downside: if your pipeline
>>> has exponentially high fanout through it, setting a single parallelism for
>>> the whole pipeline is not ideal because it limits the scalability of your
>>> pipeline significantly.
>>>
>>> Ning.
>>>
>>>
>>> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> this topic was discussed many years ago and the conclusion there was
>>>> that setting the parallelism of individual operators via
>>>> FlinkPipelineOptions (or ResourceHints) is be possible, but would be
>>>> somewhat cumbersome. Although I understand that it "feels" weird to have
>>>> high parallelism for operators with small inputs, does this actually bring
>>>> any relevant performance impact? I always use parallelism based on the
>>>> largest operator in the Pipeline and this seems to work just fine. Is there
>>>> any particular need or measurable impact of such approach?
>>>>
>>>>  Jan
>>>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>>
>>>> Same need here, using Flink runner. We are processing a pcollection
>>>> (extracting features per element) then combining these into groups of
>>>> features and running the next operator on those groups.
>>>>
>>>> Each group contains ~50 elements, so the parallelism of the operator
>>>> upstream of the groupby should be higher, to be balanced with the
>>>> downstream operator.
>>>>
>>>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> Hi Reuven,
>>>>>
>>>>> It would be better to set parallelism for operators, as I mentioned
>>>>> before, there may be multiple groupby, join operators in one pipeline, and
>>>>> their parallelism can be different due to different input data sizes.
>>>>>
>>>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Jeff - does setting the global default work for you, or do you need
>>>>>> per-operator control? Seems like it would be to add this to 
>>>>>> ResourceHints.
>>>>>>
>>>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>>>> operators.
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>>>> such kind of mechanism, so I am looking for a general solution on the 
>>>>>>>>> beam
>>>>>>>>> side.
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <
>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> To a (small) degree Sparks “new” AQE might be able to help
>>>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>>>
>>>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow -
>>>>>>>>>>>>> unlike Spark and Flink - dynamically modifies the parallelism as 
>>>>>>>>>>>>> the
>>>>>>>>>>>>> operator runs, so there is no need to have such controls. In fact 
>>>>>>>>>>>>> these
>>>>>>>>>>>>> specific controls wouldn't make much sense for the way Dataflow 
>>>>>>>>>>>>> implements
>>>>>>>>>>>>> these operators.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in
>>>>>>>>>>>>>>>> operator level. And the input size of the operator is unknown 
>>>>>>>>>>>>>>>> at compiling
>>>>>>>>>>>>>>>> stage if it is not a source
>>>>>>>>>>>>>>>>  operator,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>>>> Spark also support to set operator level parallelism (see 
>>>>>>>>>>>>>>>> groupByKey
>>>>>>>>>>>>>>>> and reduceByKey):
>>>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, 
>>>>>>>>>>>>>>>>> the number of
>>>>>>>>>>>>>>>>> keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is 
>>>>>>>>>>>>>>>>> designed to
>>>>>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be 
>>>>>>>>>>>>>>>>> dynamically
>>>>>>>>>>>>>>>>> split and moved around between workers, the number of workers 
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> autoscale, etc.), so there's no need to explicitly set the 
>>>>>>>>>>>>>>>>> parallelism of
>>>>>>>>>>>>>>>>> the execution.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <
>>>>>>>>>>>>>>>>> zjf...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any
>>>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group 
>>>>>>>>>>>>>>>>>> by and join? I
>>>>>>>>>>>>>>>>>> understand the parallelism setting depends on the underlying 
>>>>>>>>>>>>>>>>>> execution
>>>>>>>>>>>>>>>>>> engine, but it is very common to set parallelism like group 
>>>>>>>>>>>>>>>>>> by and join in
>>>>>>>>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Jeff Zhang
>>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>

Reply via email to