Hi, The problem is that sTime is not a Time Attribute[1], which has to be aligned with watermarks mechanism. Right now you cannot create a time attribute from within TableFunction, as far as I know.
What you could do is to do the splitting logic in DataStream API and register a proper table with implemented watermarks in TableEnvironment. Then you can apply the windowing on a table prepared that way. Best, Dawid [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/streaming.html#time-attributes On 23/10/2018 06:45, maidangdang44 wrote: > below is one line of my source, the body containes the user logs: > { > body: [ > "user1,url1,2018-10-23 00:00:00;user2,url2,2018-10-23 > 00:01:00;user3,url3,2018-10-23 00:02:00" > ] > } > > > I user LATERAL TABLE and a User-Defined TableFunction flatmap the > source to a new table log, and I want to group by the time and > username, here is my code: > > public class BodySplitFun extends TableFunction<Tuple3<String, String, > Long>> { > private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd > HH:mm:ss"); > public void eval(Row bodyRow) { > String body = bodyRow.getField(0).toString(); > String[] lines = body.split(";"); > for (String line : lines) { > String user = line.split(",")[0]; > String url = line.split(",")[1]; > String sTime = line.split(",")[2]; > collect(new Tuple3<>(user, url, > sdf.parse(sTime).getTime()); > } > } > } > } > > tblEnv.registerFunction("bodySplit", new BodySplitFun()); > tblEnv.sqlUpdate( > "select > count(username) > from > ( > SELECT > username, > url, > sTime > FROM > mySource LEFT JOIN LATERAL TABLE(bodySplit(body)) > as T(username, url, sTime) ON TRUE > ) > log > group by > TUMBLE(log.sTime, INTERVAL '1' MINUTE), log.username"); > > when I run my program, I got these error message: > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: > Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL > DAY>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)' > 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)' > at > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) > at > org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) > ... 49 more > > how can I group by the sTime in the table log? > > > > >
signature.asc
Description: OpenPGP digital signature