Two stream as table1, table2. We know that group with regular join won't work
so we have to use time-windowed join. So here is my flink sql looks like:
*SELECT
a.account account,
SUM(a.value) + SUM(b.value),
UNIX_TIMESTAMP(TUMBLE_START(a.producer_timestamp, INTERVAL '3'
MINUTE))
FROM
(SELECT
account,
value,
producer_timestamp
FROM
table1) a,
(SELECT
account,
value,
producer_timestamp
FROM
table2) b
WHERE
a.account = b.account AND
a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3'
MINUTE AND b.producer_timestamp)
group by
a.account,
TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)*
But i still got error from flink:
/Rowtime attributes must not be in the input rows of a regular join. As a
workaround you can cast the time attributes of input tables to TIMESTAMP
before.
Please check the documentation for the set of currently supported SQL
features.
at
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:450)
at
org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:369)
at
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at
org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
at
org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1048)
at
org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:962)
at
org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:922)
..../
I think i use time-windowed join just like this doc
says:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins.
But flink told me its a regular join. Is there anything wrong i haven't
notice?
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/