Thanks a lot Fabian for the detailed response. I know all the duplicates will arrive within an hour (at most) of the actual event, so a session window with a 1-hour gap should be fine for me.
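[Editor's note: as a rough illustration of the session-window semantics relied on here (a sketch, not Flink code — `sessionize` is a hypothetical helper), events merge into one session as long as consecutive events are less than the gap apart:]

```python
# Sketch (not Flink's API): group sorted event timestamps (seconds) into
# sessions where consecutive events less than `gap` apart share a session.
def sessionize(timestamps, gap=3600):
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # within the gap: extend current session
        else:
            sessions.append([ts])    # gap exceeded: start a new session
    return sessions

# Duplicates arriving within an hour of the original land in the same
# session, so they can be collapsed together:
print(sessionize([0, 1800, 3000, 10000]))  # [[0, 1800, 3000], [10000]]
```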
Is the following the right way to do it in apache-flink-1.4.2?

SELECT
    CONCAT_WS(
        '-',
        CAST(event_month AS VARCHAR),
        CAST(event_year AS VARCHAR),
        CAST(user_id AS VARCHAR)
    ),
    COUNT(event_id) AS event_count
FROM (
    SELECT
        user_id,
        event_id,
        MAX(MONTH(longToDateTime(rowtime))) AS event_month,
        MAX(YEAR(longToDateTime(rowtime))) AS event_year
    FROM event_foo
    GROUP BY event_id, user_id, SESSION(rowtime, INTERVAL '1' HOUR) -- 1 hour session window
)
GROUP BY user_id, event_month, event_year

We are also using idle state retention time to clean up unused state, but that is much longer (a week or a month depending on the use case). We will switch to COUNT(DISTINCT) as soon as we move to a newer Flink version, so the nested SELECT above is a stopgap until then.

Thanks,
Vinod

On Mon, Jun 3, 2019 at 4:52 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vinod,
>
> IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released
> August 9th, 2018) [1].
> Also note that by default, this query will accumulate more and more state,
> i.e., for each grouping key it will hold all unique event_ids.
> You could configure an idle state retention time to clean up unused state
> [2].
>
> Regarding the boundaries, with the current query they are fixed to one
> month and sharply cut (as one would expect).
> You could try to use a long running session window [3]. This would also
> remove the need for the idle state configuration because Flink would know
> when state can be discarded.
>
> Hope this helps,
> Fabian
>
> [1]
> https://flink.apache.org/news/2018/08/09/release-1.6.0.html#enhancing-sql-and-table-api
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/query_configuration.html#idle-state-retention-time
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#group-windows
>
> Am Do., 30.
Mai 2019 um 02:18 Uhr schrieb Vinod Mehra <vme...@lyft.com>:
>
>> Another interesting thing is that if I add DISTINCT in the 2nd query it
>> doesn't complain. But because of the inner select it is a no-op: the
>> inner select is already doing the deduping:
>>
>> SELECT
>>     CONCAT_WS(
>>         '-',
>>         CAST(MONTH(row_datetime) AS VARCHAR),
>>         CAST(YEAR(row_datetime) AS VARCHAR),
>>         CAST(user_id AS VARCHAR)
>>     ),
>>     COUNT(DISTINCT event_id) AS event_count -- note the DISTINCT keyword here; Flink doesn't barf on this
>> FROM (
>>     SELECT
>>         user_id,
>>         event_id,
>>         maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>>     FROM event_foo
>>     GROUP BY event_id, user_id
>> )
>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>
>> On Wed, May 29, 2019 at 5:15 PM Vinod Mehra <vme...@lyft.com> wrote:
>>
>>> More details on the error with query #1 that used COUNT(DISTINCT):
>>>
>>> org.apache.flink.table.api.TableException: Cannot generate a valid
>>> execution plan for the given query:
>>>
>>> FlinkLogicalCalc(expr#0..8=[{inputs}], expr#9=[_UTF-16LE'-'],
>>> expr#10=[CAST($t1):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>>> "ISO-8859-1$en_US$primary"], expr#11=[CAST($t2):VARCHAR(65536) CHARACTER
>>> SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary"],
>>> expr#12=[CAST($t0):VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE
>>> "ISO-8859-1$en_US$primary"], expr#13=[CONCAT_WS($t9, $t10, $t11, $t12)],
>>> EXPR$0=[$t13], mastercard_world_elite_monthly_rides_encoded=[$t8],
>>> lower_boundary=[$t3], latency_marker=[$t4])
>>> FlinkLogicalJoin(condition=[AND(IS NOT DISTINCT FROM($0, $5), IS NOT
>>> DISTINCT FROM($1, $6), IS NOT DISTINCT FROM($2, $7))], joinType=[inner])
>>> FlinkLogicalAggregate(group=[{0, 1, 2}],
>>> lower_boundary=[mintimestamp($4)], latency_marker=[maxtimestamp($4)])
>>> FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>>> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>>> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
>>> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>>> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>>> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>>> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>>> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $f4=[$t9], $condition=[$t21])
>>>
>>> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>> FlinkLogicalAggregate(group=[{0, 1, 2}],
>>> mastercard_world_elite_monthly_rides_encoded=[COUNT($3)])
>>> FlinkLogicalAggregate(group=[{0, 1, 2, 3}])
>>> FlinkLogicalCalc(expr#0..6=[{inputs}], expr#7=[FLAG(MONTH)],
>>> expr#8=[CAST($t6):TIMESTAMP(3) NOT NULL], expr#9=[longToDateTime($t8)],
>>> expr#10=[Reinterpret($t9)], expr#11=[86400000], expr#12=[/INT($t10, $t11)],
>>> expr#13=[EXTRACT_DATE($t7, $t12)], expr#14=[FLAG(YEAR)],
>>> expr#15=[EXTRACT_DATE($t14, $t12)], expr#16=[_UTF-16LE'USD'],
>>> expr#17=[=($t4, $t16)], expr#18=[_UTF-16LE'MWE'], expr#19=[=($t2, $t18)],
>>> expr#20=[NOT($t5)], expr#21=[AND($t17, $t19, $t20)], user_lyft_id=[$t3],
>>> $f1=[$t13], $f2=[$t15], ride_id=[$t1], $condition=[$t21])
>>>
>>> FlinkLogicalNativeTableScan(table=[[event_pay_passenger_payment_succeeded_for_rewards]])
>>>
>>> This exception indicates that the query uses an unsupported SQL feature.
>>> Please check the documentation for the set of currently supported SQL
>>> features.
>>>
>>> at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
>>> at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:683)
>>> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
>>> at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:414)
>>>
>>> On Wed, May 29, 2019 at 1:49 PM Vinod Mehra <vme...@lyft.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> We are using apache-flink-1.4.2. It seems this version doesn't support
>>>> COUNT(DISTINCT). I am trying to find a way to dedup the stream, so I tried:
>>>>
>>>> SELECT
>>>>     CONCAT_WS(
>>>>         '-',
>>>>         CAST(MONTH(longToDateTime(rowtime)) AS VARCHAR),
>>>>         CAST(YEAR(longToDateTime(rowtime)) AS VARCHAR),
>>>>         CAST(user_id AS VARCHAR)
>>>>     ),
>>>>     COUNT(DISTINCT event_id) AS event_count
>>>> FROM event_foo
>>>> GROUP BY user_id, MONTH(longToDateTime(rowtime)), YEAR(longToDateTime(rowtime))
>>>>
>>>> (The duplicate events have the same event_id (and user_id); the other
>>>> fields, e.g. the timestamps, may or may not be different.)
>>>>
>>>> But that failed because DISTINCT is not supported. As a workaround I tried:
>>>>
>>>> SELECT
>>>>     CONCAT_WS(
>>>>         '-',
>>>>         CAST(MONTH(row_datetime) AS VARCHAR),
>>>>         CAST(YEAR(row_datetime) AS VARCHAR),
>>>>         CAST(user_id AS VARCHAR)
>>>>     ),
>>>>     COUNT(event_id) AS event_count
>>>> FROM (
>>>>     SELECT
>>>>         user_id,
>>>>         event_id,
>>>>         maxtimestamp(longToDateTime(rowtime)) AS row_datetime
>>>>     FROM event_foo
>>>>     GROUP BY event_id, user_id
>>>> )
>>>> GROUP BY user_id, MONTH(row_datetime), YEAR(row_datetime)
>>>>
>>>> I am hoping the inner SELECT does the deduping, because logically it is
>>>> equivalent to a DISTINCT. This works in my functional testing.
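[Editor's note: the equivalence the workaround relies on — an inner GROUP BY on (event_id, user_id) deduplicating exactly like COUNT(DISTINCT) — can be checked outside Flink. A minimal sketch in SQLite (table and column names mirror the thread; `longToDateTime`/`maxtimestamp` are dropped since only the counts matter here):]

```python
import sqlite3

# Check (in SQLite, not Flink) that deduping via an inner
# GROUP BY event_id, user_id gives the same counts as COUNT(DISTINCT).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_foo (user_id INT, event_id INT)")
# User 1 sends event 10 twice (a duplicate) and event 11 once; user 2 sends one event.
conn.executemany("INSERT INTO event_foo VALUES (?, ?)",
                 [(1, 10), (1, 10), (1, 11), (2, 20)])

distinct = conn.execute(
    "SELECT user_id, COUNT(DISTINCT event_id) FROM event_foo "
    "GROUP BY user_id ORDER BY user_id").fetchall()

nested = conn.execute(
    "SELECT user_id, COUNT(event_id) FROM "
    "(SELECT user_id, event_id FROM event_foo GROUP BY event_id, user_id) "
    "GROUP BY user_id ORDER BY user_id").fetchall()

print(distinct == nested)  # True: both report 2 events for user 1, 1 for user 2
```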
>>>>
>>>> Will it also work if the dedups span different event buckets? I was
>>>> hoping that as long as the duplicate events arrive within Flink's state
>>>> retention time they will be deduped, but I am new to Flink so I am not
>>>> sure about that. Can someone please correct me if I am wrong? Is this a
>>>> reasonable workaround for the lack of DISTINCT support? Please let me
>>>> know if there is a better way.
>>>>
>>>> Thanks,
>>>> Vinod
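[Editor's note: the behavior asked about above can be pictured as keyed dedup state with a time-to-live. A toy model (hypothetical names, not Flink's actual state API): a (user_id, event_id) pair is treated as a duplicate regardless of which month bucket it lands in, as long as its state entry has not yet expired; after the retention time, a late duplicate would be counted again:]

```python
# Toy model (not Flink's API) of keyed dedup state with idle-state retention:
# a (user_id, event_id) pair is a duplicate only while its entry is retained.
class DedupState:
    def __init__(self, retention_secs):
        self.retention = retention_secs
        self.last_seen = {}  # (user_id, event_id) -> time of last occurrence

    def is_duplicate(self, user_id, event_id, now):
        key = (user_id, event_id)
        seen = self.last_seen.get(key)
        self.last_seen[key] = now  # seeing the key refreshes its state
        return seen is not None and now - seen < self.retention

state = DedupState(retention_secs=7 * 24 * 3600)  # one week, as in the thread
print(state.is_duplicate(1, 10, now=0))      # False: first occurrence
print(state.is_duplicate(1, 10, now=3600))   # True: arrived within retention
print(state.is_duplicate(1, 10, now=10**7))  # False: state already expired
```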