Hi,

This question has been posted on Stackoverflow, Slack and now the mailing
list. Please don't spam the different channels for getting support on your
question. This is a voluntary service, run by community members.

Best regards,

Martijn Visser

On Tue, Nov 22, 2022 at 7:27 AM Suparn Lele (sulele) <sul...@cisco.com>
wrote:

> Hi,
> We have one table A in database. We are loading that table into flink
> using Flink SQL JdbcCatalog.
>
> Here is how we are loading the data
> val catalog = new JdbcCatalog("my_catalog", "database_name", username,
> password, url)
> streamTableEnvironment.registerCatalog("my_catalog", catalog)
> streamTableEnvironment.useCatalog("my_catalog")
>
> val query = "select timestamp, count from A"
> val sourceTable = streamTableEnvironment.sqlQuery(query)
> streamTableEnvironment.createTemporaryView("innerTable", sourceTable)
>
> val aggregationQuery =
> select window_end, sum(count)
> from TABLE(TUMBLE(TABLE innerTable, DESCRIPTOR(timestamp), INTERVAL '10'
> minutes))
> group by window_end
>
> It throws following error
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. The window function TUMBLE(TABLE table_name,
> DESCRIPTOR(timecol), datetime interval[, datetime interval]) requires the
> timecol is a time attribute type, but is TIMESTAMP(6).
>
> In short we want to apply windowing aggregation on an already existing
> column. How can we do that
> *Note - This is a batch processing*
>
> Thanks,
> Suparn
>

Reply via email to