Hi,

this topic was discussed many years ago and the conclusion there was that setting the parallelism of individual operators via FlinkPipelineOptions (or ResourceHints) is be possible, but would be somewhat cumbersome. Although I understand that it "feels" weird to have high parallelism for operators with small inputs, does this actually bring any relevant performance impact? I always use parallelism based on the largest operator in the Pipeline and this seems to work just fine. Is there any particular need or measurable impact of such approach?

 Jan

On 4/19/23 17:23, Nimalan Mahendran wrote:
Same need here, using Flink runner. We are processing a pcollection (extracting features per element) then combining these into groups of features and running the next operator on those groups.

Each group contains ~50 elements, so the parallelism of the operator upstream of the groupby should be higher, to be balanced with the downstream operator.

On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:

    Hi Reuven,

    It would be better to set parallelism for operators, as I
    mentioned before, there may be multiple groupby, join operators in
    one pipeline, and their parallelism can be different due to
    different input data sizes.

    On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:

        Jeff - does setting the global default work for you, or do you
        need per-operator control? Seems like it would be to add this
        to ResourceHints.

        On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw
        <rober...@google.com> wrote:

            Yeah, I don't think we have a good per-operator API for
            this. If we were to add it, it probably belongs in
            ResourceHints.

            On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax
            <re...@google.com> wrote:

                Looking at FlinkPipelineOptions, there is a
                parallelism option you can set. I believe this sets
                the default parallelism for all Flink operators.

                On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang
                <zjf...@gmail.com> wrote:

                    Thanks Holden, this would work for Spark, but
                    Flink doesn't have such kind of mechanism, so I am
                    looking for a general solution on the beam side.

                    On Mon, Apr 17, 2023 at 10:08 AM Holden Karau
                    <hol...@pigscanfly.ca> wrote:

                        To a (small) degree Sparks “new” AQE might be
                        able to help depending on what kind of
                        operations Beam is compiling it down to.

                        Have you tried setting
                        spark.sql.adaptive.enabled &
                        spark.sql.adaptive.coalescePartitions.enabled



                        On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax
                        via user <user@beam.apache.org> wrote:

                            I see. Robert - what is the story for
                            parallelism controls on GBK with the Spark
                            or Flink runners?

                            On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang
                            <zjf...@gmail.com> wrote:

                                No, I don't use dataflow, I use Spark
                                & Flink.


                                On Mon, Apr 17, 2023 at 8:08 AM Reuven
                                Lax <re...@google.com> wrote:

                                    Are you running on the Dataflow
                                    runner? If so, Dataflow - unlike
                                    Spark and Flink - dynamically
                                    modifies the parallelism as the
                                    operator runs, so there is no need
                                    to have such controls. In fact
                                    these specific controls wouldn't
                                    make much sense for the way
                                    Dataflow implements these operators.

                                    On Sun, Apr 16, 2023 at 12:25 AM
                                    Jeff Zhang <zjf...@gmail.com> wrote:

                                        Just for
                                        performance tuning like in
                                        Spark and Flink.


                                        On Sun, Apr 16, 2023 at
                                        1:10 PM Robert Bradshaw via
                                        user <user@beam.apache.org> wrote:

                                            What are you trying to
                                            achieve by setting the
                                            parallelism?

                                            On Sat, Apr 15, 2023 at
                                            5:13 PM Jeff Zhang
                                            <zjf...@gmail.com> wrote:

                                                Thanks Reuven, what I
                                                mean is to set the
                                                parallelism in
                                                operator level. And
                                                the input size of the
                                                operator is unknown at
                                                compiling stage if it
                                                is not a source
                                                 operator,

                                                Here's an example of
                                                flink
                                                
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
                                                Spark also support to
                                                set operator level
                                                parallelism (see
                                                groupByKey and
                                                reduceByKey):
                                                
https://spark.apache.org/docs/latest/rdd-programming-guide.html


                                                On Sun, Apr 16, 2023
                                                at 1:42 AM Reuven Lax
                                                via user
                                                <user@beam.apache.org>
                                                wrote:

                                                    The maximum
                                                    parallelism is
                                                    always determined
                                                    by the parallelism
                                                    of your data. If
                                                    you do a
                                                    GroupByKey for
                                                    example, the
                                                    number of keys in
                                                    your data
                                                    determines the
                                                    maximum parallelism.

                                                    Beyond the
                                                    limitations in
                                                    your data, it
                                                    depends on your
                                                    execution engine.
                                                    If you're using
                                                    Dataflow, Dataflow
                                                    is designed to
                                                    automatically
                                                    determine the
                                                    parallelism (e.g.
                                                    work will be
                                                    dynamically split
                                                    and moved around
                                                    between workers,
                                                    the number of
                                                    workers will
                                                    autoscale, etc.),
                                                    so there's no need
                                                    to explicitly set
                                                    the parallelism of
                                                    the execution.

                                                    On Sat, Apr 15,
                                                    2023 at 1:12 AM
                                                    Jeff Zhang
                                                    <zjf...@gmail.com>
                                                    wrote:

                                                        Besides the
                                                        global
                                                        parallelism of
                                                        beam job, is
                                                        there any way
                                                        to set
                                                        parallelism for
                                                        individual
                                                        operators like
                                                        group by and
                                                        join? I
                                                        understand the
                                                        parallelism setting
                                                        depends on the
                                                        underlying
                                                        execution
                                                        engine, but it
                                                        is very common
                                                        to set
                                                        parallelism like
                                                        group by and
                                                        join in both
                                                        spark & flink.







-- Best Regards

                                                        Jeff Zhang



-- Best Regards

                                                Jeff Zhang



-- Best Regards

                                        Jeff Zhang



-- Best Regards

                                Jeff Zhang

-- Twitter: https://twitter.com/holdenkarau
                        Books (Learning Spark, High Performance Spark,
                        etc.): https://amzn.to/2MaRAG9
                        <https://amzn.to/2MaRAG9>
                        YouTube Live Streams:
                        https://www.youtube.com/user/holdenkarau



-- Best Regards

                    Jeff Zhang



-- Best Regards

    Jeff Zhang

Reply via email to