Hi Dylan,

The primary key ordering problem I mean above is about changelog. Batch
queries only emit a final result, and thus don't have changelog, so it's
safe to use batch mode.

The problem only exists in streaming mode with more than 1 parallelism.

Best,
Jark

On Fri, 16 Apr 2021 at 21:40, Dylan Forciea <dy...@oseberg.io> wrote:

> Jark,
>
>
>
> Thanks for the heads up! I didn’t see this behavior when running in batch
> mode with parallelism turned on. Is it safe to do this kind of join in
> batch mode right now, or am I just getting lucky?
>
>
>
> Dylan
>
>
>
> *From: *Jark Wu <imj...@gmail.com>
> *Date: *Friday, April 16, 2021 at 5:10 AM
> *To: *Dylan Forciea <dy...@oseberg.io>
> *Cc: *Timo Walther <twal...@apache.org>, Piotr Nowojski <
> pnowoj...@apache.org>, "user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Nondeterministic results with SQL job when parallelism is
> > 1
>
>
>
> HI Dylan,
>
>
>
> I think this has the same reason as
> https://issues.apache.org/jira/browse/FLINK-20374.
>
> The root cause is that changelogs are shuffled by `attr` at second join,
>
> and thus records with the same `id` will be shuffled to different join
> tasks (also different sink tasks).
>
> So the data arrived at sinks are not ordered on the sink primary key.
>
>
>
> We may need something like primary key ordering mechanism in the whole
> planner to fix this.
>
>
>
> Best,
>
> Jark
>
>
>
> On Thu, 15 Apr 2021 at 01:33, Dylan Forciea <dy...@oseberg.io> wrote:
>
> On a side note - I changed to use the batch mode per your suggestion Timo,
> and my job ran much faster and with deterministic counts with parallelism
> turned on. So I'll probably utilize that for now. However, it would still
> be nice to dig down into why streaming isn't working in case I need that in
> the future.
>
> Dylan
>
> On 4/14/21, 10:27 AM, "Dylan Forciea" <dy...@oseberg.io> wrote:
>
>     Timo,
>
>     Here is the plan (hopefully I properly cleansed it of company
> proprietary info without garbling it)
>
>     Dylan
>
>     == Abstract Syntax Tree ==
>     LogicalSink(table=[default_catalog.default_database.sink], fields=[id,
> attr, attr_mapped])
>     +- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE(IS
> NOT NULL($3), $3, $1)], attr_mapped=[CASE(IS NOT NULL($6), $6, IS NOT
> NULL($3), $3, $1)])
>        +- LogicalJoin(condition=[=($4, $5)], joinType=[left])
>           :- LogicalProject(id1=[$0], attr=[$1], id2=[$2], attr0=[$3],
> $f4=[CASE(IS NOT NULL($3), $3, $1)])
>           :  +- LogicalJoin(condition=[=($0, $2)], joinType=[full])
>           :     :- LogicalTableScan(table=[[default_catalog,
> default_database, table1]])
>           :     +- LogicalAggregate(group=[{0}], attr=[MAX($1)])
>           :        +- LogicalProject(id2=[$1], attr=[$0])
>           :           +- LogicalTableScan(table=[[default_catalog,
> default_database, table2]])
>           +- LogicalTableScan(table=[[default_catalog, default_database,
> table3]])
>
>     == Optimized Logical Plan ==
>     Sink(table=[default_catalog.default_database.sink], fields=[id, attr,
> attr_mapped], changelogMode=[NONE])
>     +- Calc(select=[CASE(IS NOT NULL(id1), id1, id2) AS id, CASE(IS NOT
> NULL(attr0), attr0, attr) AS attr, CASE(IS NOT NULL(attr_mapped),
> attr_mapped, IS NOT NULL(attr0), attr0, attr) AS attr_mapped],
> changelogMode=[I,UB,UA,D])
>        +- Join(joinType=[LeftOuterJoin], where=[=($f4, attr)],
> select=[id1, attr, id2, attr0, $f4, attr, attr_mapped],
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey],
> changelogMode=[I,UB,UA,D])
>           :- Exchange(distribution=[hash[$f4]], changelogMode=[I,UB,UA,D])
>           :  +- Calc(select=[id1, attr, id2, attr0, CASE(IS NOT
> NULL(attr0), attr0, attr) AS $f4], changelogMode=[I,UB,UA,D])
>           :     +- Join(joinType=[FullOuterJoin], where=[=(id1, id2)],
> select=[id1, attr, id2, attr0], leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
>           :        :- Exchange(distribution=[hash[id1]], changelogMode=[I])
>           :        :  +- TableSourceScan(table=[[default_catalog,
> default_database, table1]], fields=[id1, attr], changelogMode=[I])
>           :        +- Exchange(distribution=[hash[id2]],
> changelogMode=[I,UB,UA])
>           :           +- GroupAggregate(groupBy=[id2], select=[id2,
> MAX(attr) AS attr], changelogMode=[I,UB,UA])
>           :              +- Exchange(distribution=[hash[id2]],
> changelogMode=[I])
>           :                 +- TableSourceScan(table=[[default_catalog,
> default_database, table2]], fields=[attr, id2], changelogMode=[I])
>           +- Exchange(distribution=[hash[attr]], changelogMode=[I])
>              +- TableSourceScan(table=[[default_catalog, default_database,
> table3]], fields=[attr, attr_mapped], changelogMode=[I])
>
>     == Physical Execution Plan ==
>     Stage 1 : Data Source
>         content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table1]], fields=[id1, attr])
>
>     Stage 3 : Data Source
>         content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table2]], fields=[attr, id2])
>
>         Stage 5 : Attr
>                 content : GroupAggregate(groupBy=[id2], select=[id2,
> MAX(attr) AS attr])
>                 ship_strategy : HASH
>
>                 Stage 7 : Attr
>                         content : Join(joinType=[FullOuterJoin],
> where=[(id1 = id2)], select=[id1, attr, id2, attr0],
> leftInputSpec=[JoinKeyContainsUniqueKey],
> rightInputSpec=[JoinKeyContainsUniqueKey])
>                         ship_strategy : HASH
>
>                         Stage 8 : Attr
>                                 content : Calc(select=[id1, attr, id2,
> attr0, (attr0 IS NOT NULL CASE attr0 CASE attr) AS $f4])
>                                 ship_strategy : FORWARD
>
>     Stage 10 : Data Source
>         content : Source: TableSourceScan(table=[[default_catalog,
> default_database, table3]], fields=[attr, attr_mapped])
>
>         Stage 12 : Attr
>                 content : Join(joinType=[LeftOuterJoin], where=[($f4 =
> attr)], select=[id1, attr, id2, attr0, $f4, attr, attr_mapped],
> leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
>                 ship_strategy : HASH
>
>                 Stage 13 : Attr
>                         content : Calc(select=[(id1 IS NOT NULL CASE id1
> CASE id2) AS id, (attr0 IS NOT NULL CASE attr0 CASE attr) AS attr,
> (attr_mapped IS NOT NULL CASE attr_mapped CASE attr0 IS NOT NULL CASE attr0
> CASE attr) AS attr_mapped])
>                         ship_strategy : FORWARD
>
>                         Stage 14 : Data Sink
>                                 content : Sink:
> Sink(table=[default_catalog.default_database.sink], fields=[id, attr,
> attr_mapped])
>                                 ship_strategy : FORWARD
>
>     On 4/14/21, 10:08 AM, "Timo Walther" <twal...@apache.org> wrote:
>
>         Can you share the resulting plan with us? Ideally with the
> ChangelogMode
>         detail enabled as well.
>
>         statementSet.explain(...)
>
>         Maybe this could help.
>
>         Regards,
>         Timo
>
>
>
>         On 14.04.21 16:47, Dylan Forciea wrote:
>         > Piotrek,
>         >
>         > I am looking at the count of records present in the sink table
> in
>         > Postgres after the entire job completes, not the number of
>         > inserts/retracts. I can see as the job runs that records are
> added and
>         > removed from the “sink” table. With parallelism set to 1, it
> always
>         > comes out to the same number (which is consistent with the
> number of ids
>         > in the source tables “table1” and “table2”), at about 491k
> records in
>         > table “sink” when the job is complete. With the parallelism set
> to 16,
>         > the “sink” table will have somewhere around 360k records +/- 20k
> when
>         > the job is complete. I truncate the “sink” table before I run
> the job,
>         > and this is a test environment where the source databases are
> static.
>         >
>         > I removed my line for setting to Batch mode per Timo’s
> suggestion, and
>         > am still running with MAX which should have deterministic output.
>         >
>         > Dylan
>         >
>         > *From: *Piotr Nowojski <pnowoj...@apache.org>
>         > *Date: *Wednesday, April 14, 2021 at 9:38 AM
>         > *To: *Dylan Forciea <dy...@oseberg.io>
>         > *Cc: *"user@flink.apache.org" <user@flink.apache.org>
>         > *Subject: *Re: Nondeterministic results with SQL job when
> parallelism is > 1
>         >
>         > Hi Dylan,
>         >
>         > But if you are running your query in Streaming mode, aren't you
> counting
>         > retractions from the FULL JOIN? AFAIK in Streaming mode in FULL
> JOIN,
>         > when the first record comes in it will be immediately emitted
> with NULLs
>         > (not matched, as the other table is empty). Later if a matching
> record
>         > is received from the second table, the previous result will be
> retracted
>         > and the new one, updated, will be re-emitted. Maybe this is what
> you are
>         > observing in the varying output?
>         >
>         > Maybe you could try to analyse how the results differ between
> different
>         > runs?
>         >
>         > Best,
>         >
>         > Piotrek
>         >
>         > śr., 14 kwi 2021 o 16:22 Dylan Forciea <dy...@oseberg.io
>         > <mailto:dy...@oseberg.io>> napisał(a):
>         >
>         >     I replaced the FIRST_VALUE with MAX to ensure that the
> results
>         >     should be identical even in their content, and my problem
> still
>         >     remains – I end up with a nondeterministic count of records
> being
>         >     emitted into the sink when the parallelism is over 1, and
> that count
>         >     is about 20-25% short (and not consistent) of what comes out
>         >     consistently when parallelism is set to 1.
>         >
>         >     Dylan
>         >
>         >     *From: *Dylan Forciea <dy...@oseberg.io <mailto:
> dy...@oseberg.io>>
>         >     *Date: *Wednesday, April 14, 2021 at 9:08 AM
>         >     *To: *Piotr Nowojski <pnowoj...@apache.org
>         >     <mailto:pnowoj...@apache.org>>
>         >     *Cc: *"user@flink.apache.org <mailto:user@flink.apache.org>"
>         >     <user@flink.apache.org <mailto:user@flink.apache.org>>
>         >     *Subject: *Re: Nondeterministic results with SQL job when
>         >     parallelism is > 1
>         >
>         >     Pitorek,
>         >
>         >     I was actually originally using a group function that WAS
>         >     deterministic (but was a custom UDF I made), but chose
> something
>         >     here built in. By non-deterministic, I mean that the number
> of
>         >     records coming out is not consistent. Since the FIRST_VALUE
> here is
>         >     on an attribute that is not part of the key, that shouldn’t
> affect
>         >     the number of records coming out I wouldn’t think.
>         >
>         >     Dylan
>         >
>         >     *From: *Piotr Nowojski <pnowoj...@apache.org
>         >     <mailto:pnowoj...@apache.org>>
>         >     *Date: *Wednesday, April 14, 2021 at 9:06 AM
>         >     *To: *Dylan Forciea <dy...@oseberg.io <mailto:
> dy...@oseberg.io>>
>         >     *Cc: *"user@flink.apache.org <mailto:user@flink.apache.org>"
>         >     <user@flink.apache.org <mailto:user@flink.apache.org>>
>         >     *Subject: *Re: Nondeterministic results with SQL job when
>         >     parallelism is > 1
>         >
>         >     Hi,
>         >
>         >     Yes, it looks like your query is non deterministic because of
>         >     `FIRST_VALUE` used inside `GROUP BY`. If you have many
> different
>         >     parallel sources, each time you run your query your first
> value
>         >     might be different. If that's the case, you could try to
> confirm it
>         >     with even smaller query:
>         >
>         >             SELECT
>         >                id2,
>         >                FIRST_VALUE(attr) AS attr
>         >              FROM table2
>         >              GROUP BY id2
>         >
>         >     Best,
>         >
>         >     Piotrek
>         >
>         >     śr., 14 kwi 2021 o 14:45 Dylan Forciea <dy...@oseberg.io
>         >     <mailto:dy...@oseberg.io>> napisał(a):
>         >
>         >         I am running Flink 1.12.2, and I was trying to up the
>         >         parallelism of my Flink SQL job to see what happened.
> However,
>         >         once I did that, my results became nondeterministic. This
>         >         happens whether I set the
>         >         table.exec.resource.default-parallelism config option or
> I set
>         >         the default local parallelism to something higher than
> 1. I
>         >         would end up with less records in the end, and each time
> I ran
>         >         the output record count would come out differently.
>         >
>         >         I managed to distill an example, as pasted below (with
> attribute
>         >         names changed to protect company proprietary info), that
> causes
>         >         the issue. I feel like I managed to get it to happen
> with a LEFT
>         >         JOIN rather than a FULL JOIN, but the distilled version
> wasn’t
>         >         giving me wrong results with that. Maybe it has to do
> with
>         >         joining to a table that was formed using a GROUP BY? Can
>         >         somebody tell if I’m doing something that is known not
> to work,
>         >         or if I have run across a bug?
>         >
>         >         Regards,
>         >
>         >         Dylan Forciea
>         >
>         >         objectJob{
>         >
>         >         defmain(args: Array[String]): Unit= {
>         >
>         >         StreamExecutionEnvironment.setDefaultLocalParallelism(1)
>         >
>         >         valsettings=
>         >
>  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>         >
>         >         valstreamEnv=
> StreamExecutionEnvironment.getExecutionEnvironment
>         >
>         >         valstreamTableEnv=
> StreamTableEnvironment.create(streamEnv,
>         >         settings)
>         >
>         >         valconfiguration=
> streamTableEnv.getConfig().getConfiguration()
>         >
>         >
>         >
>  configuration.setInteger("table.exec.resource.default-parallelism",
>         >         16)
>         >
>         >
> streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         >
>         >              streamTableEnv.executeSql(
>         >
>         >         """
>         >
>         >                CREATE TABLE table1 (
>         >
>         >                  id1 STRING PRIMARY KEY NOT ENFORCED,
>         >
>         >                  attr STRING
>         >
>         >                ) WITH (
>         >
>         >                  'connector' = 'jdbc',
>         >
>         >                  'url' = 'jdbc:postgresql://…',
>         >
>         >                  'table-name' = 'table1’,
>         >
>         >                  'username' = 'username',
>         >
>         >                  'password' = 'password',
>         >
>         >                  'scan.fetch-size' = '500',
>         >
>         >                  'scan.auto-commit' = 'false'
>         >
>         >                )""")
>         >
>         >              streamTableEnv.executeSql(
>         >
>         >         """
>         >
>         >                CREATE TABLE table2 (
>         >
>         >                  attr STRING,
>         >
>         >                  id2 STRING
>         >
>         >                ) WITH (
>         >
>         >                  'connector' = 'jdbc',
>         >
>         >                  'url' = 'jdbc:postgresql://…',
>         >
>         >                  'table-name' = 'table2',
>         >
>         >                  'username' = 'username',
>         >
>         >                  'password' = 'password',
>         >
>         >                  'scan.fetch-size' = '500',
>         >
>         >                  'scan.auto-commit' = 'false'
>         >
>         >                )""")
>         >
>         >              streamTableEnv.executeSql(
>         >
>         >         """
>         >
>         >                CREATE TABLE table3 (
>         >
>         >                  attr STRING PRIMARY KEY NOT ENFORCED,
>         >
>         >                  attr_mapped STRING
>         >
>         >                ) WITH (
>         >
>         >                  'connector' = 'jdbc',
>         >
>         >                  'url' = 'jdbc:postgresql://…',
>         >
>         >                  'table-name' = ‘table3',
>         >
>         >                  'username' = ‘username',
>         >
>         >                  'password' = 'password',
>         >
>         >                  'scan.fetch-size' = '500',
>         >
>         >                  'scan.auto-commit' = 'false'
>         >
>         >                )""")
>         >
>         >              streamTableEnv.executeSql("""
>         >
>         >                CREATE TABLE sink (
>         >
>         >                  id STRING PRIMARY KEY NOT ENFORCED,
>         >
>         >                  attr STRING,
>         >
>         >                  attr_mapped STRING
>         >
>         >                ) WITH (
>         >
>         >                  'connector' = 'jdbc',
>         >
>         >                  'url' = 'jdbc:postgresql://…,
>         >
>         >                  'table-name' = 'sink',
>         >
>         >                  'username' = 'username',
>         >
>         >                  'password' = 'password',
>         >
>         >                  'scan.fetch-size' = '500',
>         >
>         >                  'scan.auto-commit' = 'false'
>         >
>         >                )""")
>         >
>         >         valview=
>         >
>         >                streamTableEnv.sqlQuery("""
>         >
>         >                SELECT
>         >
>         >                  COALESCE(t1.id1, t2.id2) AS id,
>         >
>         >                  COALESCE(t2.attr, t1.attr) AS attr,
>         >
>         >                  COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS
> attr_mapped
>         >
>         >                FROM table1 t1
>         >
>         >                FULL JOIN (
>         >
>         >                  SELECT
>         >
>         >                    id2,
>         >
>         >                    FIRST_VALUE(attr) AS attr
>         >
>         >                  FROM table2
>         >
>         >                  GROUP BY id2
>         >
>         >                ) t2
>         >
>         >                 ON (t1.id1 = t2.id2)
>         >
>         >                LEFT JOIN table3 t3
>         >
>         >                  ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
>         >
>         >              streamTableEnv.createTemporaryView("view", view)
>         >
>         >         valstatementSet= streamTableEnv.createStatementSet()
>         >
>         >              statementSet.addInsertSql("""
>         >
>         >                INSERT INTO sink SELECT * FROM view
>         >
>         >              """)
>         >
>         >              statementSet.execute().await()
>         >
>         >            }
>         >
>         >         }
>         >
>
>
>

Reply via email to