Hi Tao,

computing a group window requires that the event-time timestamp of the
DataStream is exposed as a time attribute (in your case as an event time
attribute).
If you register a DataStream with the TableEnvironment, this has to be done in
two steps:

1) assign timestamps and watermarks to the DataStream [1]:

val kinesisStream = env
  .fromCollection(testData)
  // yourAssigner should extract the event_timestamp field of the Avro record
  // and assign watermarks
  .assignTimestampsAndWatermarks(yourAssigner)
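For illustration, `yourAssigner` could be a `BoundedOutOfOrdernessTimestampExtractor`. This is a minimal sketch, assuming the Flink 1.4 Scala APIs (the stack trace below goes through `TableEnvironment.sqlQuery`). `Event` is a hypothetical case class standing in for the Avro `UnifiedEvent` class, and both the name `EventTimestampAssigner` and the 1-minute out-of-orderness bound are assumptions. The test data's `event_timestamp` values (e.g. 1512172415) look like epoch seconds, but Flink timestamps are epoch milliseconds, so the extractor multiplies by 1000.

```scala
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical stand-in for the Avro-generated UnifiedEvent class; only the
// three fields used in this thread are modelled.
case class Event(nd_key: String, concept_rank: Int, event_timestamp: Long)

// Assigns event-time timestamps and watermarks. The watermark trails the
// highest timestamp seen so far by a fixed bound (assumed: 1 minute), so
// events that are at most 1 minute out of order are not treated as late.
class EventTimestampAssigner
    extends BoundedOutOfOrdernessTimestampExtractor[Event](Time.minutes(1)) {

  // Flink expects epoch milliseconds; the test data appears to hold epoch
  // seconds, so the value is scaled by 1000 (an assumption about the data).
  override def extractTimestamp(event: Event): Long =
    event.event_timestamp * 1000L
}
```

It would be plugged in as `.assignTimestampsAndWatermarks(new EventTimestampAssigner)`. The bound trades latency for completeness: an hourly window is only emitted once an event at least one minute past the window end has been seen, or the input ends.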

2) declare the event timestamp of the DataStream as an attribute in the
schema of the table [2]:

tableEnv.registerDataStream(streamName, kinesisStream, 'nd_key,
'concept_rank, 'event_timestamp.rowtime);


Once event_timestamp is declared as a time attribute, it can be used in
window functions.
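Putting both steps together with the hourly aggregation asked about below, an end-to-end sketch might look like this. It assumes the Flink 1.4 Scala APIs, a hypothetical `Event` case class in place of the Avro `UnifiedEvent`, and epoch seconds in `event_timestamp`. `HourlyAggregation`, `CollectSink`, the table name `events` and the column aliases are made up for illustration; `CollectSink` only gathers results in memory so they can be checked in a local run. Every selected column has to be grouped or aggregated, so `concept_rank` is summed and the window start is selected with `TUMBLE_START` instead of the raw `event_timestamp`.

```scala
import java.util.{ArrayList => JArrayList, Collections, List => JList}

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical stand-in for the Avro-generated UnifiedEvent class.
case class Event(nd_key: String, concept_rank: Int, event_timestamp: Long)

// Collects result rows in memory (local execution only); a real job would
// write to a proper sink instead.
object CollectSink {
  val rows: JList[Row] = Collections.synchronizedList(new JArrayList[Row]())
}
class CollectSink extends SinkFunction[Row] {
  override def invoke(row: Row): Unit = CollectSink.rows.add(row)
}

object HourlyAggregation {
  def run(events: Seq[Event]): Seq[Row] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Step 1: timestamps (epoch seconds -> milliseconds) and watermarks.
    val stream = env
      .fromCollection(events)
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Event](Time.minutes(1)) {
          override def extractTimestamp(e: Event): Long = e.event_timestamp * 1000L
        })

    // Step 2: declare event_timestamp as the event-time (rowtime) attribute.
    tableEnv.registerDataStream("events", stream,
      'nd_key, 'concept_rank, 'event_timestamp.rowtime)

    // Hourly tumbling window per nd_key.
    val result = tableEnv.sqlQuery(
      """SELECT nd_key,
        |  TUMBLE_START(event_timestamp, INTERVAL '1' HOUR) AS window_start,
        |  SUM(concept_rank) AS total_rank
        |FROM events
        |GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR), nd_key""".stripMargin)

    CollectSink.rows.clear()
    result.toAppendStream[Row].addSink(new CollectSink)
    env.execute("hourly aggregation")
    CollectSink.rows.toArray(Array.empty[Row]).toSeq
  }
}
```

With the four test records from the quoted mail, which all carry the timestamp 1512172415 (2017-12-01 23:53:35 UTC) and therefore fall into the same hourly window, this should produce one row per nd_key: nd101 with a total of 25 and nd102 with a total of 45.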

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html#event-time

2017-12-05 1:40 GMT+01:00 Tao Xia <t...@udacity.com>:

> Thanks for the quick response, Fabian.
>
> I have a DataStream of Avro objects and I'm not sure how to add a TIMESTAMP
> attribute or convert the event_timestamp field into a timestamp attribute for
> my SQL use cases. Most docs only cover the Table API with a static schema.
> P.S. My Avro schema has 100+ fields.
> Can you show me how to write my query to aggregate by nd_key and
> event_timestamp per hour?
>
> val testData = List(
>   UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(10).setEventTimestamp(1512172415L).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd101").setConceptRank(15).setEventTimestamp(1512172415L).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(20).setEventTimestamp(1512172415L).build(),
>   UnifiedEvent.newBuilder().setNdKey("nd102").setConceptRank(25).setEventTimestamp(1512172415L).build()
> )
>
> val kinesisStream = env.fromCollection(testData)
>
> tableEnv.registerDataStream(streamName, kinesisStream)
>
> val query = "SELECT nd_key, sum(concept_rank) FROM " + streamName + " GROUP BY nd_key"
>
> Thanks,
> Tao
>
> On Mon, Dec 4, 2017 at 3:32 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> yes, Apache Calcite's group window functions are supported.
>>
>> The error message tells you that the attribute event_timestamp should be
>> of type DATETIME (or TIMESTAMP) and not BIGINT.
>> Please check the documentation for details [1].
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows
>>
>> 2017-12-04 22:17 GMT+01:00 Tao Xia <t...@udacity.com>:
>>
>>> Hi All,
>>>   Do you know if window functions are supported in SQL yet?
>>>   I got the following error message when trying to use a group window
>>>   function in SQL.
>>>
>>> My query:
>>>
>>> val query = "SELECT nd_key, concept_rank, event_timestamp FROM " + streamName +
>>>   " GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR), nd_key"
>>>
>>>
>>> Error Message:
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>> SQL validation failed. From line 1, column 74 to line 1, column 115: Cannot
>>> apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'.
>>> Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>> at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:93)
>>> at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:561)
>>> at com.udacity.data.pipeline.AggregationJob$.main(AggregationJob.scala:43)
>>> at com.udacity.data.pipeline.AggregationJob.main(AggregationJob.scala)
>>> Caused by: org.apache.calcite.runtime.CalciteContextException: From
>>> line 1, column 74 to line 1, column 115: Cannot apply 'TUMBLE' to arguments
>>> of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'. Supported form(s):
>>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
>>> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4654)
>>> at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:284)
>>> at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:92)
>>> at org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:109)
>>> at org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:243)
>>> at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:659)
>>> at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
>>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:287)
>>> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:223)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5374)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5361)
>>> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:138)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1595)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1580)
>>> at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:225)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupClause(SqlValidatorImpl.java:3824)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3210)
>>> at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>> at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:945)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:926)
>>> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:901)
>>> at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:611)
>>> at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:89)
>>> ... 3 more
>>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
>>> Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'.
>>> Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>> 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>, <TIME>)'
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>>> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
>>> ... 30 more
>>>
>>
>>
>
