Timo,

Here is the plan (hopefully I scrubbed the company-proprietary info without
garbling it).

Dylan

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
+- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT NULL($3), $3, $1)])
   +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
      :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3], $f4=[CASE(IS NOT NULL($3), $3, $1)])
      :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
      :     :- LogicalTableScan(table=[[default_catalog, default_database, table1]])
      :     +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
      :        +- LogicalProject(id2=[$1], attr=[$0])
      :           +- LogicalTableScan(table=[[default_catalog, default_database, table2]])
      +- LogicalTableScan(table=[[default_catalog, default_database, table3]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped], changelogMode=[NONE])
+- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped), attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped], changelogMode=[I,UB,UA,D])
   +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
      :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
      :     +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
      :        :- Exchange(distribution=[hash[id1]], changelogMode=[I])
      :        :  +- TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr], changelogMode=[I])
      :        +- Exchange(distribution=[hash[id2]], changelogMode=[I,UB,UA])
      :           +- GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr], changelogMode=[I,UB,UA])
      :              +- Exchange(distribution=[hash[id2]], changelogMode=[I])
      :                 +- TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2], changelogMode=[I])
      +- Exchange(distribution=[hash[attr]], changelogMode=[I])
         +- TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped], changelogMode=[I])

== Physical Execution Plan ==
Stage 1 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table1]], fields=[id1, attr])

Stage 3 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table2]], fields=[attr, id2])

        Stage 5 : Operator
                content : GroupAggregate(groupBy=[id2], select=[id2, MAX(attr) AS attr])
                ship_strategy : HASH

                Stage 7 : Operator
                        content : Join(joinType=[FullOuterJoin], where=[(id1 = id2)], select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
                        ship_strategy : HASH

                        Stage 8 : Operator
                                content : Calc(select=[id1, attr, id2, attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
                                ship_strategy : FORWARD

Stage 10 : Data Source
        content : Source: TableSourceScan(table=[[default_catalog, default_database, table3]], fields=[attr, attr_mapped])

        Stage 12 : Operator
                content : Join(joinType=[LeftOuterJoin], where=[($f4 = attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
                ship_strategy : HASH

                Stage 13 : Operator
                        content : Calc(select=[(id1 IS NOT NULL CASE id1 CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr, (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0 CASE attr) AS attr_mapped])
                        ship_strategy : FORWARD

                        Stage 14 : Data Sink
                                content : Sink: Sink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped])
                                ship_strategy : FORWARD

On 4/14/21, 10:08 AM, "Timo Walther" <twal...@apache.org> wrote:

    Can you share the resulting plan with us? Ideally with the ChangelogMode 
    detail enabled as well.

    statementSet.explain(...)

    Maybe this could help.

    Regards,
    Timo
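
For reference, here is a minimal sketch of the explain call Timo describes, assuming Flink 1.12 with the table planner on the classpath. The `src` and `dst` tables, the `datagen` and `blackhole` connectors, and the object name `ExplainSketch` are stand-ins chosen for illustration and are not part of Dylan's job. `ExplainDetail.CHANGELOG_MODE` is the detail that adds the `changelogMode=[...]` annotations to the optimized plan.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, ExplainDetail, TableEnvironment}

object ExplainSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // Hypothetical source and sink, used only so that the plan can be built.
    tableEnv.executeSql(
      "CREATE TABLE src (id STRING, attr STRING) WITH ('connector' = 'datagen')")
    tableEnv.executeSql(
      "CREATE TABLE dst (id STRING, attr STRING) WITH ('connector' = 'blackhole')")

    val statementSet = tableEnv.createStatementSet()
    statementSet.addInsertSql(
      "INSERT INTO dst SELECT id, MAX(attr) FROM src GROUP BY id")

    // explain() only plans the statements; it does not run the job.
    println(statementSet.explain(ExplainDetail.CHANGELOG_MODE))
  }
}
```

The same call on the `statementSet` from the original job, placed before `execute()`, should print a plan in the same three sections as the one at the top of this message.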



    On 14.04.21 16:47, Dylan Forciea wrote:
    > Piotrek,
    > 
    > I am looking at the count of records present in the sink table in 
    > Postgres after the entire job completes, not the number of 
    > inserts/retracts. I can see as the job runs that records are added and 
    > removed from the “sink” table. With parallelism set to 1, it always 
    > comes out to the same number (which is consistent with the number of ids 
    > in the source tables “table1” and “table2”), at about 491k records in 
    > table “sink” when the job is complete. With the parallelism set to 16, 
    > the “sink” table will have somewhere around 360k records +/- 20k when 
    > the job is complete. I truncate the “sink” table before I run the job, 
    > and this is a test environment where the source databases are static.
    > 
    > I removed my line for setting to Batch mode per Timo’s suggestion, and 
    > am still running with MAX which should have deterministic output.
    > 
    > Dylan
    > 
    > *From: *Piotr Nowojski <pnowoj...@apache.org>
    > *Date: *Wednesday, April 14, 2021 at 9:38 AM
    > *To: *Dylan Forciea <dy...@oseberg.io>
    > *Cc: *"user@flink.apache.org" <user@flink.apache.org>
    > *Subject: *Re: Nondeterministic results with SQL job when
    > parallelism is > 1
    > 
    > Hi Dylan,
    > 
    > But if you are running your query in Streaming mode, aren't you counting 
    > retractions from the FULL JOIN? AFAIK in Streaming mode in FULL JOIN, 
    > when the first record comes in it will be immediately emitted with NULLs 
    > (not matched, as the other table is empty). Later if a matching record 
    > is received from the second table, the previous result will be retracted 
    > and the new one, updated, will be re-emitted. Maybe this is what you are 
    > observing in the varying output?
    > 
    > Maybe you could try to analyse how the results differ between different 
    > runs?
    > 
    > Best,
    > 
    > Piotrek
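
The retraction behavior Piotrek describes for a streaming FULL JOIN can be sketched without Flink. What follows is a toy model, not Flink's join operator: it assumes each key appears at most once per side, and the names `FullJoinRetractionSketch`, `Change`, `run`, and `materialize` are made up for illustration. It shows the emitted changelog depending on arrival order while the materialized result does not, which is why the final row count in the sink would normally be expected to stay stable.

```scala
object FullJoinRetractionSketch {
  sealed trait Side
  case object LeftSide extends Side
  case object RightSide extends Side

  // One changelog message: "+I" inserts a joined row, "-D" retracts it.
  final case class Change(op: String, left: Option[String], right: Option[String])

  // Toy streaming FULL JOIN on the key itself; assumes a key occurs at most once per side.
  def run(events: Seq[(Side, String)]): Seq[Change] = {
    var seenLeft = Set.empty[String]
    var seenRight = Set.empty[String]
    val out = Seq.newBuilder[Change]
    events.foreach {
      case (LeftSide, k) =>
        if (seenRight(k)) {
          out += Change("-D", None, Some(k))    // retract the NULL-padded row
          out += Change("+I", Some(k), Some(k)) // re-emit the matched row
        } else {
          out += Change("+I", Some(k), None)    // no match yet: pad with NULL
        }
        seenLeft += k
      case (RightSide, k) =>
        if (seenLeft(k)) {
          out += Change("-D", Some(k), None)
          out += Change("+I", Some(k), Some(k))
        } else {
          out += Change("+I", None, Some(k))
        }
        seenRight += k
    }
    out.result()
  }

  // Applies the changelog the way a retract-aware sink would.
  def materialize(changes: Seq[Change]): Set[(Option[String], Option[String])] =
    changes.foldLeft(Set.empty[(Option[String], Option[String])]) {
      case (rows, Change("+I", l, r)) => rows + ((l, r))
      case (rows, Change(_, l, r))    => rows - ((l, r))
    }

  def main(args: Array[String]): Unit = {
    val orderA: Seq[(Side, String)] =
      Seq((LeftSide, "a"), (LeftSide, "b"), (RightSide, "a"), (RightSide, "c"))
    val orderB: Seq[(Side, String)] =
      Seq((RightSide, "c"), (RightSide, "a"), (LeftSide, "b"), (LeftSide, "a"))
    run(orderA).foreach(println)
    run(orderB).foreach(println)
    // Different changelogs, same final table.
    println(materialize(run(orderA)) == materialize(run(orderB)))
  }
}
```

In this model, counting emitted messages and counting rows left in the sink are different measurements; only the first one is sensitive to arrival order.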
    > 
    > On Wed, 14 Apr 2021 at 16:22, Dylan Forciea <dy...@oseberg.io 
    > <mailto:dy...@oseberg.io>> wrote:
    > 
    >     I replaced the FIRST_VALUE with MAX to ensure that the results
    >     should be identical even in their content, and my problem still
    >     remains – the count of records emitted into the sink is
    >     nondeterministic when the parallelism is over 1, and it comes out
    >     about 20-25% short of the count I consistently get when parallelism
    >     is set to 1.
    > 
    >     Dylan
    > 
    >     *From: *Dylan Forciea <dy...@oseberg.io <mailto:dy...@oseberg.io>>
    >     *Date: *Wednesday, April 14, 2021 at 9:08 AM
    >     *To: *Piotr Nowojski <pnowoj...@apache.org
    >     <mailto:pnowoj...@apache.org>>
    >     *Cc: *"user@flink.apache.org <mailto:user@flink.apache.org>"
    >     <user@flink.apache.org <mailto:user@flink.apache.org>>
    >     *Subject: *Re: Nondeterministic results with SQL job when
    >     parallelism is > 1
    > 
    >     Piotrek,
    > 
    >     I was actually originally using a group function that WAS
    >     deterministic (a custom UDF I made), but chose something built in
    >     for this example. By nondeterministic, I mean that the number of
    >     records coming out is not consistent. Since the FIRST_VALUE here is
    >     on an attribute that is not part of the key, I wouldn’t expect it
    >     to affect the number of records coming out.
    > 
    >     Dylan
    > 
    >     *From: *Piotr Nowojski <pnowoj...@apache.org
    >     <mailto:pnowoj...@apache.org>>
    >     *Date: *Wednesday, April 14, 2021 at 9:06 AM
    >     *To: *Dylan Forciea <dy...@oseberg.io <mailto:dy...@oseberg.io>>
    >     *Cc: *"user@flink.apache.org <mailto:user@flink.apache.org>"
    >     <user@flink.apache.org <mailto:user@flink.apache.org>>
    >     *Subject: *Re: Nondeterministic results with SQL job when
    >     parallelism is > 1
    > 
    >     Hi,
    > 
    >     Yes, it looks like your query is nondeterministic because of
    >     `FIRST_VALUE` used inside `GROUP BY`. If you have many different
    >     parallel sources, each time you run your query your first value
    >     might be different. If that's the case, you could try to confirm it
    >     with an even smaller query:
    > 
    >         SELECT
    >           id2,
    >           FIRST_VALUE(attr) AS attr
    >         FROM table2
    >         GROUP BY id2
    > 
    >     Best,
    > 
    >     Piotrek
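
The difference between the two aggregates discussed here can be sketched in plain Scala. This is a simplified model of the semantics only, not Flink's implementation; `AggregateOrderSketch`, `firstValue`, and `maxValue` are hypothetical names, and the two input sequences stand in for two runs whose parallel sources deliver the same rows in a different order.

```scala
object AggregateOrderSketch {
  // Rows are (id2, attr) pairs in arrival order.
  type Row = (String, String)

  // Order-dependent: keeps whichever attr arrived first for each id2.
  def firstValue(rows: Seq[Row]): Map[String, String] =
    rows.foldLeft(Map.empty[String, String]) { case (acc, (id, attr)) =>
      if (acc.contains(id)) acc else acc + (id -> attr)
    }

  // Order-independent: keeps the lexicographically largest attr for each id2.
  def maxValue(rows: Seq[Row]): Map[String, String] =
    rows.groupBy(_._1).map { case (id, group) => id -> group.map(_._2).max }

  def main(args: Array[String]): Unit = {
    val runA = Seq(("k1", "x"), ("k1", "y"), ("k2", "z"))
    val runB = Seq(("k1", "y"), ("k1", "x"), ("k2", "z"))
    println(firstValue(runA) == firstValue(runB))           // false
    println(maxValue(runA) == maxValue(runB))               // true
    println(firstValue(runA).size == firstValue(runB).size) // true
  }
}
```

In this model the arrival order changes which value `firstValue` picks but not how many groups come out, so on its own it would change the content of the result rather than its size.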
    > 
    >     On Wed, 14 Apr 2021 at 14:45, Dylan Forciea <dy...@oseberg.io
    >     <mailto:dy...@oseberg.io>> wrote:
    > 
    >         I am running Flink 1.12.2, and I was trying to up the
    >         parallelism of my Flink SQL job to see what happened. However,
    >         once I did that, my results became nondeterministic. This
    >         happens whether I set the
    >         table.exec.resource.default-parallelism config option or I set
    >         the default local parallelism to something higher than 1. I
    >         would end up with fewer records in the end, and each time I ran
    >         the job the output record count would come out differently.
    > 
    >         I managed to distill an example, as pasted below (with attribute
    >         names changed to protect company proprietary info), that causes
    >         the issue. I feel like I managed to get it to happen with a LEFT
    >         JOIN rather than a FULL JOIN, but the distilled version wasn’t
    >         giving me wrong results with that. Maybe it has to do with
    >         joining to a table that was formed using a GROUP BY? Can
    >         somebody tell if I’m doing something that is known not to work,
    >         or if I have run across a bug?
    > 
    >         Regards,
    > 
    >         Dylan Forciea
    > 
    >         object Job {
    >            def main(args: Array[String]): Unit = {
    >              StreamExecutionEnvironment.setDefaultLocalParallelism(1)
    > 
    >              val settings =
    >                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    >              val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    >              val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
    > 
    >              val configuration = streamTableEnv.getConfig().getConfiguration()
    >              configuration.setInteger("table.exec.resource.default-parallelism", 16)
    > 
    >              streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)
    > 
    >              streamTableEnv.executeSql(
    >                """
    >                CREATE TABLE table1 (
    >                  id1 STRING PRIMARY KEY NOT ENFORCED,
    >                  attr STRING
    >                ) WITH (
    >                  'connector' = 'jdbc',
    >                  'url' = 'jdbc:postgresql://…',
    >                  'table-name' = 'table1',
    >                  'username' = 'username',
    >                  'password' = 'password',
    >                  'scan.fetch-size' = '500',
    >                  'scan.auto-commit' = 'false'
    >                )""")
    > 
    >              streamTableEnv.executeSql(
    >                """
    >                CREATE TABLE table2 (
    >                  attr STRING,
    >                  id2 STRING
    >                ) WITH (
    >                  'connector' = 'jdbc',
    >                  'url' = 'jdbc:postgresql://…',
    >                  'table-name' = 'table2',
    >                  'username' = 'username',
    >                  'password' = 'password',
    >                  'scan.fetch-size' = '500',
    >                  'scan.auto-commit' = 'false'
    >                )""")
    > 
    >              streamTableEnv.executeSql(
    >                """
    >                CREATE TABLE table3 (
    >                  attr STRING PRIMARY KEY NOT ENFORCED,
    >                  attr_mapped STRING
    >                ) WITH (
    >                  'connector' = 'jdbc',
    >                  'url' = 'jdbc:postgresql://…',
    >                  'table-name' = 'table3',
    >                  'username' = 'username',
    >                  'password' = 'password',
    >                  'scan.fetch-size' = '500',
    >                  'scan.auto-commit' = 'false'
    >                )""")
    > 
    >              streamTableEnv.executeSql("""
    >                CREATE TABLE sink (
    >                  id STRING PRIMARY KEY NOT ENFORCED,
    >                  attr STRING,
    >                  attr_mapped STRING
    >                ) WITH (
    >                  'connector' = 'jdbc',
    >                  'url' = 'jdbc:postgresql://…',
    >                  'table-name' = 'sink',
    >                  'username' = 'username',
    >                  'password' = 'password',
    >                  'scan.fetch-size' = '500',
    >                  'scan.auto-commit' = 'false'
    >                )""")
    > 
    >              val view =
    >                streamTableEnv.sqlQuery("""
    >                SELECT
    >                  COALESCE(t1.id1, t2.id2) AS id,
    >                  COALESCE(t2.attr, t1.attr) AS attr,
    >                  COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
    >                FROM table1 t1
    >                FULL JOIN (
    >                  SELECT
    >                    id2,
    >                    FIRST_VALUE(attr) AS attr
    >                  FROM table2
    >                  GROUP BY id2
    >                ) t2
    >                  ON (t1.id1 = t2.id2)
    >                LEFT JOIN table3 t3
    >                  ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
    > 
    >              streamTableEnv.createTemporaryView("view", view)
    > 
    >              val statementSet = streamTableEnv.createStatementSet()
    > 
    >              statementSet.addInsertSql("""
    >                INSERT INTO sink SELECT * FROM view
    >              """)
    > 
    >              statementSet.execute().await()
    >            }
    >         }
    > 

