Hi izual,

In this case, I think you should try COUNT DISTINCT instead of COUNT. DISTINCT deduplicates the regions per id, so no matter how many times you receive id=1, the region count should always be 3.

SELECT tblEvent.id, COUNT(DISTINCT tblDim.region)
FROM tblEvent
JOIN tblDim FOR SYSTEM_TIME AS OF tblEvent.proctime
ON tblEvent.id = tblDim.id
GROUP BY tblEvent.id

Best,
Jark

On Mon, 27 Apr 2020 at 23:41, Benchao Li <libenc...@gmail.com> wrote:

> Hi izual,
>
> IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
> The state is not managed in the UDAF; it's managed by the aggregation
> operator, and your UDAF's accumulator will be kept in state by that operator.
>
> izual <izual...@163.com> wrote on Mon, 27 Apr 2020 at 23:21:
>
>> Thanks, Benchao.
>>
>> Changing the dimension table might work, but it would change a lot;
>> for one thing, `size/count` is not a column of any single dim table.
>> I noticed that users can define aggregate functions [1], but that page
>> also says:
>> > Accumulators are automatically backup-ed by Flink’s checkpointing
>> mechanism and restored
>> So is it right to implement my own COUNT/SUM UDAF?
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>>
>> At 2020-04-27 17:32:14, "Benchao Li" <libenc...@gmail.com> wrote:
>>
>> Hi,
>>
>> There is indeed state for the aggregation result; however, we cannot
>> disable it. It's by design.
>> StreamQueryConfig.maxIdleStateRetentionTime can control how long the
>> state is kept.
>> If you can ensure that the time gap between two records with the same id
>> is larger than, for example, 1 min, then setting the retention time to
>> 1 min can resolve your issue.
>> If not, you may need to change your dimension table so that it returns
>> the count directly instead of the details.
>>
>> izual <izual...@163.com> wrote on Mon, 27 Apr 2020 at 17:06:
>>
>>> I implemented my DimTable by extending `LookupTableSource` [1], which
>>> stores data like:
>>>
>>> id=1 -> (SH, BJ, SD)
>>>
>>> id=2 -> (...)
>>>
>>> and then extended `TableFunction` to return the values corresponding to
>>> the lookup keys, possibly as multiple rows. For example, when the lookup
>>> key is id=1, `TableFunction.eval` does:
>>>
>>> ```
>>> // inside TableFunction.eval(...) for lookup key id=1
>>> collect("SH");
>>> collect("BJ");
>>> collect("SD");
>>> ```
>>>
>>> Now I want to get the region count per id (the id comes from
>>> tblEvent.id). The SQL is:
>>>
>>> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
>>> SYSTEM_TIME AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
>>> tblEvent.id
>>>
>>> I expect COUNT to always return 3 for id = 1, no matter how many times
>>> id=1 appears, but the actual results are: 3, 6, 9, ...
>>>
>>> I think this is because of the state behind COUNT. How can I turn it
>>> off? Or what is the correct way to do this?
>>> StreamQueryConfig.maxIdleStateRetentionTime or something?
>>>
>>> The reason for not using state in Flink:
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
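
The difference between the COUNT and COUNT DISTINCT queries in this thread can be illustrated without Flink. The plain-Java program below is a hypothetical simulation, not Flink's actual operator code: the class, method and field names, the in-memory `DIM` map standing in for the lookup table, and the single-threshold expiry rule are all assumptions for illustration. It models each id=1 event joining to three rows (SH, BJ, SD) and a GROUP BY that keeps per-id state, then compares COUNT, COUNT DISTINCT, and COUNT with idle state expiring after a fixed retention time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Flink-free simulation of a lookup join followed by GROUP BY id.
public class LookupJoinCountDemo {

    // Assumed stand-in for the dimension table: id -> regions returned by the lookup.
    static final Map<Integer, List<String>> DIM = Map.of(1, List.of("SH", "BJ", "SD"));

    // A tblEvent record: the join key plus its processing time in milliseconds.
    static final class Event {
        final int id;
        final long timeMs;

        Event(int id, long timeMs) {
            this.id = id;
            this.timeMs = timeMs;
        }
    }

    // COUNT(region): the per-id count lives in keyed state and is never reset,
    // so every repeated id=1 event adds another 3 joined rows.
    static List<Long> countResults(List<Integer> eventIds) {
        Map<Integer, Long> state = new HashMap<>();
        List<Long> emitted = new ArrayList<>();
        for (int id : eventIds) {
            List<String> regions = DIM.getOrDefault(id, List.of());
            if (regions.isEmpty()) continue; // inner join: no joined rows, no update
            for (String region : regions) {
                if (region != null) state.merge(id, 1L, Long::sum); // COUNT skips NULLs
            }
            emitted.add(state.getOrDefault(id, 0L)); // value after this event's rows
        }
        return emitted;
    }

    // COUNT(DISTINCT region): state holds the set of regions seen per id,
    // so re-adding SH, BJ, SD leaves the result at 3.
    static List<Long> countDistinctResults(List<Integer> eventIds) {
        Map<Integer, Set<String>> state = new HashMap<>();
        List<Long> emitted = new ArrayList<>();
        for (int id : eventIds) {
            List<String> regions = DIM.getOrDefault(id, List.of());
            if (regions.isEmpty()) continue;
            Set<String> seen = state.computeIfAbsent(id, k -> new HashSet<>());
            for (String region : regions) {
                if (region != null) seen.add(region);
            }
            emitted.add((long) seen.size());
        }
        return emitted;
    }

    // COUNT(region) with idle state expiry. Simplification: state for an id is
    // dropped when the gap since its last access exceeds retentionMs (Flink
    // itself cleans up somewhere between a configured min and max retention).
    static List<Long> countWithRetention(List<Event> events, long retentionMs) {
        Map<Integer, Long> state = new HashMap<>();
        Map<Integer, Long> lastAccess = new HashMap<>();
        List<Long> emitted = new ArrayList<>();
        for (Event e : events) {
            List<String> regions = DIM.getOrDefault(e.id, List.of());
            if (regions.isEmpty()) continue;
            Long last = lastAccess.get(e.id);
            if (last != null && e.timeMs - last > retentionMs) {
                state.remove(e.id); // idle too long: the accumulated count expires
            }
            lastAccess.put(e.id, e.timeMs);
            for (String region : regions) {
                if (region != null) state.merge(e.id, 1L, Long::sum);
            }
            emitted.add(state.getOrDefault(e.id, 0L));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 1, 1);
        System.out.println(countResults(ids));         // [3, 6, 9]
        System.out.println(countDistinctResults(ids)); // [3, 3, 3]
        // Same id every 2 minutes with a 1-minute retention: state expires in between.
        List<Event> spaced = List.of(new Event(1, 0), new Event(1, 120_000), new Event(1, 240_000));
        System.out.println(countWithRetention(spaced, 60_000)); // [3, 3, 3]
    }
}
```

Two simplifications are worth keeping in mind. Flink emits an update (a retraction plus the new value) for every joined row, whereas the sketch records only the value after each event's rows have been aggregated. Also, the distinct set is a union over all lookups, so if the dimension table later returns a new region for id=1, COUNT DISTINCT grows as well; it does not reflect the current contents of the dimension table.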