Hi,
We have one table A in database. We are loading that table into flink using 
Flink SQL JdbcCatalog.

Here is how we are loading the data
val catalog = new JdbcCatalog("my_catalog", "database_name", username, 
password, url)
streamTableEnvironment.registerCatalog("my_catalog", catalog)
streamTableEnvironment.useCatalog("my_catalog")

val query = "select timestamp, count from A"
val sourceTable = streamTableEnvironment.sqlQuery(query)
streamTableEnvironment.createTemporaryView("innerTable", sourceTable)

val aggregationQuery =
select window_end, sum(count)
from TABLE(TUMBLE(TABLE innerTable, DESCRIPTOR(timestamp), INTERVAL '10' 
minutes))
group by window_end

It throws following error
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. The window function TUMBLE(TABLE table_name, 
DESCRIPTOR(timecol), datetime interval[, datetime interval]) requires the 
timecol is a time attribute type, but is TIMESTAMP(6).

In short we want to apply windowing aggregation on an already existing column. 
How can we do that
Note - This is a batch processing

Thanks,
Suparn

Reply via email to