Hi,

We have a table A in a database, and we are loading it into Flink using the Flink SQL JdbcCatalog.
Here is how we are loading the data:

    val catalog = new JdbcCatalog("my_catalog", "database_name", username, password, url)
    streamTableEnvironment.registerCatalog("my_catalog", catalog)
    streamTableEnvironment.useCatalog("my_catalog")

    val query = "select timestamp, count from A"
    val sourceTable = streamTableEnvironment.sqlQuery(query)
    streamTableEnvironment.createTemporaryView("innerTable", sourceTable)

    val aggregationQuery = "select window_end, sum(count) from TABLE(TUMBLE(TABLE innerTable, DESCRIPTOR(timestamp), INTERVAL '10' minutes)) group by window_end"

It throws the following error:

    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval[, datetime interval]) requires the timecol is a time attribute type, but is TIMESTAMP(6).

In short, we want to apply a windowing aggregation on an already existing column. How can we do that?

Note: this is batch processing.

Thanks,
Suparn
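
P.S. To make the intended result concrete, here is a minimal sketch in plain Scala (no Flink involved) of what we expect the aggregation to compute: each row is assigned to a 10-minute tumbling window, and count is summed per window_end. The names Row, TumbleSketch, windowEnd and sumPerWindow are hypothetical and only for illustration. The sketch assumes windows are aligned to the Unix epoch in UTC and that window_end is the exclusive upper bound of the window.

```scala
import java.time.{Duration, Instant}

// Hypothetical row shape mirroring the two columns selected from table A.
final case class Row(timestamp: Instant, count: Long)

object TumbleSketch {
  // Exclusive end of the epoch-aligned tumbling window that contains ts
  // (assumption: windows start at multiples of `size` since the Unix epoch).
  def windowEnd(ts: Instant, size: Duration): Instant = {
    val sizeMs  = size.toMillis
    val startMs = Math.floorDiv(ts.toEpochMilli, sizeMs) * sizeMs
    Instant.ofEpochMilli(startMs + sizeMs)
  }

  // Sum of count per window_end, i.e. the result the SQL query is meant to return.
  def sumPerWindow(rows: Seq[Row], size: Duration): Map[Instant, Long] =
    rows
      .groupBy(r => windowEnd(r.timestamp, size))
      .map { case (end, rs) => end -> rs.map(_.count).sum }
}
```

For example, rows at 00:03:00 and 00:09:59 would both fall into the window ending at 00:10:00, while a row at exactly 00:10:00 would fall into the window ending at 00:20:00.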