Hi all, I am on Flink 1.12.3. Here's the scenario: I have a Kafka topic as a source, where each record represents a change to an append-only audit log. The Kafka record has the following fields:
I am trying to calculate the average item count and duration per operation. I first converted the Kafka source to an append-only data stream, and then I attempted to run the following SQL:

    Table workLogTable = tableEnv.fromDataStream(workLogStream);
    tableEnv.createTemporaryView("work_log", workLogTable);

    Table workLogCntTable = tableEnv.sqlQuery(
        "SELECT operation_id, operation, MAX(start_ts) AS start_ts, MAX(end_ts) AS end_ts,"
        + " COUNT(*) AS item_count, MAX(audit_ts) AS audit_ts, MAX(event_time) AS max_event_time"
        + " FROM work_log GROUP BY operation_id, operation");
    tableEnv.createTemporaryView("work_log_cnt", workLogCntTable);

    tableEnv.executeSql(
        "SELECT MAX(audit_ts), operation, AVG(item_count) AS average_item_count,"
        + " AVG(end_ts - start_ts) AS avg_duration"
        + " FROM work_log_cnt"
        + " GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND), operation").print();

The problem I am having is that I am unable to preserve the event time between the first view and the second. I am getting this error:

    Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(<BIGINT>, <INTERVAL SECOND>)'. Supported form(s):
    '$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'

My guess is that the MAX function in the first query is converting the event time from a DATETIME type to a BIGINT. I am not sure how to apply an aggregate to the event time in the first query such that the event time from the original Kafka stream can still be used in the second view. Is there a way to make this work?

Thanks,
Joe
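P.S. One idea I have been toying with, but have not gotten working yet: window the first aggregation as well and forward its rowtime with TUMBLE_ROWTIME, so the downstream query still sees a time attribute instead of a plain BIGINT. This is an untested sketch and assumes event_time is registered as a rowtime attribute when the stream is converted to a table:

```sql
-- Untested sketch: give the first query its own tumbling window and expose
-- the window's rowtime. TUMBLE_ROWTIME returns an event-time attribute, so
-- the second query's TUMBLE(max_event_time, ...) would get a DATETIME.
SELECT
  operation_id,
  operation,
  MAX(start_ts)  AS start_ts,
  MAX(end_ts)    AS end_ts,
  COUNT(*)       AS item_count,
  MAX(audit_ts)  AS audit_ts,
  TUMBLE_ROWTIME(event_time, INTERVAL '1' SECOND) AS max_event_time
FROM work_log
GROUP BY TUMBLE(event_time, INTERVAL '1' SECOND), operation_id, operation
```

I am not sure whether windowing the first aggregation changes the semantics I want (it groups per second rather than over the whole operation), so I would welcome corrections.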