Hi izual,

IMO, implementing your own COUNT/SUM UDAF won't solve the problem.
The state is not managed by the UDAF; it is managed by the aggregation
operator, which keeps your UDAF's accumulator in its own state.
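
To make that concrete, here is a minimal plain-Java sketch of the idea. It does
not use Flink's classes: `MyCount` and `GroupAggOperator` are hypothetical
stand-ins for a user-defined COUNT and for the aggregation operator. The point
is only that the per-key accumulator lives in the operator, so a custom COUNT
keeps growing across events in the same way.

```java
import java.util.HashMap;
import java.util.Map;

public class OperatorStateSketch {

    /** Hypothetical user-defined COUNT: it only knows how to update an accumulator. */
    static class MyCount {
        long[] createAccumulator() {
            return new long[] {0L};
        }

        void accumulate(long[] acc, Object value) {
            if (value != null) {
                acc[0]++;
            }
        }

        long getValue(long[] acc) {
            return acc[0];
        }
    }

    /** Hypothetical aggregation operator: it owns the per-key state, not the function. */
    static class GroupAggOperator {
        private final MyCount function = new MyCount();
        private final Map<Integer, long[]> state = new HashMap<>();

        long processElement(int key, Object value) {
            // The accumulator is created once per key and then reused for every record.
            long[] acc = state.computeIfAbsent(key, k -> function.createAccumulator());
            function.accumulate(acc, value);
            return function.getValue(acc);
        }
    }

    public static void main(String[] args) {
        GroupAggOperator op = new GroupAggOperator();
        String[] regions = {"SH", "BJ", "SD"};
        // Three events with id=1, each joined with the same three dimension rows.
        for (int event = 1; event <= 3; event++) {
            long count = 0;
            for (String region : regions) {
                count = op.processElement(1, region);
            }
            System.out.println("event " + event + " for id=1 -> count " + count);
        }
    }
}
```

Running it prints counts of 3, 6 and 9 for the three events, which matches what
you observed with the built-in COUNT.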

On Mon, Apr 27, 2020 at 11:21 PM, izual <izual...@163.com> wrote:

> Thanks, Benchao.
>
> Maybe changing the dimension table would work, but that is a big change;
> for one thing, `size/count` is not a column of the dim table.
> I noticed that users can define Aggregate Functions[1], but that page also
> says:
> > Accumulators are automatically backup-ed by Flink’s checkpointing
> > mechanism and restored
> So would it be right to implement my own COUNT/SUM UDF?
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>
> At 2020-04-27 17:32:14, "Benchao Li" <libenc...@gmail.com> wrote:
>
> Hi,
>
> There is indeed state for the aggregation result, and it cannot be
> disabled; that is by design.
> StreamQueryConfig.maxIdleStateRetentionTime controls how long the state
> is kept.
> If you can ensure that the time gap between two records with the same id is
> larger than, for example, 1 min, then setting the retention time to 1 min
> can resolve your issue.
> If not, you may need to change your dimension table so that it returns
> the count directly instead of the details.
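
The retention idea quoted above can be sketched in plain Java as follows. This
is only an illustration under simplifying assumptions: `RetainedCounter` is a
hypothetical class, timestamps are passed in by hand, and expired state is
dropped lazily on the next access, whereas Flink cleans up idle state itself.
It shows why the gap between records of the same id has to exceed the retention
time for the count to start again from zero.

```java
import java.util.HashMap;
import java.util.Map;

public class IdleRetentionSketch {

    /** Hypothetical per-key counter whose state is dropped after being idle too long. */
    static class RetainedCounter {
        private final long retentionMillis;
        private final Map<Integer, Long> counts = new HashMap<>();
        private final Map<Integer, Long> lastAccess = new HashMap<>();

        RetainedCounter(long retentionMillis) {
            this.retentionMillis = retentionMillis;
        }

        long add(int key, long delta, long nowMillis) {
            Long last = lastAccess.get(key);
            if (last != null && nowMillis - last >= retentionMillis) {
                // The key has been idle for at least the retention time: forget its count.
                counts.remove(key);
            }
            lastAccess.put(key, nowMillis);
            return counts.merge(key, delta, Long::sum);
        }
    }

    public static void main(String[] args) {
        RetainedCounter counter = new RetainedCounter(60_000L); // 1 min retention
        // Each event for id=1 contributes 3 joined dimension rows.
        System.out.println(counter.add(1, 3, 0L));       // first event
        System.out.println(counter.add(1, 3, 90_000L));  // 90 s later, state has expired
        System.out.println(counter.add(1, 3, 100_000L)); // 10 s later, state is still kept
    }
}
```

The three calls print 3, 3 and 6: the second event arrives after the retention
time and starts from a fresh count, while the third arrives too soon and is
added to the existing one.
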
>
> On Mon, Apr 27, 2020 at 5:06 PM, izual <izual...@163.com> wrote:
>
>> I implemented my DimTable by extending `LookupTableSource`[1]; it stores
>> data like:
>>
>> id=1 -> (SH, BJ, SD)
>>
>> id=2 -> (...)
>>
>> I then extended `TableFunction` to return the values corresponding to the
>> lookup keys, possibly as multiple rows. For example, when the lookup key is
>> id=1, `TableFunction.eval` calls:
>>
>> ```
>> collect('SH')
>> collect('BJ')
>> collect('SD')
>> ```
>>
>>
>> Now I want to get the region count per id, where the id comes from
>> tblEvent.id. The SQL is:
>>
>>
>> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
>> SYSTEM_TIME AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
>> tblEvent.id
>>
>>
>> I expect the result of COUNT to always be 3 for id=1, no matter how many
>> times id=1 appears.
>>
>> But the actual result is: 3, 6, 9, ...
>>
>>
>> I think this is because of the state mechanism behind COUNT. How can I
>> turn it off?
>>
>> Or what is the correct way to handle this?
>> StreamQueryConfig.maxIdleStateRetentionTime or something?
>>
>>
>> The reason for not using state in Flink:
>> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
