Absolutely agree this is not something that should be part of the model. The ResourceHints is good place, but given how Pipeline might get fused (and though this might be under the control of a runner, basically all runners use the same code, because there is currently no reason why this should be runner-specifiic), there is a problem with how to resolve conflicting settings. Also it is somewhat questionable if parallelism is a "resource". It feels more like a runtime property. I tend to think that FlinkPipelineOptions could be a good place for that, because this seems to apply (mostly) to Flink batch runner.

On 4/21/23 19:43, Robert Bradshaw via dev wrote:
+1 to not requiring details like this in the Beam model. There is, however, the question of how to pass such implementation-detail specific hints to a runner that requires them. Generally that's done via ResourceHints or annotations, and while the former seems a good fit it's primarily focused on setting up the right context for user code (which GBK is not).

A complete hack is to add an experiment like flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something cleaner.


On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <u...@beam.apache.org> wrote:

    Hi Jan,

    To generalize the per-stage parallelism configuration, we should
    have a FR proposing the capability to explicitly set autoscaling
    (in this case, fixed size per stage) policy in Beam pipelines.

    Per-step or per-stage parallelism, or fusion/optimization is not
    part of the Beam model. They are [Flink] runner implementation
    details and should be configured for each runner.

    Also, when building the pipeline, it's not clear what the fusion
    looks like until the pipeline is submitted to a runner, thus
    making configuration of the parallelism/worker-per-stage not
    straightforward.
    Flink's parallelism settings can be found here
    
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
    it's still kind of a black box since you don't really know how
    many tasks are actually spawned until you run a pipeline.

    That being said, if we have a general interface controlling how a
    pipeline scales, each runner could adapt [auto]scaling in their
    own way.
    For example, in a Flink job, each operator/stage's task slot is
    prorated by their key numbers; the maximum parallelism is
    throttled by task slot utilization.
    Another example, in a Dataflow job, each stage horizontally scales
    by CPU utilization; vertically scales by memory/disk utilization.

    +dev@beam.apache.org <mailto:dev@beam.apache.org>
    Let's use this thread to discuss how to configure a pipeline for
    runners so that they can scale workers appropriately without
    exposing runner-specific details to the Beam model.

    Ning.


    On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:

        Hi Ning,

        I might have missed that in the discussion, but we talk about
        batch execution, am I right? In streaming, all operators
        (PTransforms) of a Pipeline are run in the same slots, thus
        the downsides are limited. You can enforce streaming mode
        using --streaming command-line argument. But yes, this might
        have other implications. For batch only it obviously makes
        sense to limit parallelism of a (fused) 'stage', which is not
        an transform-level concept, but rather a more complex union of
        transforms divided by shuffle barrier. Would you be willing to
        start a follow-up thread in @dev mailing list for this for
        deeper discussion?

         Jan

        On 4/20/23 19:18, Ning Kang via user wrote:
        Hi Jan,

        The approach works when your pipeline doesn't have too many
        operators. And the operator that needs the highest
        parallelism can only use at most #total_task_slots /
        #operators resources available in the cluster.

        Another downside is wasted resources for other smaller
        operators who cannot make full use of task slots assigned to
        them. You might see only 1/10 tasks running while the other
        9/10 tasks idle for an operator with parallelism 10,
        especially when it's doing some aggregation like a SUM.

        One redeeming method is that, for operators following another
        operator with high fanout, we can explicitly add a Reshuffle
        to allow a higher parallelism. But this circles back to the
        first downside: if your pipeline has exponentially high
        fanout through it, setting a single parallelism for the whole
        pipeline is not ideal because it limits the scalability of
        your pipeline significantly.

        Ning.


        On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský
        <je...@seznam.cz> wrote:

            Hi,

            this topic was discussed many years ago and the
            conclusion there was that setting the parallelism of
            individual operators via FlinkPipelineOptions (or
            ResourceHints) is be possible, but would be somewhat
            cumbersome. Although I understand that it "feels" weird
            to have high parallelism for operators with small inputs,
            does this actually bring any relevant performance impact?
            I always use parallelism based on the largest operator in
            the Pipeline and this seems to work just fine. Is there
            any particular need or measurable impact of such approach?

             Jan

            On 4/19/23 17:23, Nimalan Mahendran wrote:
            Same need here, using Flink runner. We are processing a
            pcollection (extracting features per element) then
            combining these into groups of features and running the
            next operator on those groups.

            Each group contains ~50 elements, so the parallelism of
            the operator upstream of the groupby should be higher,
            to be balanced with the downstream operator.

            On Tue, Apr 18, 2023 at 19:17 Jeff Zhang
            <zjf...@gmail.com> wrote:

                Hi Reuven,

                It would be better to set parallelism for operators,
                as I mentioned before, there may be multiple
                groupby, join operators in one pipeline, and their
                parallelism can be different due to different input
                data sizes.

                On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax
                <re...@google.com> wrote:

                    Jeff - does setting the global default work for
                    you, or do you need per-operator control? Seems
                    like it would be to add this to ResourceHints.

                    On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw
                    <rober...@google.com> wrote:

                        Yeah, I don't think we have a good
                        per-operator API for this. If we were to add
                        it, it probably belongs in ResourceHints.

                        On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax
                        <re...@google.com> wrote:

                            Looking at FlinkPipelineOptions, there
                            is a parallelism option you can set. I
                            believe this sets the default
                            parallelism for all Flink operators.

                            On Sun, Apr 16, 2023 at 7:20 PM Jeff
                            Zhang <zjf...@gmail.com> wrote:

                                Thanks Holden, this would work for
                                Spark, but Flink doesn't have such
                                kind of mechanism, so I am looking
                                for a general solution on the beam side.

                                On Mon, Apr 17, 2023 at 10:08 AM
                                Holden Karau <hol...@pigscanfly.ca>
                                wrote:

                                    To a (small) degree Sparks “new”
                                    AQE might be able to help
                                    depending on what kind of
                                    operations Beam is compiling it
                                    down to.

                                    Have you tried setting
                                    spark.sql.adaptive.enabled &
                                    
spark.sql.adaptive.coalescePartitions.enabled



                                    On Mon, Apr 17, 2023 at 10:34 AM
                                    Reuven Lax via user
                                    <u...@beam.apache.org> wrote:

                                        I see. Robert - what is the
                                        story for parallelism
                                        controls on GBK with the
                                        Spark or Flink runners?

                                        On Sun, Apr 16, 2023 at
                                        6:24 PM Jeff Zhang
                                        <zjf...@gmail.com> wrote:

                                            No, I don't use
                                            dataflow, I use Spark &
                                            Flink.


                                            On Mon, Apr 17, 2023 at
                                            8:08 AM Reuven Lax
                                            <re...@google.com> wrote:

                                                Are you running on
                                                the Dataflow runner?
                                                If so, Dataflow -
                                                unlike Spark and
                                                Flink - dynamically
                                                modifies the
                                                parallelism as the
                                                operator runs, so
                                                there is no need to
                                                have such controls.
                                                In fact these
                                                specific controls
                                                wouldn't make much
                                                sense for the way
                                                Dataflow implements
                                                these operators.

                                                On Sun, Apr 16, 2023
                                                at 12:25 AM Jeff
                                                Zhang
                                                <zjf...@gmail.com>
                                                wrote:

                                                    Just for
                                                    performance tuning like
                                                    in Spark and Flink.


                                                    On Sun, Apr 16,
                                                    2023 at 1:10 PM
                                                    Robert Bradshaw
                                                    via user
                                                    <u...@beam.apache.org>
                                                    wrote:

                                                        What are you
                                                        trying to
                                                        achieve by
                                                        setting the
                                                        parallelism?

                                                        On Sat, Apr
                                                        15, 2023 at
                                                        5:13 PM Jeff
                                                        Zhang
                                                        <zjf...@gmail.com>
                                                        wrote:

                                                            Thanks
                                                            Reuven,
                                                            what I
                                                            mean is
                                                            to set
                                                            the
                                                            parallelism
                                                            in
                                                            operator
                                                            level.
                                                            And the
                                                            input
                                                            size of
                                                            the
                                                            operator
                                                            is
                                                            unknown
                                                            at
                                                            compiling
                                                            stage if
                                                            it is
                                                            not a
                                                            source
                                                             operator,

                                                            Here's
                                                            an
                                                            example
                                                            of flink
                                                            
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
                                                            Spark also
                                                            support
                                                            to set
                                                            operator
                                                            level
                                                            parallelism
                                                            (see
                                                            groupByKey
                                                            and
                                                            reduceByKey):
                                                            
https://spark.apache.org/docs/latest/rdd-programming-guide.html


                                                            On Sun,
                                                            Apr 16,
                                                            2023 at
                                                            1:42 AM
                                                            Reuven
                                                            Lax via
                                                            user
                                                            
<u...@beam.apache.org>
                                                            wrote:

                                                                The
                                                                maximum
                                                                parallelism is
                                                                always
                                                                determined
                                                                by
                                                                the
                                                                parallelism
                                                                of
                                                                your
                                                                data.
                                                                If
                                                                you
                                                                do a
                                                                GroupByKey
                                                                for
                                                                example,
                                                                the
                                                                number
                                                                of
                                                                keys
                                                                in
                                                                your
                                                                data
                                                                determines
                                                                the
                                                                maximum
                                                                parallelism.


                                                                Beyond
                                                                the
                                                                limitations
                                                                in
                                                                your
                                                                data,
                                                                it
                                                                depends
                                                                on
                                                                your
                                                                execution
                                                                engine.
                                                                If
                                                                you're
                                                                using
                                                                Dataflow,
                                                                Dataflow
                                                                is
                                                                designed
                                                                to
                                                                automatically
                                                                determine
                                                                the
                                                                parallelism
                                                                (e.g.
                                                                work
                                                                will
                                                                be
                                                                dynamically
                                                                split
                                                                and
                                                                moved
                                                                around
                                                                between
                                                                workers,
                                                                the
                                                                number
                                                                of
                                                                workers
                                                                will
                                                                autoscale,
                                                                etc.),
                                                                so
                                                                there's
                                                                no
                                                                need
                                                                to
                                                                explicitly
                                                                set
                                                                the
                                                                parallelism
                                                                of
                                                                the
                                                                execution.

                                                                On
                                                                Sat,
                                                                Apr
                                                                15,
                                                                2023
                                                                at
                                                                1:12 AM
                                                                Jeff
                                                                Zhang
                                                                
<zjf...@gmail.com>
                                                                wrote:

                                                                    Besides
                                                                    the
                                                                    global
                                                                    parallelism 
of
                                                                    beam
                                                                    job,
                                                                    is
                                                                    there
                                                                    any
                                                                    way
                                                                    to
                                                                    set
                                                                    parallelism 
for
                                                                    individual
                                                                    operators
                                                                    like
                                                                    group
                                                                    by
                                                                    and
                                                                    join?
                                                                    I
                                                                    understand 
the
                                                                    parallelism 
setting
                                                                    depends
                                                                    on
                                                                    the
                                                                    underlying
                                                                    execution
                                                                    engine,
                                                                    but
                                                                    it
                                                                    is
                                                                    very
                                                                    common
                                                                    to
                                                                    set
                                                                    parallelism 
like
                                                                    group
                                                                    by
                                                                    and
                                                                    join
                                                                    in
                                                                    both
                                                                    spark
                                                                    &
                                                                    flink.








-- Best
                                                                    Regards

                                                                    Jeff
                                                                    Zhang



-- Best Regards

                                                            Jeff Zhang



-- Best Regards

                                                    Jeff Zhang



-- Best Regards

                                            Jeff Zhang

-- Twitter:
                                    https://twitter.com/holdenkarau
                                    Books (Learning Spark, High
                                    Performance Spark, etc.):
                                    https://amzn.to/2MaRAG9
                                    <https://amzn.to/2MaRAG9>
                                    YouTube Live Streams:
                                    https://www.youtube.com/user/holdenkarau



-- Best Regards

                                Jeff Zhang



-- Best Regards

                Jeff Zhang

Reply via email to