Hi!

We are using apache-flink-1.4.2. It seems this version doesn't support
count(DISTINCT). I am trying to find a way to dedup the stream. So I tried:

SELECT

    CONCAT_WS(

      '-',

      CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),

      CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),

      CAST(user_id AS VARCHAR)

    ),

    COUNT(*DISTINCT*(event_id)) AS event_count

FROM event_foo

GROUP BY user_id, MONTH(longToDateTime(rowtime)),
YEAR(longToDateTime(rowtime))


(the duplicate events have the same 'event_id' (and user_id), the other
fields e.g. timestamps may or may not be different)


But that failed because DISTINCT is not supported. As a workaround I tried:

SELECT

    CONCAT_WS(

      '-',

      CAST(MONTH(row_datetime) AS VARCHAR),

      CAST(YEAR(row_datetime) AS VARCHAR),

      CAST(user_id AS VARCHAR)

    ),

    COUNT(event_id) AS event_count

FROM (

    SELECT

        user_id,

        event_id,

        maxtimestamp(longToDateTime(rowtime)) as row_datetime

    FROM event_foo

    GROUP BY event_id, user_id

)

GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)

I am hoping the inner SELECT to do the deduping because logically it is
equivalent to a DISTINCT. This works in my functional testing.

Will it also work if the dedups span different event buckets? I was hoping
that as long as the events arrive within the state "retention time" in
flink they should be deduped but I am new to Flink so I am not sure about
that. Can someone please correct me if I am wrong? Is this a reasonable
workaround for lack of DISTINCT support? Please let me know if there is a
better way.

Thanks,
Vinod

Reply via email to