Hi!
We are using Apache Flink 1.4.2. It seems this version doesn't support
COUNT(DISTINCT ...). I am trying to find a way to dedup the stream. So I tried:
SELECT
  CONCAT_WS(
    '-',
    CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
    CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
    CAST(user_id AS VARCHAR)
  ),
  COUNT(DISTINCT event_id) AS event_count
FROM event_foo
GROUP BY user_id, MONTH(longToDateTime(rowtime)),
  YEAR(longToDateTime(rowtime))
(Duplicate events have the same event_id and user_id; the other fields,
e.g. timestamps, may or may not differ.)
But that failed because DISTINCT is not supported. As a workaround I tried:
SELECT
  CONCAT_WS(
    '-',
    CAST(MONTH(row_datetime) AS VARCHAR),
    CAST(YEAR(row_datetime) AS VARCHAR),
    CAST(user_id AS VARCHAR)
  ),
  COUNT(event_id) AS event_count
FROM (
  SELECT
    user_id,
    event_id,
    maxtimestamp(longToDateTime(rowtime)) AS row_datetime
  FROM event_foo
  GROUP BY event_id, user_id
)
GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
I am hoping the inner SELECT does the deduping, because grouping by
event_id and user_id is logically equivalent to a DISTINCT. This works in
my functional testing.
Will it also work if the duplicates fall into different event buckets? I was
hoping that as long as the duplicates arrive within Flink's state "retention
time" they will be deduped, but I am new to Flink, so I am not sure about
that. Can someone please correct me if I am wrong? Is this a reasonable
workaround for the lack of DISTINCT support? Please let me know if there is
a better way.
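
To make the question concrete, here is a small plain-Python sketch of what I
understand the two queries to compute. It is not Flink code: the sample rows
are made up, the function names are hypothetical, and it ignores state
retention and streaming updates entirely. It only illustrates the grouping
logic, assuming maxtimestamp keeps the latest timestamp per
(event_id, user_id).

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample rows: (user_id, event_id, event time). Not real data.
events = [
    (1, "a", datetime(2018, 3, 30, 10, 0)),
    (1, "a", datetime(2018, 3, 30, 10, 5)),   # duplicate within the same month
    (1, "b", datetime(2018, 3, 31, 23, 59)),
    (1, "b", datetime(2018, 4, 1, 0, 1)),     # duplicate that crosses into April
    (2, "c", datetime(2018, 4, 2, 9, 0)),
]


def count_distinct(rows):
    """Mimics COUNT(DISTINCT event_id) grouped by (user_id, month, year)."""
    seen = defaultdict(set)
    for user_id, event_id, ts in rows:
        seen[(user_id, ts.month, ts.year)].add(event_id)
    return {key: len(ids) for key, ids in seen.items()}


def dedup_then_count(rows):
    """Mimics the workaround: keep the max timestamp per (event_id, user_id),
    then count the surviving rows per (user_id, month, year)."""
    latest = {}
    for user_id, event_id, ts in rows:
        key = (event_id, user_id)
        if key not in latest or ts > latest[key]:
            latest[key] = ts
    counts = defaultdict(int)
    for (event_id, user_id), ts in latest.items():
        counts[(user_id, ts.month, ts.year)] += 1
    return dict(counts)
```

On this sample, count_distinct(events) counts event "b" in both March and
April for user 1, while dedup_then_count(events) counts it once, in April
only (the bucket of its latest timestamp). So in this sketch the workaround
is a global dedup per event rather than a per-bucket DISTINCT, which is
what I want, provided Flink still holds the state for that event when the
duplicate arrives.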
Thanks,
Vinod