Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary info without garbling it).

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, $1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
      :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS NOT NULL($3), $3, $1)])
      :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
      :     :- LogicalTableScan(table=[[default_catalog, default_database, table1]])
      :     +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
      :        +- LogicalProject(id2=[$1], attr=[$0])
      :           +- LogicalTableScan(table=[[default_catalog, default_database, table2]])
      +- LogicalTableScan(table=[[default_catalog, default_database, table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
      :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
      :     +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :        :- Exchange(distribution=[hash[id1]], changelogMode=[I])
      :        :  +- TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr], changelogMode=[I])
      :        +- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA])
      :           +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr], changelogMode=[I,UB,UA])
      :              +- Exchange(distribution=[hash[id2]], changelogMode=[I])
      :                 +- TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2], changelogMode=[I])
      +- Exchange(distribution=[hash[attr]], changelogMode=[I])
         +- TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2])

Stage 5 : Operator
    content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr])
    ship_strategy : HASH

Stage 7 : Operator
    content : Join(joinType=[FullOuterJoin], where=[(id1 = id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    ship_strategy : HASH

Stage 8 : Operator
    content : Calc(select=[id1, attr, id2, attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
    ship_strategy : FORWARD

Stage 10 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped])

Stage 12 : Operator
    content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    ship_strategy : HASH
Stage 13 : Operator
    content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS attr_mapped])
    ship_strategy : FORWARD

Stage 14 : Data Sink
    content : Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
    ship_strategy : FORWARD

On 4/14/21, 10:08 AM, "Timo Walther" <twal...@apache.org> wrote:

Can you share the resulting plan with us? Ideally with the ChangelogMode detail enabled as well.

statementSet.explain(...)

Maybe this could help.

Regards,
Timo

On 14.04.21 16:47, Dylan Forciea wrote:
> Piotrek,
>
> I am looking at the count of records present in the sink table in Postgres after the entire job completes, not the number of inserts/retracts. I can see as the job runs that records are added to and removed from the "sink" table. With parallelism set to 1, it always comes out to the same number (which is consistent with the number of ids in the source tables "table1" and "table2"), at about 491k records in table "sink" when the job is complete. With the parallelism set to 16, the "sink" table will have somewhere around 360k records +/- 20k when the job is complete. I truncate the "sink" table before I run the job, and this is a test environment where the source databases are static.
>
> I removed my line for setting to Batch mode per Timo's suggestion, and am still running with MAX, which should have deterministic output.
>
> Dylan
>
> From: Piotr Nowojski <pnowoj...@apache.org>
> Date: Wednesday, April 14, 2021 at 9:38 AM
> To: Dylan Forciea <dy...@oseberg.io>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Nondeterministic results with SQL job when parallelism is > 1
>
> Hi Dylan,
>
> But if you are running your query in Streaming mode, aren't you counting retractions from the FULL JOIN? AFAIK in Streaming mode in a FULL JOIN, when the first record comes in it will be immediately emitted with NULLs (not matched, as the other table is empty). Later, if a matching record is received from the second table, the previous result will be retracted and the new, updated one will be re-emitted. Maybe this is what you are observing in the varying output?
>
> Maybe you could try to analyse how the results differ between different runs?
>
> Best,
>
> Piotrek
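
(Aside: a minimal sketch of one way to watch the retractions Piotrek describes, assuming the `view` Table built in the job quoted at the bottom of this thread. Each collected row carries a RowKind such as INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE.)

    // Sketch only: collect the changelog of the joined view and print each
    // row together with its RowKind. `view` is assumed to be the Table
    // defined in the job at the bottom of this thread.
    val changelog = view.execute().collect()
    while (changelog.hasNext) {
      val row = changelog.next()
      println(s"${row.getKind}: $row")
    }
    changelog.close()
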
> On Wed, 14 Apr 2021 at 16:22, Dylan Forciea <dy...@oseberg.io> wrote:
>
> I replaced the FIRST_VALUE with MAX to ensure that the results should be identical even in their content, and my problem still remains: I end up with a nondeterministic count of records being emitted into the sink when the parallelism is over 1, and that count is about 20-25% short (and not consistent) of what comes out consistently when parallelism is set to 1.
>
> Dylan
>
> From: Dylan Forciea <dy...@oseberg.io>
> Date: Wednesday, April 14, 2021 at 9:08 AM
> To: Piotr Nowojski <pnowoj...@apache.org>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Nondeterministic results with SQL job when parallelism is > 1
>
> Piotrek,
>
> I was actually originally using a group function that WAS deterministic (a custom UDF I made), but chose something built-in here. By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is on an attribute that is not part of the key, I wouldn't think that should affect the number of records coming out.
>
> Dylan
>
> From: Piotr Nowojski <pnowoj...@apache.org>
> Date: Wednesday, April 14, 2021 at 9:06 AM
> To: Dylan Forciea <dy...@oseberg.io>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Nondeterministic results with SQL job when parallelism is > 1
>
> Hi,
>
> Yes, it looks like your query is non-deterministic because of `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel sources, each time you run your query your first value might be different. If that's the case, you could try to confirm it with an even smaller query:
>
> SELECT
>     id2,
>     FIRST_VALUE(attr) AS attr
> FROM table2
> GROUP BY id2
>
> Best,
>
> Piotrek
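
(Aside, for reference: the MAX replacement Dylan mentions earlier in the thread would look like this in the distilled form. A sketch only, assuming the same streamTableEnv as in the job below; the variable name is purely illustrative.)

    // Sketch only: the distilled subquery with MAX instead of FIRST_VALUE.
    val t2Deterministic = streamTableEnv.sqlQuery(
      """
      SELECT
        id2,
        MAX(attr) AS attr
      FROM table2
      GROUP BY id2
      """)

Unlike FIRST_VALUE, MAX over the same final set of rows gives the same result regardless of the order in which parallel source splits deliver them.
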
> On Wed, 14 Apr 2021 at 14:45, Dylan Forciea <dy...@oseberg.io> wrote:
>
> I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or I set the default local parallelism to something higher than 1. I would end up with fewer records in the end, and each time I ran the job the output record count would come out differently.
>
> I managed to distill an example, as pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I feel like I managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn't giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell if I'm doing something that is known not to work, or if I have run across a bug?
>
> Regards,
>
> Dylan Forciea
>
> // Imports assumed for completeness; they were not included in the original message.
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
>
> object Job {
>
>   def main(args: Array[String]): Unit = {
>     StreamExecutionEnvironment.setDefaultLocalParallelism(1)
>
>     val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
>
>     val configuration = streamTableEnv.getConfig().getConfiguration()
>     configuration.setInteger("table.exec.resource.default-parallelism", 16)
>
>     streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE table1 (
>         id1 STRING PRIMARY KEY NOT ENFORCED,
>         attr STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table1',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE table2 (
>         attr STRING,
>         id2 STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table2',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE table3 (
>         attr STRING PRIMARY KEY NOT ENFORCED,
>         attr_mapped STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table3',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE sink (
>         id STRING PRIMARY KEY NOT ENFORCED,
>         attr STRING,
>         attr_mapped STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'sink',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     val view =
>       streamTableEnv.sqlQuery("""
>         SELECT
>           COALESCE(t1.id1, t2.id2) AS id,
>           COALESCE(t2.attr, t1.attr) AS attr,
>           COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
>         FROM table1 t1
>         FULL JOIN (
>           SELECT
>             id2,
>             FIRST_VALUE(attr) AS attr
>           FROM table2
>           GROUP BY id2
>         ) t2
>         ON (t1.id1 = t2.id2)
>         LEFT JOIN table3 t3
>         ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
>
>     streamTableEnv.createTemporaryView("view", view)
>
>     val statementSet = streamTableEnv.createStatementSet()
>     statementSet.addInsertSql("""
>       INSERT INTO sink SELECT * FROM view
>     """)
>
>     statementSet.execute().await()
>   }
> }
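
(Aside: a plan like the one at the top of this thread, including the changelogMode annotations Timo asked about, can be printed from this job before it is executed. A minimal sketch, assuming the statementSet built above.)

    // Sketch only: print the plan with changelog modes included, which adds
    // the changelogMode=[...] annotations seen in the optimized plan above.
    // Call this before statementSet.execute().
    import org.apache.flink.table.api.ExplainDetail

    println(statementSet.explain(ExplainDetail.CHANGELOG_MODE))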