I am running Flink 1.12.2, and I was trying to increase the parallelism of my Flink
SQL job to see what would happen. However, once I did that, my results became
nondeterministic. This happens whether I set the
table.exec.resource.default-parallelism config option or set the default
local parallelism to something higher than 1. I would end up with fewer records
in the end, and each time I ran the job the output record count came out
differently.
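For reference, these are the two knobs I was toggling (one at a time, both
shown here as they appear in the full code below; 16 is just the value I
happened to try):

  // Via the table config option:
  streamTableEnv.getConfig().getConfiguration()
    .setInteger("table.exec.resource.default-parallelism", 16)

  // Or via the default local parallelism, set before creating the environment:
  StreamExecutionEnvironment.setDefaultLocalParallelism(16)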
I managed to distill an example, pasted below (with attribute names changed
to protect company proprietary info), that reproduces the issue. I believe I
saw it happen with a LEFT JOIN rather than a FULL JOIN as well, but the
distilled version didn't produce wrong results that way. Maybe it has to do
with joining to a table that was formed using a GROUP BY? Can somebody tell me
whether I'm doing something that is known not to work, or whether I have run
across a bug?
Regards,
Dylan Forciea
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job {
  def main(args: Array[String]): Unit = {
    StreamExecutionEnvironment.setDefaultLocalParallelism(1)
    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    // Raising this above 1 is what triggers the nondeterministic results.
    val configuration = streamTableEnv.getConfig().getConfiguration()
    configuration.setInteger("table.exec.resource.default-parallelism", 16)
    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

    streamTableEnv.executeSql("""
      CREATE TABLE table1 (
        id1 STRING PRIMARY KEY NOT ENFORCED,
        attr STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table1',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE table2 (
        attr STRING,
        id2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table2',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE table3 (
        attr STRING PRIMARY KEY NOT ENFORCED,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table3',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql("""
      CREATE TABLE sink (
        id STRING PRIMARY KEY NOT ENFORCED,
        attr STRING,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'sink',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    // FULL JOIN against an aggregate of table2 (GROUP BY + FIRST_VALUE),
    // then LEFT JOIN the coalesced attribute to the mapping table.
    val view = streamTableEnv.sqlQuery("""
      SELECT
        COALESCE(t1.id1, t2.id2) AS id,
        COALESCE(t2.attr, t1.attr) AS operator,
        COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
      FROM table1 t1
      FULL JOIN (
        SELECT
          id2,
          FIRST_VALUE(attr) AS attr
        FROM table2
        GROUP BY id2
      ) t2 ON (t1.id1 = t2.id2)
      LEFT JOIN table3 t3
        ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
    streamTableEnv.createTemporaryView("view", view)

    val statementSet = streamTableEnv.createStatementSet()
    statementSet.addInsertSql("""
      INSERT INTO sink SELECT * FROM view
    """)
    statementSet.execute().await()
  }
}