[https://issues.apache.org/jira/browse/FLINK-20128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17231347#comment-17231347]
Shengkai Fang commented on FLINK-20128:
---------------------------------------
[~dian.fu] In my local environment, my Java code is:
{code:java}
// Some comments here
tEnv.executeSql(
    "CREATE TABLE datagen(\n" +
    "  foo INT,\n" +
    "  id AS mod(foo, 2),\n" +
    "  message_time AS to_timestamp(from_unixtime(FLOOR(foo/2))),\n" +
    "  WATERMARK FOR message_time AS message_time\n" +
    ") WITH (\n" +
    "  'connector' = 'datagen',\n" +
    "  'rows-per-second' = '2',\n" +
    "  'fields.foo.kind' = 'sequence',\n" +
    "  'fields.foo.start' = '0',\n" +
    "  'fields.foo.end' = '19'\n" +
    ")"
);
tEnv.executeSql(
    "SELECT foo, id, avg(foo) OVER w AS lagfoo\n" +
    "FROM datagen\n" +
    "WINDOW w AS (PARTITION BY id ORDER BY message_time " +
    "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
).print();
{code}
The results are:
{code}
+----+-------------+-------------+-------------+
| op | foo | id | lagfoo |
+----+-------------+-------------+-------------+
| +I | 0 | 0 | 0 |
| +I | 1 | 1 | 1 |
| +I | 3 | 1 | 2 |
| +I | 2 | 0 | 1 |
| +I | 4 | 0 | 2 |
| +I | 5 | 1 | 3 |
| +I | 7 | 1 | 4 |
| +I | 6 | 0 | 3 |
| +I | 8 | 0 | 4 |
| +I | 9 | 1 | 5 |
| +I | 10 | 0 | 5 |
| +I | 12 | 0 | 6 |
| +I | 11 | 1 | 6 |
| +I | 13 | 1 | 7 |
| +I | 14 | 0 | 7 |
| +I | 16 | 0 | 8 |
| +I | 15 | 1 | 8 |
| +I | 17 | 1 | 9 |
| +I | 18 | 0 | 9 |
| +I | 19 | 1 | 10 |
+----+-------------+-------------+-------------+
{code}
I am not sure the code above is the same as the Python code.
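As a plain-Python sanity check (not Flink code) that both partitions should appear in the output: the expected {{lagfoo}} column in the table above is just a running integer average per partition, since AVG over INT truncates. A minimal sketch:
{code:python}
def running_int_avg(values):
    """Running average with integer division, mimicking AVG over INT."""
    out, total = [], 0
    for n, v in enumerate(values, start=1):
        total += v
        out.append(total // n)
    return out

# Partition the datagen sequence 0..19 by mod(foo, 2).
part0 = [foo for foo in range(20) if foo % 2 == 0]  # id == 0
part1 = [foo for foo in range(20) if foo % 2 == 1]  # id == 1

print(running_int_avg(part0))  # lagfoo for id == 0: 0 through 9
print(running_int_avg(part1))  # lagfoo for id == 1: 1 through 10
{code}
Both partitions yield ten rows, matching the Java results above, so the Python job should also emit rows for id == 1.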
> Data loss for over windows with rows unbounded preceding
> --------------------------------------------------------
>
> Key: FLINK-20128
> URL: https://issues.apache.org/jira/browse/FLINK-20128
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Table SQL / Planner
> Affects Versions: 1.12.0, 1.11.2
> Reporter: Thilo Schneider
> Priority: Major
>
> When using partitioned, unbounded over windows, all but one partitions are
> dropped in the output dataset:
> {code:python}
> # Setup
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> from biafflink import debug_print_table
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
> t_env.get_config().get_configuration().set_integer("table.exec.resource.default-parallelism", 1)
> t_env.execute_sql("""
> CREATE TABLE datagen (
> foo INT,
> id AS mod(foo, 2),
> message_time AS to_timestamp(from_unixtime(FLOOR(foo/2))),
> WATERMARK FOR message_time AS message_time
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second'='2',
> 'fields.foo.kind'='sequence',
> 'fields.foo.start'='0',
> 'fields.foo.end'='19'
> )""")
> t_env.execute_sql("CREATE TABLE output (foo INT, id INT, lagfoo INT) WITH ('connector' = 'print')")
> {code}
> Using bounded over windows, everything works as expected:
> {code:python}
> t = t_env.sql_query("""
> SELECT foo, id, avg(foo) OVER w AS lagfoo
> FROM datagen
> WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)""")
> t.execute_insert("output")
> {code}
> yields
> {code:python}
> +I(0,0,0)
> +I(1,1,1)
> +I(2,0,1)
> +I(3,1,2)
> +I(4,0,3)
> ...
> {code}
> If we change the window to unbounded preceding:
> {code:python}
> t = t_env.sql_query("""
> SELECT foo, id, avg(foo) OVER w AS lagfoo
> FROM datagen
> WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)""")
> t.execute_insert("output")
> {code}
> we lose all of id == 1:
> {code:python}
> +I(0,0,0)
> +I(2,0,1)
> +I(4,0,2)
> +I(6,0,3)
> +I(8,0,4)
> ...
> {code}
> I observed this problem with various aggregate functions, under both
> 1.11.2 and 1.12rc1.
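For comparison, the bounded-window values quoted above can also be checked with a plain-Python model (not Flink code) of a 1-PRECEDING-to-CURRENT-ROW window with integer averaging:
{code:python}
def bounded_int_avg(values, preceding=1):
    """Average over the last `preceding` rows plus the current row,
    with integer division, mimicking AVG over INT."""
    out = []
    for i in range(len(values)):
        window = values[max(0, i - preceding): i + 1]
        out.append(sum(window) // len(window))
    return out

evens = list(range(0, 20, 2))  # partition id == 0
odds = list(range(1, 20, 2))   # partition id == 1

print(bounded_int_avg(evens)[:3])  # matches +I(0,0,0), +I(2,0,1), +I(4,0,3)
print(bounded_int_avg(odds)[:2])   # matches +I(1,1,1), +I(3,1,2)
{code}
Both partitions produce values here as well, which is consistent with the bounded case working and only the unbounded case dropping a partition.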
--
This message was sent by Atlassian Jira
(v8.3.4#803005)