Hi Dylan,

streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

is currently not supported by the Table & SQL API. For now,

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

determines the mode. Thus, I would remove the line again.

If you want to use `inBatchMode()`, you can use the unified TableEnvironment that is not connected to the StreamExecutionEnvironment:

TableEnvironment.create(settings);
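
A minimal sketch of that approach (assuming Flink 1.12 with the Blink planner; variable names are illustrative):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val batchSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()

// Unified environment: no StreamExecutionEnvironment attached, so the
// execution mode is determined entirely by the settings above.
val batchTableEnv = TableEnvironment.create(batchSettings)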

So Piotrek's answer hopefully makes more sense now.

Regards,
Timo


On 14.04.21 16:08, Dylan Forciea wrote:
Piotrek,

I was actually originally using a group function that WAS deterministic (a custom UDF I wrote), but chose a built-in one for this example. By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is applied to an attribute that is not part of the key, I wouldn't think it should affect the number of records coming out.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non-deterministic because of the `FIRST_VALUE` used inside the `GROUP BY`. If you have many different parallel sources, the first value can differ each time you run the query. If that's the case, you could try to confirm it with an even smaller query:

        SELECT
           id2,
           FIRST_VALUE(attr) AS attr
         FROM table2
         GROUP BY id2
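
If it is the attr values rather than the row count that vary, a follow-up check (my suggestion, not part of the original advice) would be to swap FIRST_VALUE for an order-insensitive aggregate such as MIN, which should produce the same output on every run:

        SELECT
           id2,
           MIN(attr) AS attr
         FROM table2
         GROUP BY id2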

Best,

Piotrek

On Wed, Apr 14, 2021 at 14:45 Dylan Forciea <dy...@oseberg.io> wrote:

    I am running Flink 1.12.2, and I was trying to raise the parallelism
    of my Flink SQL job to see what happened. However, once I did, my
    results became nondeterministic. This happens whether I set the
    table.exec.resource.default-parallelism config option or set the
    default local parallelism to something higher than 1. I would end up
    with fewer records in the end, and each time I ran the job the output
    record count came out differently.

    I managed to distill an example, pasted below (with attribute names
    changed to protect company proprietary info), that causes the issue.
    I believe I also triggered it with a LEFT JOIN rather than a FULL
    JOIN, but the distilled version wasn't producing wrong results that
    way. Maybe it has to do with joining to a table that was formed using
    a GROUP BY? Can somebody tell me whether I'm doing something that is
    known not to work, or whether I have run across a bug?

    Regards,

    Dylan Forciea

    // Imports assumed from context (Flink 1.12, Scala API); the original
    // message omitted them.
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    object Job {

      def main(args: Array[String]): Unit = {
        StreamExecutionEnvironment.setDefaultLocalParallelism(1)

        val settings =
          EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

        val configuration = streamTableEnv.getConfig().getConfiguration()
        configuration.setInteger("table.exec.resource.default-parallelism", 16)

        streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

        // Source and sink tables, all backed by the JDBC connector.
        streamTableEnv.executeSql("""
          CREATE TABLE table1 (
            id1 STRING PRIMARY KEY NOT ENFORCED,
            attr STRING
          ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://…',
            'table-name' = 'table1',
            'username' = 'username',
            'password' = 'password',
            'scan.fetch-size' = '500',
            'scan.auto-commit' = 'false'
          )""")

        streamTableEnv.executeSql("""
          CREATE TABLE table2 (
            attr STRING,
            id2 STRING
          ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://…',
            'table-name' = 'table2',
            'username' = 'username',
            'password' = 'password',
            'scan.fetch-size' = '500',
            'scan.auto-commit' = 'false'
          )""")

        streamTableEnv.executeSql("""
          CREATE TABLE table3 (
            attr STRING PRIMARY KEY NOT ENFORCED,
            attr_mapped STRING
          ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://…',
            'table-name' = 'table3',
            'username' = 'username',
            'password' = 'password',
            'scan.fetch-size' = '500',
            'scan.auto-commit' = 'false'
          )""")

        streamTableEnv.executeSql("""
          CREATE TABLE sink (
            id STRING PRIMARY KEY NOT ENFORCED,
            attr STRING,
            attr_mapped STRING
          ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://…',
            'table-name' = 'sink',
            'username' = 'username',
            'password' = 'password',
            'scan.fetch-size' = '500',
            'scan.auto-commit' = 'false'
          )""")

        // The query under discussion: FULL JOIN against a GROUP BY
        // subquery using FIRST_VALUE, then a LEFT JOIN onto table3.
        val view = streamTableEnv.sqlQuery("""
          SELECT
            COALESCE(t1.id1, t2.id2) AS id,
            COALESCE(t2.attr, t1.attr) AS operator,
            COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
          FROM table1 t1
          FULL JOIN (
            SELECT
              id2,
              FIRST_VALUE(attr) AS attr
            FROM table2
            GROUP BY id2
          ) t2
            ON (t1.id1 = t2.id2)
          LEFT JOIN table3 t3
            ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
        streamTableEnv.createTemporaryView("view", view)

        val statementSet = streamTableEnv.createStatementSet()
        statementSet.addInsertSql("""
          INSERT INTO sink SELECT * FROM view
        """)
        statementSet.execute().await()
      }
    }

