Hi izual,

In this case, I think you should try COUNT DISTINCT instead of COUNT.
DISTINCT deduplicates the regions, so no matter how many times you receive
id=1, the region count should always be 3.

SELECT tblEvent.id, COUNT(DISTINCT tblDim.region) FROM tblEvent JOIN tblDim
FOR SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
tblEvent.id
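
To illustrate why the two aggregates behave differently, here is a minimal
plain-Java sketch (no Flink dependency) of per-key aggregation state. It is
only a simulation under simplifying assumptions: the class name, the `DIM`
map, and both method names are hypothetical, and it emits one result per
event rather than reproducing Flink's actual operator or retraction output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simulation only: mimics state kept per GROUP BY key, not Flink's real operator.
class CountVsCountDistinct {

    // Hypothetical stand-in for the lookup (dimension) table: id -> regions.
    static final Map<Integer, List<String>> DIM = Map.of(1, List.of("SH", "BJ", "SD"));

    // COUNT(region): the state is a running counter per id, so every event
    // for the same id adds all of its joined rows again.
    static List<Long> runningCount(int[] eventIds) {
        Map<Integer, Long> state = new HashMap<>();
        List<Long> emitted = new ArrayList<>();
        for (int id : eventIds) {
            List<String> regions = DIM.get(id);
            if (regions == null) {
                continue; // inner join: no match, no output
            }
            emitted.add(state.merge(id, (long) regions.size(), Long::sum));
        }
        return emitted;
    }

    // COUNT(DISTINCT region): the state is the set of regions seen per id,
    // so repeated events for the same id do not change the result.
    static List<Integer> runningCountDistinct(int[] eventIds) {
        Map<Integer, Set<String>> state = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (int id : eventIds) {
            List<String> regions = DIM.get(id);
            if (regions == null) {
                continue; // inner join: no match, no output
            }
            Set<String> seen = state.computeIfAbsent(id, k -> new HashSet<>());
            seen.addAll(regions);
            emitted.add(seen.size());
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] events = {1, 1, 1}; // id=1 arrives three times
        System.out.println(runningCount(events));         // [3, 6, 9]
        System.out.println(runningCountDistinct(events)); // [3, 3, 3]
    }
}
```

Note that the distinct variant still keeps state (the set of seen regions);
it just makes the result insensitive to how often the same id arrives.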

Best,
Jark


On Mon, 27 Apr 2020 at 23:41, Benchao Li <libenc...@gmail.com> wrote:

> Hi izual,
>
> IMO, implementing your own COUNT/SUM UDAF doesn't solve the problem.
> The state is not managed in the UDAF; it's managed by the aggregation
> operator, and your UDAF's accumulator will be handled by the operator
> using state.
>
> izual <izual...@163.com> wrote on Mon, 27 Apr 2020 at 23:21:
>
>> Thanks, Benchao.
>>
>> Changing the dimension table might work, but that would change a lot,
>> including the fact that `size/count` is not a column of a single dim table.
>> I noticed that users can define Aggregate Functions[1], but this page also
>> says:
>> > Accumulators are automatically backup-ed by Flink’s checkpointing
>> > mechanism and restored
>> So is it right to implement my own COUNT/SUM UDF?
>>
>> [1].
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregation-functions
>>
>>
>>
>>
>>
>>
>> At 2020-04-27 17:32:14, "Benchao Li" <libenc...@gmail.com> wrote:
>>
>> Hi,
>>
>> There is indeed state for the aggregation result. However, we cannot
>> disable it; it's by design.
>> StreamQueryConfig.maxIdleStateRetentionTime can control how long the
>> state will be kept.
>> If you can ensure the time gap between two records of the same id is larger
>> than, for example, 1 min, then setting the retention time to 1 min can
>> resolve your issue.
>> If not, maybe you need to change your dimension table, making it return
>> the count directly instead of returning the details.
>>
>> izual <izual...@163.com> wrote on Mon, 27 Apr 2020 at 17:06:
>>
>>> I implemented my DimTable by extending `LookupTableSource`[1], which stores
>>> data like:
>>>
>>> id=1 -> (SH, BJ, SD)
>>>
>>> id=2 -> (...)
>>>
>>> and then extended `TableFunction` to return the values corresponding to
>>> the lookup keys. It may return multiple rows; for example, when the lookup
>>> key is id=1, `TableFunction.eval` calls:
>>>
>>> ```
>>> collect('SH')
>>> collect('BJ')
>>> collect('SD')
>>> ```
>>>
>>>
>>> Now I want to get the region count by id, where the id comes from
>>> tblEvent.id. The SQL is:
>>>
>>>
>>> SELECT tblEvent.id, COUNT(tblDim.region) FROM tblEvent JOIN tblDim FOR
>>> SYSTEM AS OF tblEvent.proctime ON tblEvent.id = tblDim.id GROUP BY
>>> tblEvent.id
>>>
>>>
>>> I expect the result of COUNT to always be 3 for id = 1, no matter how many
>>> times id=1 appears.
>>>
>>> But the actual result is: 3, 6, 9, ...
>>>
>>>
>>> I think this is because of the state mechanism behind COUNT. How can I
>>> turn this off?
>>>
>>> Or what's the correct usage here?
>>> StreamQueryConfig.maxIdleStateRetentionTime or something?
>>>
>>>
>>> The reason for not using state in Flink:
>>> http://mail-archives.apache.org/mod_mbox/flink-dev/201901.mbox/%3cjira.13212450.1548753499000.193293.1548753540...@atlassian.jira%3E
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-for-lookups
>>>
>>>
>>>
>>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>
>>
>>
>>
>
>
