Hi all,

I am on Flink 1.12.3.

 

So here’s the scenario: I have a Kafka topic as a source, where each record represents a change to an append-only audit log. Each Kafka record has the following fields:

  • id (unique identifier for that audit log entry)
  • operation_id (shared across multiple records)
  • operation (string)
  • start_ts (TIMESTAMP(3))
  • end_ts (TIMESTAMP(3))

 

I am trying to calculate the average item count and duration per operation. I first converted the Kafka source to an append-only DataStream and then attempted to run the following SQL:

      // tableEnv is a StreamTableEnvironment; workLogStream is the append-only DataStream built from Kafka.
      Table workLogTable = tableEnv.fromDataStream(workLogStream);
      tableEnv.createTemporaryView("work_log", workLogTable);

      // First view: one row per (operation_id, operation) with its item count.
      Table workLogCntTable = tableEnv.sqlQuery("select operation_id, operation, max(start_ts) as start_ts, max(end_ts) as end_ts, count(*) as item_count, max(audit_ts) as audit_ts, max(event_time) as max_event_time" +
          " FROM work_log GROUP BY operation_id, operation");
      tableEnv.createTemporaryView("work_log_cnt", workLogCntTable);

      // Second query: average item count and duration per operation over 1-second tumbling windows.
      tableEnv.executeSql("select max(audit_ts), operation, avg(item_count) as average_item_count, AVG(end_ts - start_ts) as avg_duration from" +
          " work_log_cnt" +
          " GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND), operation").print();
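To pin down what the two queries are meant to produce, here is a minimal plain-Java model of the same computation. It is not Flink code and ignores streaming and update semantics entirely. The names WorkLogModel, countPerOperation and averagesPerWindow are hypothetical, audit_ts is left out, and event time is assumed to be epoch milliseconds. The first method mirrors the GROUP BY operation_id, operation view; the second puts each resulting row into a 1-second tumbling window on its max event time and averages item count and duration per operation.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of what the two SQL queries compute; this is NOT Flink code.
// Assumption: event time is carried as epoch milliseconds (eventTimeMillis).
class WorkLogModel {

    // One audit-log record from the Kafka topic (audit_ts omitted for brevity).
    static final class Entry {
        final String operationId;
        final String operation;
        final LocalDateTime startTs;
        final LocalDateTime endTs;
        final long eventTimeMillis; // hypothetical: event time as epoch millis

        Entry(String operationId, String operation, LocalDateTime startTs,
              LocalDateTime endTs, long eventTimeMillis) {
            this.operationId = operationId;
            this.operation = operation;
            this.startTs = startTs;
            this.endTs = endTs;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    // One row of work_log_cnt: the aggregates for a single (operation_id, operation).
    static final class OperationCount {
        final String operation;
        LocalDateTime startTs;    // max(start_ts)
        LocalDateTime endTs;      // max(end_ts)
        long itemCount;           // count(*)
        long maxEventTimeMillis;  // max(event_time)

        OperationCount(Entry first) {
            this.operation = first.operation;
            this.startTs = first.startTs;
            this.endTs = first.endTs;
            this.itemCount = 0;
            this.maxEventTimeMillis = first.eventTimeMillis;
        }

        // Fold one more record of the same group into the running aggregates.
        void add(Entry e) {
            if (e.startTs.isAfter(startTs)) startTs = e.startTs;
            if (e.endTs.isAfter(endTs)) endTs = e.endTs;
            maxEventTimeMillis = Math.max(maxEventTimeMillis, e.eventTimeMillis);
            itemCount++;
        }
    }

    // First query: GROUP BY operation_id, operation.
    static Collection<OperationCount> countPerOperation(List<Entry> entries) {
        Map<List<String>, OperationCount> groups = new LinkedHashMap<>();
        for (Entry e : entries) {
            groups.computeIfAbsent(Arrays.asList(e.operationId, e.operation),
                                   k -> new OperationCount(e)).add(e);
        }
        return groups.values();
    }

    // Second query: GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND), operation.
    // Result: window start (epoch ms) -> operation -> {avg item_count, avg duration ms, row count}.
    static Map<Long, Map<String, double[]>> averagesPerWindow(Collection<OperationCount> rows) {
        Map<Long, Map<String, double[]>> sums = new TreeMap<>();
        for (OperationCount r : rows) {
            long windowStart = r.maxEventTimeMillis - Math.floorMod(r.maxEventTimeMillis, 1000L);
            double[] acc = sums.computeIfAbsent(windowStart, k -> new TreeMap<>())
                               .computeIfAbsent(r.operation, k -> new double[3]);
            acc[0] += r.itemCount;
            acc[1] += Duration.between(r.startTs, r.endTs).toMillis();
            acc[2] += 1; // number of work_log_cnt rows in this window
        }
        // Turn the sums into averages in place.
        for (Map<String, double[]> perOperation : sums.values()) {
            for (double[] acc : perOperation.values()) {
                acc[0] /= acc[2];
                acc[1] /= acc[2];
            }
        }
        return sums;
    }
}
```

For example, with two groups landing in the same one-second window, one with 2 items and a 4-second duration and one with 1 item and a 6-second duration, averagesPerWindow reports an average of 1.5 items and 5000 ms for that operation.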

 

The problem is that I am unable to preserve the event time between the first view and the second. I get this error:

caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of type '$TUMBLE(<BIGINT>, <INTERVAL SECOND>)'. Supported form(s): '$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'

My guess is that the MAX aggregate in the first query converts the event time from a DATETIME type to BIGINT. I am not sure how to apply an aggregate to the event time in the first query so that the event time from the original Kafka stream can still be used in the second view. Is there a way to make this work?
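The error reports the TUMBLE argument as BIGINT. As a small plain-Java illustration of that type situation, assuming event_time holds epoch milliseconds (the email does not state its type), the maximum of such values is just another number, and it only becomes a timestamp again after an explicit conversion. EpochMillisMax and its methods are hypothetical names, and interpreting the value in UTC is also an assumption.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;

// Illustration only (hypothetical helper): assumes event_time is epoch milliseconds,
// i.e. a BIGINT column rather than a TIMESTAMP(3) one.
class EpochMillisMax {

    // The maximum of epoch-millisecond values is still a plain long, not a timestamp.
    static long maxEventTime(List<Long> eventTimesMillis) {
        if (eventTimesMillis.isEmpty()) {
            throw new IllegalArgumentException("no event times");
        }
        long max = Long.MIN_VALUE;
        for (long t : eventTimesMillis) {
            max = Math.max(max, t);
        }
        return max;
    }

    // Turning the number back into a timestamp requires an explicit conversion;
    // interpreting it in UTC is an assumption.
    static LocalDateTime toTimestamp(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).toLocalDateTime();
    }
}
```

For instance, the maximum of 1000, 1500 and 1200 is 1500, which converts to 1970-01-01T00:00:01.500 when read as UTC.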

 

Thanks,

Joe