Hi Dylan,

The primary-key ordering problem I mentioned above is about the changelog. Batch queries only emit a final result and thus produce no changelog, so it is safe to use batch mode. The problem only exists in streaming mode with parallelism greater than 1.

Best,
Jark
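The thread does not show the exact change Dylan made, but for reference, a minimal sketch of one way to run the same SQL pipeline in batch mode on Flink 1.12 is a unified TableEnvironment with the Blink planner in batch mode (the object name below is illustrative; the DDL and INSERT statements from Dylan's program at the bottom of the thread could be submitted against it unchanged):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object BatchJobSketch {
  def main(args: Array[String]): Unit = {
    // Blink planner in batch mode: the job emits only final results,
    // so there is no changelog that can arrive out of order at the sink.
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val tableEnv = TableEnvironment.create(settings)

    // The same CREATE TABLE ... WITH ('connector' = 'jdbc', ...) statements
    // and the INSERT INTO sink statement from the program quoted below would
    // be submitted here via tableEnv.executeSql(...) or a StatementSet.
  }
}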
On Fri, 16 Apr 2021 at 21:40, Dylan Forciea <dy...@oseberg.io> wrote:

Jark,

Thanks for the heads up! I didn't see this behavior when running in batch mode with parallelism turned on. Is it safe to do this kind of join in batch mode right now, or am I just getting lucky?

Dylan

From: Jark Wu <imj...@gmail.com>
Date: Friday, April 16, 2021 at 5:10 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: Timo Walther <twal...@apache.org>, Piotr Nowojski <pnowoj...@apache.org>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi Dylan,

I think this has the same reason as https://issues.apache.org/jira/browse/FLINK-20374. The root cause is that the changelog is shuffled by `attr` at the second join, and thus records with the same `id` will be shuffled to different join tasks (and also different sink tasks). So the data arriving at the sink is not ordered on the sink's primary key.

We may need something like a primary-key ordering mechanism in the whole planner to fix this.

Best,
Jark

On Thu, 15 Apr 2021 at 01:33, Dylan Forciea <dy...@oseberg.io> wrote:

On a side note - I changed to use the batch mode per your suggestion, Timo, and my job ran much faster and with deterministic counts with parallelism turned on. So I'll probably utilize that for now. However, it would still be nice to dig down into why streaming isn't working, in case I need it in the future.

Dylan

On 4/14/21, 10:27 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:

Timo,

Here is the plan (hopefully I properly cleansed it of company proprietary info without garbling it).

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, $1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
      :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS NOT NULL($3), $3, $1)])
      :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
      :     :- LogicalTableScan(table=[[default_catalog, default_database, table1]])
      :     +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
      :        +- LogicalProject(id2=[$1], attr=[$0])
      :           +- LogicalTableScan(table=[[default_catalog, default_database, table2]])
      +- LogicalTableScan(table=[[default_catalog, default_database, table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
      :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
      :     +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :        :- Exchange(distribution=[hash[id1]], changelogMode=[I])
      :        :  +- TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr], changelogMode=[I])
      :        +- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA])
      :           +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr], changelogMode=[I,UB,UA])
      :              +- Exchange(distribution=[hash[id2]], changelogMode=[I])
      :                 +- TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2], changelogMode=[I])
      +- Exchange(distribution=[hash[attr]], changelogMode=[I])
         +- TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2])

Stage 5 : Attr
    content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr])
    ship_strategy : HASH

Stage 7 : Attr
    content : Join(joinType=[FullOuterJoin], where=[(id1 = id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    ship_strategy : HASH

Stage 8 : Attr
    content : Calc(select=[id1, attr, id2, attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
    ship_strategy : FORWARD

Stage 10 : Data Source
    content : Source: TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped])

Stage 12 : Attr
    content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
    ship_strategy : HASH

Stage 13 : Attr
    content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS attr_mapped])
    ship_strategy : FORWARD

Stage 14 : Data Sink
    content : Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
    ship_strategy : FORWARD

On 4/14/21, 10:08 AM, "Timo Walther" <twal...@apache.org> wrote:

Can you share the resulting plan with us? Ideally with the ChangelogMode detail enabled as well.

statementSet.explain(...)

Maybe this could help.

Regards,
Timo
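For reference, a minimal sketch of what Timo is asking for here, assuming the statementSet variable from Dylan's program at the bottom of the thread; ExplainDetail.CHANGELOG_MODE is what adds the changelogMode=[...] annotations visible in the optimized plan above:

import org.apache.flink.table.api.ExplainDetail

// Print the plan for all statements in the set, including the changelog
// mode (insert / update-before / update-after / delete) of each operator.
println(statementSet.explain(ExplainDetail.CHANGELOG_MODE))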
On 14.04.21 16:47, Dylan Forciea wrote:

Piotrek,

I am looking at the count of records present in the sink table in Postgres after the entire job completes, not the number of inserts/retracts. I can see as the job runs that records are added to and removed from the "sink" table. With parallelism set to 1, it always comes out to the same number (which is consistent with the number of ids in the source tables "table1" and "table2"), at about 491k records in table "sink" when the job is complete. With parallelism set to 16, the "sink" table will have somewhere around 360k records +/- 20k when the job is complete. I truncate the "sink" table before I run the job, and this is a test environment where the source databases are static.

I removed my line for setting to batch mode per Timo's suggestion, and am still running with MAX, which should have deterministic output.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:38 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi Dylan,

But if you are running your query in streaming mode, aren't you counting retractions from the FULL JOIN? AFAIK, in streaming mode in a FULL JOIN, when the first record comes in it will be immediately emitted with NULLs (not matched, as the other table is empty). Later, if a matching record is received from the second table, the previous result will be retracted and the new, updated one will be re-emitted. Maybe this is what you are observing in the varying output?

Maybe you could try to analyse how the results differ between different runs?

Best,
Piotrek

On Wed, 14 Apr 2021 at 16:22, Dylan Forciea <dy...@oseberg.io> wrote:

I replaced the FIRST_VALUE with MAX to ensure that the results should be identical even in their content, and my problem still remains - I end up with a nondeterministic count of records being emitted into the sink when the parallelism is over 1, and that count comes up about 20-25% short (and is not consistent) compared to what comes out consistently when parallelism is set to 1.

Dylan
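For reference, a sketch of the grouped subquery with MAX substituted for FIRST_VALUE, as Dylan describes above (assuming the streamTableEnv variable and table names from the program at the bottom of the thread):

// MAX is deterministic for a given set of input rows, so the content of the
// grouped result no longer depends on which parallel source split wins the race.
val dedupedTable2 = streamTableEnv.sqlQuery(
  """
  SELECT
    id2,
    MAX(attr) AS attr
  FROM table2
  GROUP BY id2
  """)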
From: Dylan Forciea <dy...@oseberg.io>
Date: Wednesday, April 14, 2021 at 9:08 AM
To: Piotr Nowojski <pnowoj...@apache.org>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Piotrek,

I was actually originally using a group function that WAS deterministic (it was a custom UDF I made), but chose something built in here. By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is on an attribute that is not part of the key, that shouldn't affect the number of records coming out, I wouldn't think.

Dylan

From: Piotr Nowojski <pnowoj...@apache.org>
Date: Wednesday, April 14, 2021 at 9:06 AM
To: Dylan Forciea <dy...@oseberg.io>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Nondeterministic results with SQL job when parallelism is > 1

Hi,

Yes, it looks like your query is non-deterministic because of `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel sources, each time you run your query your first value might be different. If that's the case, you could try to confirm it with an even smaller query:

SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2

Best,
Piotrek

On Wed, 14 Apr 2021 at 14:45, Dylan Forciea <dy...@oseberg.io> wrote:

I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or I set the default local parallelism to something higher than 1. I would end up with fewer records in the end, and each time I ran, the output record count would come out differently.

I managed to distill an example, as pasted below (with attribute names changed to protect company proprietary info), that causes the issue. I feel like I managed to get it to happen with a LEFT JOIN rather than a FULL JOIN, but the distilled version wasn't giving me wrong results with that. Maybe it has to do with joining to a table that was formed using a GROUP BY? Can somebody tell if I'm doing something that is known not to work, or if I have run across a bug?

Regards,

Dylan Forciea

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job {

  def main(args: Array[String]): Unit = {
    StreamExecutionEnvironment.setDefaultLocalParallelism(1)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    val configuration = streamTableEnv.getConfig().getConfiguration()
    configuration.setInteger("table.exec.resource.default-parallelism", 16)

    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

    streamTableEnv.executeSql("""
      CREATE TABLE table1 (
        id1 STRING PRIMARY KEY NOT ENFORCED,
        attr STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table1',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE table2 (
        attr STRING,
        id2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table2',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE table3 (
        attr STRING PRIMARY KEY NOT ENFORCED,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table3',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE sink (
        id STRING PRIMARY KEY NOT ENFORCED,
        attr STRING,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'sink',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")
    val view = streamTableEnv.sqlQuery("""
      SELECT
        COALESCE(t1.id1, t2.id2) AS id,
        COALESCE(t2.attr, t1.attr) AS attr,
        COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
      FROM table1 t1
      FULL JOIN (
        SELECT
          id2,
          FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2
      ) t2
      ON (t1.id1 = t2.id2)
      LEFT JOIN table3 t3
      ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")

    streamTableEnv.createTemporaryView("view", view)

    val statementSet = streamTableEnv.createStatementSet()
    statementSet.addInsertSql("""
      INSERT INTO sink SELECT * FROM view
    """)

    statementSet.execute().await()
  }
}
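To make Piotrek's point about FULL JOIN retractions in streaming mode concrete, here is a small self-contained sketch (not from the thread; table and field names are made up for illustration) that prints the changelog of a FULL JOIN via toRetractStream on Flink 1.12:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object FullJoinChangelogDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tEnv = StreamTableEnvironment.create(
      env,
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())

    // Two tiny append-only inputs standing in for table1 and the grouped table2.
    val left = env.fromElements(("a", "left-attr"))
    val right = env.fromElements(("a", "right-attr"))
    tEnv.createTemporaryView("t1", tEnv.fromDataStream(left, $"id1", $"attr"))
    tEnv.createTemporaryView("t2", tEnv.fromDataStream(right, $"id2", $"attr2"))

    val joined = tEnv.sqlQuery(
      "SELECT COALESCE(t1.id1, t2.id2) AS id, t1.attr, t2.attr2 " +
        "FROM t1 FULL JOIN t2 ON t1.id1 = t2.id2")

    // toRetractStream exposes the changelog: true = insert, false = retraction.
    // Whichever side arrives first is typically emitted immediately with NULLs
    // for the other side, then retracted and re-emitted once the matching row arrives.
    joined.toRetractStream[Row].print()

    env.execute("full-join-changelog-demo")
  }
}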