Hi Tao,

computing a group window requires that the event-time timestamp of the DataStream is exposed as a time attribute (in your case as an event-time attribute). If you register a DataStream at the TableEnvironment, this has to be done in two steps:
1) Assign timestamps and watermarks to the DataStream [1]:

  val kinesisStream = env
    .fromCollection(testData)
    .assignTimestampsAndWatermarks(yourAssigner) // yourAssigner should extract the event_timestamp field of the Avro record and assign watermarks

2) Declare the event timestamp of the DataStream as an attribute in the schema of the table [2]:

  tableEnv.registerDataStream(streamName, kinesisStream, 'nd_key, 'concept_rank, 'event_timestamp.rowtime)

Once event_timestamp is declared as a time attribute, it can be used in window functions. I have appended a rough, untested sketch of both steps below the quoted thread.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#event-time

2017-12-05 1:40 GMT+01:00 Tao Xia <t...@udacity.com>:

> Thanks for the quick response, Fabian.
>
> I have a DataStream of Avro objects. Not sure how to add a TIMESTAMP
> attribute or convert the event_timestamp field to a timestamp attribute for
> my SQL use cases. Most docs only cover the Table API with a static schema.
> p.s. my Avro schema has 100+ fields.
> Can you guide me on how to prepare my query to aggregate by nd_key and
> event_timestamp per hour?
>
> val testData = List(
>   UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(10).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(15).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(20).setEventTimestamp(1512172415.longValue()).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(25).setEventTimestamp(1512172415.longValue()).build()
> )
>
> val kinesisStream = env.fromCollection(testData)
>
> tableEnv.registerDataStream(streamName, avroStream);
>
> val query = "SELECT nd_key, sum(concept_rank) FROM " + streamName + " GROUP BY nd_key"
>
> Thanks,
> Tao
>
> On Mon, Dec 4, 2017 at 3:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> yes, Apache Calcite's group window functions are supported.
>>
>> The error message tells you that the attribute event_timestamp should be
>> of type DATETIME (or TIMESTAMP) and not BIGINT.
>> Please check the documentation for details [1].
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>>
>> 2017-12-04 22:17 GMT+01:00 Tao Xia <t...@udacity.com>:
>>
>>> Hi All,
>>> Do you know if window functions are supported in SQL yet?
>>> I got the error message below when trying to use a group window function in SQL.
>>>
>>> My query:
>>>
>>> val query = "SELECT nd_key, concept_rank, event_timestamp FROM " + streamName +
>>>   " GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR), nd_key"
>>>
>>> Error message:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed.
>>> From line 1, column 74 to line 1, column 115: Cannot apply 'TUMBLE' to arguments of type
>>> 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'.
>>> Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)' 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>>   at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
>>>   at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
>>>   at com.udacity.data.pipeline.AggregationJob$.main(AggregationJob.scala:43)
>>>   at com.udacity.data.pipeline.AggregationJob.main(AggregationJob.scala)
>>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 74 to line 1,
>>> column 115: Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'.
>>> Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)' 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>   at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>>>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
>>>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654)
>>>   at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:284)
>>>   at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:92)
>>>   at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:109)
>>>   at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:243)
>>>   at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:659)
>>>   at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
>>>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:287)
>>>   at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:223)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5374)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5361)
>>>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:138)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1595)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1580)
>>>   at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:225)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupClause(SqlValidatorImpl.java:3824)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3210)
>>>   at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>>   at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926)
>>>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901)
>>>   at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611)
>>>   at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
>>>   ... 3 more
>>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'TUMBLE' to arguments
>>> of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>   at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>>>   at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>>>   ... 30 more
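Sketch for step 1 (rough and untested): a BoundedOutOfOrdernessTimestampExtractor for your stream. It assumes the Avro-generated UnifiedEvent has a getEventTimestamp() getter and that the value is in epoch seconds (your sample value 1512172415 looks like seconds), so it is scaled to milliseconds as Flink expects. The one-minute out-of-orderness bound is only a placeholder you should adjust for your data.

  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  import org.apache.flink.streaming.api.windowing.time.Time

  // Placeholder bound: tolerate events arriving up to 1 minute out of order.
  class UnifiedEventTimestampExtractor
      extends BoundedOutOfOrdernessTimestampExtractor[UnifiedEvent](Time.minutes(1)) {

    // Assumption: getEventTimestamp() returns epoch seconds; Flink works with milliseconds.
    override def extractTimestamp(event: UnifiedEvent): Long =
      event.getEventTimestamp * 1000L
  }

  // The environment must run on event time for the watermarks and rowtime attribute to take effect.
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Step 1: attach the assigner so the stream carries event-time timestamps and watermarks.
  val kinesisStream = env
    .fromCollection(testData)
    .assignTimestampsAndWatermarks(new UnifiedEventTimestampExtractor)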
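Sketch for step 2: the registration line from above combined with a TUMBLE window and the SUM aggregate from your earlier query. The alias names (window_start, total_rank) are just illustrative, and org.apache.flink.table.api.scala._ needs to be in scope for the 'field expression syntax.

  import org.apache.flink.table.api.scala._

  // Step 2: expose event_timestamp as the rowtime attribute when registering the table.
  tableEnv.registerDataStream(
    streamName, kinesisStream, 'nd_key, 'concept_rank, 'event_timestamp.rowtime)

  // Hourly tumbling window per nd_key, summing concept_rank.
  val query =
    "SELECT nd_key, TUMBLE_START(event_timestamp, INTERVAL '1' HOUR) AS window_start, " +
      "SUM(concept_rank) AS total_rank " +
      "FROM " + streamName + " " +
      "GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR), nd_key"

  val result = tableEnv.sqlQuery(query)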