Hi Dylan,
streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

is currently not supported by the Table & SQL API. For now,

val settings =
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

determines the mode. Thus, I would remove the line again.

If you want to use `inBatchMode()`, you can use the unified
TableEnvironment that is not connected to the StreamExecutionEnvironment:

TableEnvironment.create(settings);
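
As a rough sketch (assuming Flink 1.12 with the Blink planner; the
variable names are just illustrative), that would look like:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Let the settings choose the execution mode instead of
// calling setRuntimeMode on a StreamExecutionEnvironment.
val batchSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()

// Unified TableEnvironment; not connected to a StreamExecutionEnvironment.
val tableEnv = TableEnvironment.create(batchSettings)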
So Piotrek's answer hopefully makes more sense now.
Regards,
Timo
On 14.04.21 16:08, Dylan Forciea wrote:
Piotrek,
I was actually originally using a group function that WAS deterministic
(a custom UDF I made), but chose a built-in one here. By
non-deterministic, I mean that the number of records coming out is not
consistent. Since the FIRST_VALUE here is on an attribute that is not
part of the key, I wouldn't think it should affect the number of
records coming out.
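
If it ends up mattering, a deterministic stand-in for the same query
shape (assuming any single attr per key would do) is an
order-insensitive aggregate such as MAX:

SELECT
  id2,
  MAX(attr) AS attr
FROM table2
GROUP BY id2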
Dylan
*From: *Piotr Nowojski <pnowoj...@apache.org>
*Date: *Wednesday, April 14, 2021 at 9:06 AM
*To: *Dylan Forciea <dy...@oseberg.io>
*Cc: *"user@flink.apache.org" <user@flink.apache.org>
*Subject: *Re: Nondeterministic results with SQL job when parallelism is > 1
Hi,
Yes, it looks like your query is non-deterministic because of the
`FIRST_VALUE` used inside the `GROUP BY`. If you have many different
parallel sources, each time you run your query the first value might be
different. If that's the case, you could try to confirm it with an even
smaller query:
SELECT
  id2,
  FIRST_VALUE(attr) AS attr
FROM table2
GROUP BY id2
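
As a sanity check (just a thought), the row count of that grouped query
should always equal the number of distinct keys, which you can compute
deterministically:

SELECT COUNT(DISTINCT id2) FROM table2

If the attr values vary between runs but the row counts always match
this number, the non-determinism is only in which value FIRST_VALUE
picks, not in how many records come out.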
Best,
Piotrek
Wed, Apr 14, 2021 at 14:45 Dylan Forciea <dy...@oseberg.io
<mailto:dy...@oseberg.io>> wrote:
I am running Flink 1.12.2, and I was trying to increase the parallelism
of my Flink SQL job to see what happened. However, once I did that, my
results became nondeterministic. This happens whether I set the
table.exec.resource.default-parallelism config option or set the
default local parallelism to something higher than 1. I would end up
with fewer records in the end, and each time I ran the job the output
record count would come out differently.
I managed to distill an example that causes the issue, as pasted below
(with attribute names changed to protect company proprietary info). I
believe I saw it happen with a LEFT JOIN rather than a FULL JOIN as
well, but the distilled version wasn't giving me wrong results with
that. Maybe it has to do with joining to a table that was formed using
a GROUP BY? Can somebody tell me whether I'm doing something that is
known not to work, or whether I have run across a bug?
Regards,
Dylan Forciea
// Imports assumed for this distilled example:
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Job {
  def main(args: Array[String]): Unit = {
    StreamExecutionEnvironment.setDefaultLocalParallelism(1)

    val settings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)

    val configuration = streamTableEnv.getConfig().getConfiguration()
    configuration.setInteger("table.exec.resource.default-parallelism", 16)

    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH)

    streamTableEnv.executeSql(
      """
      CREATE TABLE table1 (
        id1 STRING PRIMARY KEY NOT ENFORCED,
        attr STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table1',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE table2 (
        attr STRING,
        id2 STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table2',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE table3 (
        attr STRING PRIMARY KEY NOT ENFORCED,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'table3',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    streamTableEnv.executeSql(
      """
      CREATE TABLE sink (
        id STRING PRIMARY KEY NOT ENFORCED,
        attr STRING,
        attr_mapped STRING
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://…',
        'table-name' = 'sink',
        'username' = 'username',
        'password' = 'password',
        'scan.fetch-size' = '500',
        'scan.auto-commit' = 'false'
      )""")

    val view =
      streamTableEnv.sqlQuery("""
        SELECT
          COALESCE(t1.id1, t2.id2) AS id,
          COALESCE(t2.attr, t1.attr) AS attr,
          COALESCE(t3.attr_mapped, t2.attr, t1.attr) AS attr_mapped
        FROM table1 t1
        FULL JOIN (
          SELECT
            id2,
            FIRST_VALUE(attr) AS attr
          FROM table2
          GROUP BY id2
        ) t2
        ON (t1.id1 = t2.id2)
        LEFT JOIN table3 t3
        ON (COALESCE(t2.attr, t1.attr) = t3.attr)""")
    streamTableEnv.createTemporaryView("view", view)

    val statementSet = streamTableEnv.createStatementSet()
    statementSet.addInsertSql("""
      INSERT INTO sink SELECT * FROM view
    """)
    statementSet.execute().await()
  }
}