Hi Timo,

thanks for your help! I tried to follow the examples in the tests but I still have the same issue. I changed my schema and added an additional field "rowtime". My schema now is:

root
 |-- rowtime: org.apache.flink.table.expressions.RowtimeAttribute(expr: GenericType<org.apache.flink.table.expressions.Expression>)
 |-- time: Long
 |-- host: String

If I run the code:

    table.select('rowtime).toDataStream[Row].print()

I get:

    RowtimeAttribute(1495580133000)
    RowtimeAttribute(1495580143000)
    RowtimeAttribute(1495580153000)

But if I run:

    table.window(Tumble over 1.minutes on 'rowtime as 'w).groupBy('host, 'w).select('host)

I still get the previous error:

    TumblingGroupWindow('w, 'rowtime, 60000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.

I'm using a Kafka09TableSource as the data source, but it doesn't allow me to specify the timestamp assigner. I think the actual consumer is not exposed to the user, so I cannot call assignTimestampsAndWatermarks. Could that be the problem? Should we expose that function so that we can assign timestamps and watermarks to a TableSource?

The time characteristic in the execution environment is set to EventTime in my code.

Cheers,
Enrico

On Wed, May 24, 2017 at 2:08 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Enrico,
>
> the docs of the 1.3-SNAPSHOT are a bit out of sync right now, but they
> will be updated in the next days/1-2 weeks.
>
> We recently introduced so-called "time indicators". These are attributes
> that correspond to Flink's time and watermarks. You declare a logical field
> that represents Flink's internal time in a table program.
>
> In your example you need to append a "time.rowtime" or "time.proctime" to
> your table schema definition.
>
> You can find some examples here:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
>
> If you have further questions, feel free to ask them. It helps us to
> improve the documentation.
>
> Regards,
> Timo
>
> On 24.05.17 at 04:15, enrico canzonieri wrote:
>
> Hi,
> I'm trying to window and groupBy a stream using the table api, but I get
> ValidationException in the windowing function.
> Here is the relevant code:
>
>     tableEnv.registerTableSource(schema.getName, src)
>     val table = tableEnv.scan(schema.getName)
>     val t = table.window(Tumble over 1.minutes on 'time as 'w).groupBy('host, 'w).select('host)
>
> "time" is defined as Long in my schema. The error I get is:
>
>     Exception in thread "main" org.apache.flink.table.api.ValidationException:
>     TumblingGroupWindow('w, 'time, 60000.millis) is invalid: Tumbling window
>     expects a time attribute for grouping in a stream environment.
>
> I also tried to define a window that uses processing time, but what is
> described in the documentation, "Tumble over 1.minutes as 'w", doesn't
> seem to work anymore. Specifically, it seems that a window now always
> expects the "on" call.
>
> Has anybody encountered this issue? I'm using Flink 1.3-SNAPSHOT.
>
> thanks