Hi! We are using Apache Flink 1.4.2, and it seems this version doesn't support COUNT(DISTINCT ...). I am trying to find a way to deduplicate the stream, so I first tried:
    SELECT
      CONCAT_WS('-',
        CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
        CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
        CAST(user_id AS VARCHAR)),
      COUNT(DISTINCT event_id) AS event_count
    FROM event_foo
    GROUP BY user_id, MONTH(longToDateTime(rowtime)), YEAR(longToDateTime(rowtime))

(The duplicate events have the same event_id and user_id; the other fields, e.g. timestamps, may or may not differ.) But that failed because DISTINCT is not supported. As a workaround, I tried:

    SELECT
      CONCAT_WS('-',
        CAST(MONTH(row_datetime) AS VARCHAR),
        CAST(YEAR(row_datetime) AS VARCHAR),
        CAST(user_id AS VARCHAR)),
      COUNT(event_id) AS event_count
    FROM (
      SELECT user_id, event_id, maxtimestamp(longToDateTime(rowtime)) AS row_datetime
      FROM event_foo
      GROUP BY event_id, user_id
    )
    GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

I am hoping the inner SELECT does the deduplication, because logically it is equivalent to a DISTINCT. This works in my functional testing. Will it also work if the duplicates span different event buckets? I was hoping that as long as the duplicates arrive within the state retention time in Flink, they would be deduplicated, but I am new to Flink, so I am not sure about that. Can someone please correct me if I am wrong? Is this a reasonable workaround for the lack of DISTINCT support? Please let me know if there is a better way.

Thanks,
Vinod
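The behavior the workaround relies on can be sketched with a toy in-memory Python model. This is not Flink code and not how Flink implements it; it only illustrates the intended semantics under stated assumptions. The names `DedupCountModel`, `bucket`, and `on_event` are hypothetical. Assumptions: `rowtime` is epoch milliseconds interpreted in UTC (the `longToDateTime` UDF is not shown); only the inner operator's idle-state retention is modeled, as a fixed timeout on arrival (processing) time, whereas Flink's own cleanup is configured with a minimum and maximum retention time and also applies to the outer aggregate; and when a duplicate raises an event's max timestamp into another month, the outer count is assumed to receive a retraction for the old bucket.

```python
from collections import Counter
from datetime import datetime, timezone

DAY_MS = 24 * 3600 * 1000


def bucket(user_id, ts_ms):
    """Key produced by CONCAT_WS('-', month, year, user_id).

    Assumption: rowtime is epoch milliseconds interpreted in UTC.
    """
    dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return f"{dt.month}-{dt.year}-{user_id}"


class DedupCountModel:
    """Hypothetical toy model of the nested GROUP BY query (not Flink code).

    inner:  one row per (event_id, user_id) holding the max timestamp seen,
            like the inner SELECT ... GROUP BY event_id, user_id.
    counts: COUNT(event_id) per month bucket, like the outer query.
    Only the inner operator's idle-state retention is modeled.
    """

    def __init__(self, retention_ms):
        self.retention_ms = retention_ms
        self.inner = {}          # (event_id, user_id) -> (max_ts, last_access)
        self.counts = Counter()  # bucket key -> event_count

    def _expire(self, now_ms):
        # Drop inner keys idle for longer than the retention time.
        stale = [k for k, (_, last) in self.inner.items()
                 if now_ms - last > self.retention_ms]
        for k in stale:
            del self.inner[k]

    def on_event(self, user_id, event_id, ts_ms, arrival_ms):
        self._expire(arrival_ms)
        key = (event_id, user_id)
        if key in self.inner:
            # Duplicate still in state: only the max timestamp may change.
            old_ts, _ = self.inner[key]
            new_ts = max(old_ts, ts_ms)
            self.inner[key] = (new_ts, arrival_ms)
            old_b, new_b = bucket(user_id, old_ts), bucket(user_id, new_ts)
            if old_b != new_b:
                # Assumed retraction: the row leaves the old month bucket
                # and is counted in the new one instead.
                self.counts[old_b] -= 1
                self.counts[new_b] += 1
        else:
            # First occurrence, or its state already expired: counted (again).
            self.inner[key] = (ts_ms, arrival_ms)
            self.counts[bucket(user_id, ts_ms)] += 1


if __name__ == "__main__":
    jan = 1514764800000  # 2018-01-01T00:00:00Z
    m = DedupCountModel(retention_ms=DAY_MS)
    m.on_event(7, "e1", jan, arrival_ms=0)
    m.on_event(7, "e1", jan + 1000, arrival_ms=1000)  # within retention
    print(m.counts["1-2018-7"])  # 1
    m.on_event(7, "e1", jan, arrival_ms=3 * DAY_MS)   # after state expired
    print(m.counts["1-2018-7"])  # 2
```

In this model, a duplicate that arrives after its (event_id, user_id) state has expired is counted a second time, while a duplicate whose later timestamp crosses a month boundary moves the event to the other bucket rather than counting it twice.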