Timo,
Here is the plan (hopefully I properly cleansed it of company proprietary info
without garbling it).
Dylan
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, $1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
      :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS NOT NULL($3), $3, $1)])
      :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
      :     :- LogicalTableScan(table=[[default_catalog, default_database, table1]])
      :     +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
      :        +- LogicalProject(id2=[$1], attr=[$0])
      :           +- LogicalTableScan(table=[[default_catalog, default_database, table2]])
      +- LogicalTableScan(table=[[default_catalog, default_database, table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
      :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
      :     +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :        :- Exchange(distribution=[hash[id1]], changelogMode=[I])
      :        :  +- TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr], changelogMode=[I])
      :        +- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA])
      :           +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr], changelogMode=[I,UB,UA])
      :              +- Exchange(distribution=[hash[id2]], changelogMode=[I])
      :                 +- TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2], changelogMode=[I])
      +- Exchange(distribution=[hash[attr]], changelogMode=[I])
         +- TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : Source: TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
  content : Source: TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2])

Stage 5 : Attr
  content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr])
  ship_strategy : HASH

Stage 7 : Attr
  content : Join(joinType=[FullOuterJoin], where=[(id1 = id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
  ship_strategy : HASH

Stage 8 : Attr
  content : Calc(select=[id1, attr, id2, attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
  ship_strategy : FORWARD

Stage 10 : Data Source
  content : Source: TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped])

Stage 12 : Attr
  content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
  ship_strategy : HASH

Stage 13 : Attr
  content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS attr_mapped])
  ship_strategy : FORWARD

Stage 14 : Data Sink
  content : Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
  ship_strategy : FORWARD
On 4/14/21, 10:08 AM, "Timo Walther" <[email protected]> wrote:
Can you share the resulting plan with us? Ideally with the ChangelogMode
detail enabled as well.
statementSet.explain(...)
Maybe this could help.
Regards,
Timo
On 14.04.21 16:47, Dylan Forciea wrote:
> Piotrek,
>
> I am looking at the count of records present in the sink table in
> Postgres after the entire job completes, not the number of
> inserts/retracts. As the job runs, I can see records being added to and
> removed from the “sink” table. With parallelism set to 1, the count always
> comes out the same: about 491k records in table “sink” when the job is
> complete, which is consistent with the number of ids in the source tables
> “table1” and “table2”. With parallelism set to 16, the “sink” table ends up
> with around 360k records, +/- 20k, when the job is complete. I truncate the
> “sink” table before I run the job, and this is a test environment where the
> source databases are static.
>
> I removed the line that set Batch mode, per Timo’s suggestion, and I am
> still running with MAX, which should produce deterministic output.
>
> Dylan
>
> *From: *Piotr Nowojski <[email protected]>
> *Date: *Wednesday, April 14, 2021 at 9:38 AM
> *To: *Dylan Forciea <[email protected]>
> *Cc: *"[email protected]" <[email protected]>
> *Subject: *Re: Nondeterministic results with SQL job when parallelism is > 1
>
> Hi Dylan,
>
> But if you are running your query in Streaming mode, aren't you counting
> retractions from the FULL JOIN? AFAIK, in Streaming mode a FULL JOIN
> emits the first record as soon as it comes in, padded with NULLs (it is
> not matched, as the other table is still empty). Later, if a matching
> record is received from the second table, the previous result will be
> retracted and the new, updated one will be re-emitted. Maybe this is what
> you are observing in the varying output?
>
> Maybe you could try to analyse how the results differ between different
> runs?
>
> Best,
>
> Piotrek
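Here is a minimal Scala sketch of the retraction behaviour Piotrek describes. It does not use Flink, and it is only a conceptual model, not Flink's join operator. It may help to separate the number of changelog messages from the final number of rows.

The sketch assumes at most one row per key on each side and insert-only inputs. `FullJoinChangelog`, `changelog` and `finalRows` are hypothetical names. It shows three things:

- The join emits a NULL-padded row immediately.
- When the match arrives, the join retracts that row and emits the joined row.
- Applying the changelog to a table keyed on `id` (as the sink's primary key does) leaves one row per key, however many retractions flowed through.

```scala
// Hypothetical, Flink-free model of the changelog a streaming FULL OUTER JOIN
// emits, assuming at most one row per key on each side and insert-only inputs.
// '+' adds a result row, '-' retracts a row emitted earlier.
object FullJoinChangelog {
  final case class Row(id: String, left: Option[String], right: Option[String])
  final case class Change(sign: Char, row: Row)

  // events: (side, id, value), where side is "L" or "R"
  def changelog(events: Seq[(String, String, String)]): Vector[Change] = {
    var left  = Map.empty[String, String]
    var right = Map.empty[String, String]
    var out   = Vector.empty[Change]
    for ((side, id, value) <- events) {
      if (side == "L") left += id -> value else right += id -> value
      (left.get(id), right.get(id)) match {
        case (Some(l), Some(r)) =>
          // The match arrived: retract the NULL-padded row emitted earlier...
          val stale = if (side == "L") Row(id, None, Some(r)) else Row(id, Some(l), None)
          out :+= Change('-', stale)
          // ...and emit the joined row in its place.
          out :+= Change('+', Row(id, Some(l), Some(r)))
        case (l, r) =>
          // No match yet: emit immediately, padded with NULL (None).
          out :+= Change('+', Row(id, l, r))
      }
    }
    out
  }

  // Applies the changelog to a table keyed on id, like an upsert sink.
  def finalRows(changes: Seq[Change]): Map[String, Row] =
    changes.foldLeft(Map.empty[String, Row]) {
      case (table, Change('+', row)) => table + (row.id -> row)
      case (table, Change(_, row)) =>
        if (table.get(row.id).contains(row)) table - row.id else table
    }

  def main(args: Array[String]): Unit = {
    val log = changelog(Seq(("L", "a", "x"), ("L", "b", "y"), ("R", "a", "z")))
    log.foreach(println)
    println(s"messages: ${log.size}, final rows: ${finalRows(log).size}")
  }
}
```

With the three events in `main`, the changelog carries four messages: two inserts, one retraction and one re-insert. Only two rows survive in the keyed table. In this model, that is why counting the final rows in the sink table should not be affected by retraction traffic.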
>
> On Wed, 14 Apr 2021 at 16:22, Dylan Forciea <[email protected]> wrote:
>
> I replaced FIRST_VALUE with MAX to ensure that the results
> should be identical even in their content, and my problem still
> remains: when the parallelism is over 1, the count of records
> emitted into the sink is nondeterministic, and it is about 20-25%
> short of (and less consistent than) the count that comes out
> consistently when parallelism is set to 1.
>
> Dylan
>
> *From: *Dylan Forciea <[email protected]>
> *Date: *Wednesday, April 14, 2021 at 9:08 AM
> *To: *Piotr Nowojski <[email protected]>
> *Cc: *"[email protected]" <[email protected]>
> *Subject: *Re: Nondeterministic results with SQL job when
> parallelism is > 1
>
> Piotrek,
>
> I was actually originally using an aggregate function that WAS
> deterministic (a custom UDF I wrote), but substituted a built-in
> one here. By nondeterministic, I mean that the number of records
> coming out is not consistent. Since the FIRST_VALUE here is on an
> attribute that is not part of the key, I wouldn’t expect it to
> affect the number of records coming out.
>
> Dylan
>
> *From: *Piotr Nowojski <[email protected]>
> *Date: *Wednesday, April 14, 2021 at 9:06 AM
> *To: *Dylan Forciea <[email protected]>
> *Cc: *"[email protected]" <[email protected]>
> *Subject: *Re: Nondeterministic results with SQL job when
> parallelism is > 1
>
> Hi,
>
> Yes, it looks like your query is nondeterministic because of
> `FIRST_VALUE` used inside `GROUP BY`. If you have many different
> parallel sources, your first value might be different each time you
> run your query. If that's the case, you could try to confirm it
> with an even smaller query:
>
> SELECT
> id2,
> FIRST_VALUE(attr) AS attr
> FROM table2
> GROUP BY id2
>
> Best,
>
> Piotrek
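Piotrek's FIRST_VALUE explanation, and Dylan's reply that a non-key aggregate should not change the number of rows, can both be illustrated with a small Scala sketch that does not use Flink. It assumes nothing about Flink internals:

- `FirstValueVsMax`, `firstValue` and `maxValue` are hypothetical helpers that aggregate `(id2, attr)` pairs in the order they arrive.
- The two input orders stand in for two runs whose parallel source readers interleaved differently.

```scala
// Hypothetical, Flink-free illustration: keeping the first value seen per key
// depends on arrival order, while MAX does not. Either way, the number of groups
// (and hence of output rows) is the number of distinct keys.
object FirstValueVsMax {
  // rows: (id2, attr) pairs, in the order they reach the aggregate
  def firstValue(rows: Seq[(String, String)]): Map[String, String] =
    rows.foldLeft(Map.empty[String, String]) { case (acc, (k, v)) =>
      if (acc.contains(k)) acc else acc + (k -> v)
    }

  def maxValue(rows: Seq[(String, String)]): Map[String, String] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).max }

  def main(args: Array[String]): Unit = {
    val run1 = Seq(("k1", "b"), ("k1", "a"), ("k2", "c"))
    val run2 = Seq(("k1", "a"), ("k2", "c"), ("k1", "b")) // same rows, other order
    println(firstValue(run1) == firstValue(run2))           // false
    println(maxValue(run1) == maxValue(run2))               // true
    println(firstValue(run1).size == firstValue(run2).size) // true
  }
}
```

In this model, the value that first-value aggregation picks for `k1` changes with the arrival order, while MAX returns the same value in both runs. The group count is identical in both runs.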
>
> On Wed, 14 Apr 2021 at 14:45, Dylan Forciea <[email protected]> wrote:
>
> I am running Flink 1.12.2, and I was trying to increase the
> parallelism of my Flink SQL job to see what happened. However,
> once I did that, my results became nondeterministic. This
> happens whether I set the
> table.exec.resource.default-parallelism config option or I set
> the default local parallelism to something higher than 1. I
> would end up with fewer records in the end, and the output
> record count came out differently each time I ran the job.
>
> I managed to distill an example, as pasted below (with attribute
> names changed to protect company proprietary info), that causes
> the issue. I believe I also saw it happen with a LEFT JOIN
> rather than a FULL JOIN, but the distilled version didn’t give
> me wrong results with that. Maybe it has to do with joining to a
> table that was formed using a GROUP BY? Can somebody tell me
> whether I’m doing something that is known not to work, or
> whether I have run across a bug?
>
> Regards,
>
> Dylan Forciea
>
> // Imports added for completeness; they assume the Scala DataStream API
> // and the Scala Table API bridge of Flink 1.12.
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>
> object Job {
>   def main(args: Array[String]): Unit = {
>     StreamExecutionEnvironment.setDefaultLocalParallelism(1)
>
>     val settings =
>       EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
>
>     // With this above 1, the sink row count varies from run to run.
>     val configuration = streamTableEnv.getConfig().getConfiguration()
>     configuration.setInteger("table.exec.resource.default-parallelism", 16)
>
>     // Later removed per Timo's suggestion; the results still varied.
>     streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE table1 (
>         id1 STRING PRIMARY KEY NOT ENFORCED,
>         attr STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table1',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE table2 (
>         attr STRING,
>         id2 STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table2',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE table3 (
>         attr STRING PRIMARY KEY NOT ENFORCED,
>         attr_mapped STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'table3',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     streamTableEnv.executeSql("""
>       CREATE TABLE sink (
>         id STRING PRIMARY KEY NOT ENFORCED,
>         attr STRING,
>         attr_mapped STRING
>       ) WITH (
>         'connector' = 'jdbc',
>         'url' = 'jdbc:postgresql://…',
>         'table-name' = 'sink',
>         'username' = 'username',
>         'password' = 'password',
>         'scan.fetch-size' = '500',
>         'scan.auto-commit' = 'false'
>       )""")
>
>     // FIRST_VALUE was later replaced with MAX; the row count still varied.
>     val view = streamTableEnv.sqlQuery("""
>       SELECT
>         COALESCE(t1.id1, t2.id2) AS id,
>         COALESCE(t2.attr, t1.attr) AS attr,
>         COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
>       FROM table1 t1
>       FULL JOIN (
>         SELECT
>           id2,
>           FIRST_VALUE(attr) AS attr
>         FROM table2
>         GROUP BY id2
>       ) t2
>         ON (t1.id1 = t2.id2)
>       LEFT JOIN table3 t3
>         ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
>
>     streamTableEnv.createTemporaryView("view", view)
>
>     val statementSet = streamTableEnv.createStatementSet()
>     statementSet.addInsertSql("""
>       INSERT INTO sink SELECT * FROM view
>     """)
>     statementSet.execute().await()
>   }
> }
>