Hi,

Yes, it looks like your query is nondeterministic because of the `FIRST_VALUE`
aggregate used with `GROUP BY`. With several parallel source subtasks, rows can
reach the aggregation in a different order on each run, so the first value per
group might be different each time you run your query. If that's the case, you
could try to confirm it with an even smaller query:

        SELECT
          id2,
          FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2
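
If that smaller query does vary between runs, one way to check whether
`FIRST_VALUE` is the only source of the nondeterminism is to swap it for an
aggregate that does not depend on arrival order. A minimal sketch, assuming
that any single `attr` per `id2` is acceptable for the test (the choice of
`MIN` is my assumption, not something taken from your job):

```sql
-- Same grouping as above, but MIN(attr) returns the same value per id2
-- regardless of the order in which rows arrive from the parallel sources.
SELECT
  id2,
  MIN(attr) AS attr
FROM table2
GROUP BY id2
```

If the results become stable with this variant, the arrival order seen by
`FIRST_VALUE` is the likely cause.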

Best,
Piotrek

On Wed, 14 Apr 2021 at 14:45, Dylan Forciea <dy...@oseberg.io> wrote:

> I am running Flink 1.12.2, and I tried increasing the parallelism of my
> Flink SQL job to see what would happen. However, once I did that, my results
> became nondeterministic. This happens whether I set the
> table.exec.resource.default-parallelism config option or set the default
> local parallelism to something higher than 1. I end up with fewer
> records in the end, and the output record count comes out differently
> each time I run the job.
>
>
>
> I managed to distill an example, pasted below (with attribute names
> changed to protect company proprietary info), that causes the issue. I
> believe I also got it to happen with a LEFT JOIN rather than a FULL JOIN,
> but the distilled version wasn't giving me wrong results with that.
> Maybe it has to do with joining to a table that was formed using a GROUP
> BY? Can somebody tell me whether I'm doing something that is known not to
> work, or whether I have run across a bug?
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
>
>
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>
> object Job {
>   def main(args: Array[String]): Unit = {
>     StreamExecutionEnvironment.setDefaultLocalParallelism(1)
>
>     val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
>
>     // Run the Table/SQL operators with a parallelism of 16.
>     val configuration = streamTableEnv.getConfig().getConfiguration()
>     configuration.setInteger("table.exec.resource.default-parallelism", 16)
>
>     streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)
>
>     streamTableEnv.executeSql(
>       """
>       CREATE TABLE table1 (
>         id1 STRING PRIMARY KEY NOT ENFORCED,
>         attr STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table1',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql(
>       """
>       CREATE TABLE table2 (
>         attr STRING,
>         id2 STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table2',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql(
>       """
>       CREATE TABLE table3 (
>         attr STRING PRIMARY KEY NOT ENFORCED,
>         attr_mapped STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table3',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql(
>       """
>       CREATE TABLE sink (
>         id STRING PRIMARY KEY NOT ENFORCED,
>         attr STRING,
>         attr_mapped STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'sink',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     val view =
>       streamTableEnv.sqlQuery("""
>       SELECT
>         COALESCE(t1.id1, t2.id2) AS id,
>         COALESCE(t2.attr, t1.attr) AS operator,
>         COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
>       FROM table1 t1
>       FULL JOIN (
>         SELECT
>           id2,
>           FIRST_VALUE(attr) AS attr
>         FROM table2
>         GROUP BY id2
>       ) t2
>         ON (t1.id1 = t2.id2)
>       LEFT JOIN table3 t3
>         ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
>     streamTableEnv.createTemporaryView("view", view)
>
>     val statementSet = streamTableEnv.createStatementSet()
>     statementSet.addInsertSql("""
>       INSERT INTO sink SELECT * FROM view
>     """)
>
>     statementSet.execute().await()
>   }
> }
