Hi izual,

IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem. The state is not managed in the UDAF; it's managed by the aggregation operator. The operator keeps your UDAF's accumulator in its own state, so a custom UDAF accumulates across records exactly like the built-in COUNT.
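To make this concrete, here is a toy model in plain Python (not Flink code). `CountUdaf`, `GroupAggOperator`, `retention_ms` and `run` are names made up for this sketch, and the expiry check is a simplification of how idle state retention actually works in Flink. It only shows that the per-key accumulator lives in the operator, so swapping in your own COUNT gives the same 3, 6, 9, and that the count restarts only when the state for that key has expired.

```python
class CountUdaf:
    """Stand-in for a user-defined COUNT: it only defines accumulator logic."""

    def create_accumulator(self):
        return 0

    def accumulate(self, acc, value):
        # Like SQL COUNT(col), NULL values are not counted.
        return acc + 1 if value is not None else acc

    def get_value(self, acc):
        return acc


class GroupAggOperator:
    """Toy GROUP BY operator: it, not the UDAF, owns the per-key state."""

    def __init__(self, udaf, retention_ms=None):
        self.udaf = udaf
        self.retention_ms = retention_ms  # None means state never expires
        self.state = {}  # key -> (accumulator, last_access_ms)

    def process(self, key, value, now_ms):
        entry = self.state.get(key)
        if entry is not None and self.retention_ms is not None:
            # Simplified expiry: drop state that has been idle too long.
            if now_ms - entry[1] > self.retention_ms:
                entry = None
        acc = entry[0] if entry is not None else self.udaf.create_accumulator()
        acc = self.udaf.accumulate(acc, value)
        self.state[key] = (acc, now_ms)
        return self.udaf.get_value(acc)


def run(retention_ms, event_times_ms):
    """Feed events with id=1; each one joins to three dim rows (SH, BJ, SD)."""
    op = GroupAggOperator(CountUdaf(), retention_ms)
    results = []
    for t in event_times_ms:
        latest = None
        for region in ("SH", "BJ", "SD"):
            latest = op.process(1, region, t)
        results.append(latest)  # count seen after the last joined row
    return results


# Events two minutes apart, no retention configured: state keeps growing.
print(run(None, [0, 120_000, 240_000]))    # [3, 6, 9]
# Same events with a 1 min retention: state expires between events.
print(run(60_000, [0, 120_000, 240_000]))  # [3, 3, 3]
# Events only 30 s apart: state is still alive, so the count accumulates.
print(run(60_000, [0, 30_000]))            # [3, 6]
```

The last case is why retention time only helps if the gap between two records of the same id is reliably larger than the retention time.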
izual <izual...@163.com> wrote on Mon, Apr 27, 2020 at 11:21 PM:

> Thanks, Benchao.
>
> Maybe changing the dimension table will work, but this changes a lot,
> including that `size/count` is not a column of one dim table.
> I notice that users can define Aggregate Functions[1], but this page also
> says:
> > Accumulators are automatically backup-ed by Flink’s checkpointing
> > mechanism and restored
> So is it right to implement my own COUNT/SUM UDF?
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>
>
> At 2020-04-27 17:32:14, "Benchao Li" <libenc...@gmail.com> wrote:
>
> Hi,
>
> There is indeed a state for the aggregation result; however, we cannot
> disable it. It's by design.
> StreamQueryConfig.maxIdleStateRetentionTime can control how long the state
> will be kept.
> If you can ensure the time gap between two records of the same id is larger
> than, for example, 1 min, then setting the retention time to 1 min can
> resolve your issue.
> If not, maybe you need to change your dimension table, making it return
> the count directly instead of returning the details.
>
> izual <izual...@163.com> wrote on Mon, Apr 27, 2020 at 5:06 PM:
>
>> I implemented my DimTable by extending `LookupTableSource`[1], which stores
>> data like:
>>
>> id=1 -> (SH, BJ, SD)
>>
>> id=2 -> (...)
>>
>> and then extending `TableFunction` to return the value corresponding to the
>> lookup keys, which may return multiple rows. For example, when the lookup
>> key is id=1, then in `TableFunction.eval`:
>>
>> ```
>>
>> collect('SH')
>>
>> collect('BJ')
>>
>> collect('SD')
>>
>> ```
>>
>>
>> Now I want to get the region count by id, where the id comes from
>> tblEvent.id. The SQL is:
>>
>>
>> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
>> SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
>> tblEvent.id
>>
>>
>> I expect the result of COUNT to always be 3 for id = 1, no matter how many
>> times id=1 appears,
>>
>> but the actual result is: 3, 6, 9, ...
>>
>>
>> I think this is because of the state mechanism behind COUNT. How can I
>> turn this off?
>>
>> Or what's the correct way to use this?
>> StreamQueryConfig.maxIdleStateRetentionTime or something?
>>
>>
>> The reason not using state in flink:
>> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn