Hi,

Could you check your query again? I could not reproduce your issue on the latest master. I had to adjust your query slightly, though:

SELECT
  a.account,
  (SUM(a.`value`) + SUM(b.`value`)) as `result`,
  TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE)
FROM
  (SELECT account, `value`, producer_timestamp FROM table1) a,
  (SELECT account, `value`, producer_timestamp FROM table2) b
WHERE
  a.account = b.account AND
  a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp
group by
  a.account,
  TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)

Best,
Dawid

On 04/01/2020 04:06, Kurt Young wrote:
> Looks like a bug to me, could you file an issue for this?
>
> Best,
> Kurt
>
>
> On Thu, Jan 2, 2020 at 9:06 PM jeremyji <18868129...@163.com> wrote:
>
>     Two streams as table1, table2. We know that group with regular join
>     won't work, so we have to use time-windowed join. So here is what my
>     Flink SQL looks like:
>
>     SELECT
>         a.account account,
>         SUM(a.value) + SUM(b.value),
>         UNIX_TIMESTAMP(TUMBLE_START(a.producer_timestamp, INTERVAL '3' MINUTE))
>     FROM
>         (SELECT
>             account,
>             value,
>             producer_timestamp
>         FROM
>             table1) a,
>         (SELECT
>             account,
>             value,
>             producer_timestamp
>         FROM
>             table2) b
>     WHERE
>         a.account = b.account AND
>         a.producer_timestamp BETWEEN b.producer_timestamp - INTERVAL '3' MINUTE AND b.producer_timestamp)
>     group by
>         a.account,
>         TUMBLE(a.producer_timestamp, INTERVAL '3' MINUTE)
>
>     But I still got an error from Flink:
>
>     Rowtime attributes must not be in the input rows of a regular join. As a
>     workaround you can cast the time attributes of input tables to TIMESTAMP
>     before.
>     Please check the documentation for the set of currently supported SQL
>     features.
>         at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:450)
>         at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:369)
>         at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
>         at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>         at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>         at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1048)
>         at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:962)
>         at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:922)
>         ....
>
>     I think I use a time-windowed join just like this doc says:
>     https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#time-windowed-joins
>     But Flink told me it's a regular join. Is there anything wrong that I
>     haven't noticed?
>
>
>
>     --
>     Sent from:
>     http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>