Hi Dylan,

But if you are running your query in Streaming mode, aren't you counting
retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN, when
the first record comes in it will be immediately emitted with NULLs (not
matched, as the other table is empty). Later if a matching record is
received from the second table, the previous result will be retracted and
the new one, updated, will be re-emitted. Maybe this is what you are
observing in the varying output?

Maybe you could try to analyse how the results differ between different
runs?

Best,
Piotrek

śr., 14 kwi 2021 o 16:22 Dylan Forciea <dy...@oseberg.io> napisał(a):

> I replaced the FIRST_VALUE with MAX to ensure that the results should be
> identical even in their content, and my problem still remains – I end up
> with a nondeterministic count of records being emitted into the sink when
> the parallelism is over 1, and that count is about 20-25% short (and not
> consistent) of what comes out consistently when parallelism is set to 1.
>
>
>
> Dylan
>
>
>
> *From: *Dylan Forciea <dy...@oseberg.io>
> *Date: *Wednesday, April 14, 2021 at 9:08 AM
> *To: *Piotr Nowojski <pnowoj...@apache.org>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Nondeterministic results with SQL job when parallelism is
> > 1
>
>
>
> Pitorek,
>
>
>
> I was actually originally using a group function that WAS deterministic
> (but was a custom UDF I made), but chose something here built in. By
> non-deterministic, I mean that the number of records coming out is not
> consistent. Since the FIRST_VALUE here is on an attribute that is not part
> of the key, that shouldn’t affect the number of records coming out I
> wouldn’t think.
>
>
>
> Dylan
>
>
>
> *From: *Piotr Nowojski <pnowoj...@apache.org>
> *Date: *Wednesday, April 14, 2021 at 9:06 AM
> *To: *Dylan Forciea <dy...@oseberg.io>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Nondeterministic results with SQL job when parallelism is
> > 1
>
>
>
> Hi,
>
>
>
> Yes, it looks like your query is non deterministic because of
> `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel
> sources, each time you run your query your first value might be different.
> If that's the case, you could try to confirm it with even smaller query:
>
>
>
>        SELECT
>           id2,
>           FIRST_VALUE(attr) AS attr
>         FROM table2
>         GROUP BY id2
>
>
>
> Best,
>
> Piotrek
>
>
>
> śr., 14 kwi 2021 o 14:45 Dylan Forciea <dy...@oseberg.io> napisał(a):
>
> I am running Flink 1.12.2, and I was trying to up the parallelism of my
> Flink SQL job to see what happened. However, once I did that, my results
> became nondeterministic. This happens whether I set the
> table.exec.resource.default-parallelism config option or I set the default
> local parallelism to something higher than 1. I would end up with less
> records in the end, and each time I ran the output record count would come
> out differently.
>
>
>
> I managed to distill an example, as pasted below (with attribute names
> changed to protect company proprietary info), that causes the issue. I feel
> like I managed to get it to happen with a LEFT JOIN rather than a FULL
> JOIN, but the distilled version wasn’t giving me wrong results with that.
> Maybe it has to do with joining to a table that was formed using a GROUP
> BY? Can somebody tell if I’m doing something that is known not to work, or
> if I have run across a bug?
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
>
>
> object Job {
>
>   def main(args: Array[String]): Unit = {
>
>     StreamExecutionEnvironment.setDefaultLocalParallelism(1)
>
>
>
>     val settings = EnvironmentSettings
> .newInstance().useBlinkPlanner().inStreamingMode().build()
>
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
>     val streamTableEnv = StreamTableEnvironment.create(streamEnv,
> settings)
>
>
>
>     val configuration = streamTableEnv.getConfig().getConfiguration()
>
>     configuration.setInteger("table.exec.resource.default-parallelism", 16
> )
>
>
>
>     streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>
>
>     streamTableEnv.executeSql(
>
>       """
>
>       CREATE TABLE table1 (
>
>         id1 STRING PRIMARY KEY NOT ENFORCED,
>
>         attr STRING
>
>       ) WITH (
>
>         'connector' = 'jdbc',
>
>         'url' = 'jdbc:postgresql://…',
>
>         'table-name' = 'table1’,
>
>         'username' = 'username',
>
>         'password' = 'password',
>
>         'scan.fetch-size' = '500',
>
>         'scan.auto-commit' = 'false'
>
>       )""")
>
>
>
>     streamTableEnv.executeSql(
>
>       """
>
>       CREATE TABLE table2 (
>
>         attr STRING,
>
>         id2 STRING
>
>       ) WITH (
>
>         'connector' = 'jdbc',
>
>         'url' = 'jdbc:postgresql://…',
>
>         'table-name' = 'table2',
>
>         'username' = 'username',
>
>         'password' = 'password',
>
>         'scan.fetch-size' = '500',
>
>         'scan.auto-commit' = 'false'
>
>       )""")
>
>
>
>     streamTableEnv.executeSql(
>
>       """
>
>       CREATE TABLE table3 (
>
>         attr STRING PRIMARY KEY NOT ENFORCED,
>
>         attr_mapped STRING
>
>       ) WITH (
>
>         'connector' = 'jdbc',
>
>         'url' = 'jdbc:postgresql://…',
>
>         'table-name' = ‘table3',
>
>         'username' = ‘username',
>
>         'password' = 'password',
>
>         'scan.fetch-size' = '500',
>
>         'scan.auto-commit' = 'false'
>
>       )""")
>
>
>
>     streamTableEnv.executeSql("""
>
>       CREATE TABLE sink (
>
>         id STRING PRIMARY KEY NOT ENFORCED,
>
>         attr STRING,
>
>         attr_mapped STRING
>
>       ) WITH (
>
>         'connector' = 'jdbc',
>
>         'url' = 'jdbc:postgresql://…,
>
>         'table-name' = 'sink',
>
>         'username' = 'username',
>
>         'password' = 'password',
>
>         'scan.fetch-size' = '500',
>
>         'scan.auto-commit' = 'false'
>
>       )""")
>
>
>
>     val view =
>
>       streamTableEnv.sqlQuery("""
>
>       SELECT
>
>         COALESCE(t1.id1, t2.id2) AS id,
>
>         COALESCE(t2.attr, t1.attr) AS operator,
>
>         COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
>
>       FROM table1 t1
>
>       FULL JOIN (
>
>         SELECT
>
>           id2,
>
>           FIRST_VALUE(attr) AS attr
>
>         FROM table2
>
>         GROUP BY id2
>
>       ) t2
>
>        ON (t1.id1 = t2.id2)
>
>       LEFT JOIN table3 t3
>
>         ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
>
>     streamTableEnv.createTemporaryView("view", view)
>
>
>
>     val statementSet = streamTableEnv.createStatementSet()
>
>     statementSet.addInsertSql("""
>
>       INSERT INTO sink SELECT * FROM view
>
>     """)
>
>
>
>     statementSet.execute().await()
>
>   }
>
> }
>
>
>
>
>
>

Reply via email to