HI Dylan, I think this has the same reason as https://issues.apache.org/jira/browse/FLINK-20374. The root cause is that changelogs are shuffled by `attr` at second join, and thus records with the same `id` will be shuffled to different join tasks (also different sink tasks). So the data arrived at sinks are not ordered on the sink primary key.
We may need something like primary key ordering mechanism in the whole planner to fix this. Best, Jark On Thu, 15 Apr 2021 at 01:33, Dylan Forciea <dy...@oseberg.io> wrote: > On a side note - I changed to use the batch mode per your suggestion Timo, > and my job ran much faster and with deterministic counts with parallelism > turned on. So I'll probably utilize that for now. However, it would still > be nice to dig down into why streaming isn't working in case I need that in > the future. > > Dylan > > On 4/14/21, 10:27 AM, "Dylan Forciea" <dy...@oseberg.io> wrote: > > Timo, > > Here is the plan (hopefully I properly cleansed it of company > proprietary info without garbling it) > > Dylan > > == Abstract Syntax Tree == > LogicalSink(table=[default_catalog.default_database.sink], fields=[id, > attr, attr_mapped]) > +- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS > NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT > NULL($3), $3, $1)]) > +- LogicalJoin(condition=[=($4, $5)], joinType=[left]) > :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], > $f4=[CASE(IS NOT NULL($3), $3, $1)]) > : +- LogicalJoin(condition=[=($0, $2)], joinType=[full]) > : :- LogicalTableScan(table=[[default_catalog, > default_database, table1]]) > : +- LogicalAggregate(group=[{0}], attr=[MAX($1)]) > : +- LogicalProject(id2=[$1], attr=[$0]) > : +- LogicalTableScan(table=[[default_catalog, > default_database, table2]]) > +- LogicalTableScan(table=[[default_catalog, default_database, > table3]]) > > == Optimized Logical Plan == > Sink(table=[default_catalog.default_database.sink], fields=[id, attr, > attr_mapped], changelogMode=[NONE]) > +- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT > NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), > attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], > changelogMode=[I,UB,UA,D]) > +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], > select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], > leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], > changelogMode=[I,UB,UA,D]) > :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D]) > : +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT > NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D]) > : +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], > select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], > rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D]) > : :- Exchange(distribution=[hash[id1]], changelogMode=[I]) > : : +- TableSourceScan(table=[[default_catalog, > default_database, table1]], fields=[id1, attr], changelogMode=[I]) > : +- Exchange(distribution=[hash[id2]], > changelogMode=[I,UB,UA]) > : +- GroupAggregate(groupBy=[id2], select=[id2, > MAX(attr) AS attr], changelogMode=[I,UB,UA]) > : +- Exchange(distribution=[hash[id2]], > changelogMode=[I]) > : +- TableSourceScan(table=[[default_catalog, > default_database, table2]], fields=[attr, id2], changelogMode=[I]) > +- Exchange(distribution=[hash[attr]], changelogMode=[I]) > +- TableSourceScan(table=[[default_catalog, default_database, > table3]], fields=[attr, attr_mapped], changelogMode=[I]) > > == Physical Execution Plan == > Stage 1 : Data Source > content : Source: TableSourceScan(table=[[default_catalog, > default_database, table1]], fields=[id1, attr]) > > Stage 3 : Data Source > content : Source: TableSourceScan(table=[[default_catalog, > default_database, table2]], fields=[attr, id2]) > > Stage 5 : Attr > content : GroupAggregate(groupBy=[id2], select=[id2, > MAX(attr) AS attr]) > ship_strategy : HASH > > Stage 7 : Attr > content : Join(joinType=[FullOuterJoin], > where=[(id1 = id2)], select=[id1, attr, id2, attr0], > leftInputSpec=[JoinKeyContainsUniqueKey], > rightInputSpec=[JoinKeyContainsUniqueKey]) > ship_strategy : HASH > > Stage 8 : Attr > content : Calc(select=[id1, attr, id2, > attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4]) > ship_strategy : FORWARD > > Stage 10 : Data Source > content : Source: TableSourceScan(table=[[default_catalog, > default_database, table3]], fields=[attr, attr_mapped]) > > Stage 12 : Attr > content : Join(joinType=[LeftOuterJoin], where=[($f4 = > attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], > leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) > ship_strategy : HASH > > Stage 13 : Attr > content : Calc(select=[(id1 IS NOT NULL CASE id1 > CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, > (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 > CASE attr) AS attr_mapped]) > ship_strategy : FORWARD > > Stage 14 : Data Sink > content : Sink: > Sink(table=[default_catalog.default_database.sink], fields=[id, attr, > attr_mapped]) > ship_strategy : FORWARD > > On 4/14/21, 10:08 AM, "Timo Walther" <twal...@apache.org> wrote: > > Can you share the resulting plan with us? Ideally with the > ChangelogMode > detail enabled as well. > > statementSet.explain(...) > > Maybe this could help. > > Regards, > Timo > > > > On 14.04.21 16:47, Dylan Forciea wrote: > > Piotrek, > > > > I am looking at the count of records present in the sink table > in > > Postgres after the entire job completes, not the number of > > inserts/retracts. I can see as the job runs that records are > added and > > removed from the “sink” table. With parallelism set to 1, it > always > > comes out to the same number (which is consistent with the > number of ids > > in the source tables “table1” and “table2”), at about 491k > records in > > table “sink” when the job is complete. With the parallelism set > to 16, > > the “sink” table will have somewhere around 360k records +/- 20k > when > > the job is complete. I truncate the “sink” table before I run > the job, > > and this is a test environment where the source databases are > static. > > > > I removed my line for setting to Batch mode per Timo’s > suggestion, and > > am still running with MAX which should have deterministic output. > > > > Dylan > > > > *From: *Piotr Nowojski <pnowoj...@apache.org> > > *Date: *Wednesday, April 14, 2021 at 9:38 AM > > *To: *Dylan Forciea <dy...@oseberg.io> > > *Cc: *"user@flink.apache.org" <user@flink.apache.org> > > *Subject: *Re: Nondeterministic results with SQL job when > parallelism is > 1 > > > > Hi Dylan, > > > > But if you are running your query in Streaming mode, aren't you > counting > > retractions from the FULL JOIN? AFAIK in Streaming mode in FULL > JOIN, > > when the first record comes in it will be immediately emitted > with NULLs > > (not matched, as the other table is empty). Later if a matching > record > > is received from the second table, the previous result will be > retracted > > and the new one, updated, will be re-emitted. Maybe this is what > you are > > observing in the varying output? > > > > Maybe you could try to analyse how the results differ between > different > > runs? > > > > Best, > > > > Piotrek > > > > śr., 14 kwi 2021 o 16:22 Dylan Forciea <dy...@oseberg.io > > <mailto:dy...@oseberg.io>> napisał(a): > > > > I replaced the FIRST_VALUE with MAX to ensure that the > results > > should be identical even in their content, and my problem > still > > remains – I end up with a nondeterministic count of records > being > > emitted into the sink when the parallelism is over 1, and > that count > > is about 20-25% short (and not consistent) of what comes out > > consistently when parallelism is set to 1. > > > > Dylan > > > > *From: *Dylan Forciea <dy...@oseberg.io <mailto: > dy...@oseberg.io>> > > *Date: *Wednesday, April 14, 2021 at 9:08 AM > > *To: *Piotr Nowojski <pnowoj...@apache.org > > <mailto:pnowoj...@apache.org>> > > *Cc: *"user@flink.apache.org <mailto:user@flink.apache.org>" > > <user@flink.apache.org <mailto:user@flink.apache.org>> > > *Subject: *Re: Nondeterministic results with SQL job when > > parallelism is > 1 > > > > Pitorek, > > > > I was actually originally using a group function that WAS > > deterministic (but was a custom UDF I made), but chose > something > > here built in. By non-deterministic, I mean that the number > of > > records coming out is not consistent. Since the FIRST_VALUE > here is > > on an attribute that is not part of the key, that shouldn’t > affect > > the number of records coming out I wouldn’t think. > > > > Dylan > > > > *From: *Piotr Nowojski <pnowoj...@apache.org > > <mailto:pnowoj...@apache.org>> > > *Date: *Wednesday, April 14, 2021 at 9:06 AM > > *To: *Dylan Forciea <dy...@oseberg.io <mailto: > dy...@oseberg.io>> > > *Cc: *"user@flink.apache.org <mailto:user@flink.apache.org>" > > <user@flink.apache.org <mailto:user@flink.apache.org>> > > *Subject: *Re: Nondeterministic results with SQL job when > > parallelism is > 1 > > > > Hi, > > > > Yes, it looks like your query is non deterministic because of > > `FIRST_VALUE` used inside `GROUP BY`. If you have many > different > > parallel sources, each time you run your query your first > value > > might be different. If that's the case, you could try to > confirm it > > with even smaller query: > > > > SELECT > > id2, > > FIRST_VALUE(attr) AS attr > > FROM table2 > > GROUP BY id2 > > > > Best, > > > > Piotrek > > > > śr., 14 kwi 2021 o 14:45 Dylan Forciea <dy...@oseberg.io > > <mailto:dy...@oseberg.io>> napisał(a): > > > > I am running Flink 1.12.2, and I was trying to up the > > parallelism of my Flink SQL job to see what happened. > However, > > once I did that, my results became nondeterministic. This > > happens whether I set the > > table.exec.resource.default-parallelism config option or > I set > > the default local parallelism to something higher than > 1. I > > would end up with less records in the end, and each time > I ran > > the output record count would come out differently. > > > > I managed to distill an example, as pasted below (with > attribute > > names changed to protect company proprietary info), that > causes > > the issue. I feel like I managed to get it to happen > with a LEFT > > JOIN rather than a FULL JOIN, but the distilled version > wasn’t > > giving me wrong results with that. Maybe it has to do > with > > joining to a table that was formed using a GROUP BY? Can > > somebody tell if I’m doing something that is known not > to work, > > or if I have run across a bug? > > > > Regards, > > > > Dylan Forciea > > > > objectJob{ > > > > defmain(args: Array[String]): Unit= { > > > > StreamExecutionEnvironment.setDefaultLocalParallelism(1) > > > > valsettings= > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > > > > valstreamEnv= > StreamExecutionEnvironment.getExecutionEnvironment > > > > valstreamTableEnv= > StreamTableEnvironment.create(streamEnv, > > settings) > > > > valconfiguration= > streamTableEnv.getConfig().getConfiguration() > > > > > > > configuration.setInteger("table.exec.resource.default-parallelism", > > 16) > > > > > streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); > > > > streamTableEnv.executeSql( > > > > """ > > > > CREATE TABLE table1 ( > > > > id1 STRING PRIMARY KEY NOT ENFORCED, > > > > attr STRING > > > > ) WITH ( > > > > 'connector' = 'jdbc', > > > > 'url' = 'jdbc:postgresql://…', > > > > 'table-name' = 'table1’, > > > > 'username' = 'username', > > > > 'password' = 'password', > > > > 'scan.fetch-size' = '500', > > > > 'scan.auto-commit' = 'false' > > > > )""") > > > > streamTableEnv.executeSql( > > > > """ > > > > CREATE TABLE table2 ( > > > > attr STRING, > > > > id2 STRING > > > > ) WITH ( > > > > 'connector' = 'jdbc', > > > > 'url' = 'jdbc:postgresql://…', > > > > 'table-name' = 'table2', > > > > 'username' = 'username', > > > > 'password' = 'password', > > > > 'scan.fetch-size' = '500', > > > > 'scan.auto-commit' = 'false' > > > > )""") > > > > streamTableEnv.executeSql( > > > > """ > > > > CREATE TABLE table3 ( > > > > attr STRING PRIMARY KEY NOT ENFORCED, > > > > attr_mapped STRING > > > > ) WITH ( > > > > 'connector' = 'jdbc', > > > > 'url' = 'jdbc:postgresql://…', > > > > 'table-name' = ‘table3', > > > > 'username' = ‘username', > > > > 'password' = 'password', > > > > 'scan.fetch-size' = '500', > > > > 'scan.auto-commit' = 'false' > > > > )""") > > > > streamTableEnv.executeSql(""" > > > > CREATE TABLE sink ( > > > > id STRING PRIMARY KEY NOT ENFORCED, > > > > attr STRING, > > > > attr_mapped STRING > > > > ) WITH ( > > > > 'connector' = 'jdbc', > > > > 'url' = 'jdbc:postgresql://…, > > > > 'table-name' = 'sink', > > > > 'username' = 'username', > > > > 'password' = 'password', > > > > 'scan.fetch-size' = '500', > > > > 'scan.auto-commit' = 'false' > > > > )""") > > > > valview= > > > > streamTableEnv.sqlQuery(""" > > > > SELECT > > > > COALESCE(t1.id1, t2.id2) AS id, > > > > COALESCE(t2.attr, t1.attr) AS attr, > > > > COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS > attr_mapped > > > > FROM table1 t1 > > > > FULL JOIN ( > > > > SELECT > > > > id2, > > > > FIRST_VALUE(attr) AS attr > > > > FROM table2 > > > > GROUP BY id2 > > > > ) t2 > > > > ON (t1.id1 = t2.id2) > > > > LEFT JOIN table3 t3 > > > > ON (COALESCE(t2.attr, t1.attr) = t3.attr)""") > > > > streamTableEnv.createTemporaryView("view", view) > > > > valstatementSet= streamTableEnv.createStatementSet() > > > > statementSet.addInsertSql(""" > > > > INSERT INTO sink SELECT * FROM view > > > > """) > > > > statementSet.execute().await() > > > > } > > > > } > > > > > >