Bo Cui created FLINK-32017:
------------------------------
Summary: DISTINCT COUNT result is incorrect with ttl
Key: FLINK-32017
URL: https://issues.apache.org/jira/browse/FLINK-32017
Project: Flink
Issue Type: Bug
Affects Versions: 1.17.0, 1.15.0, 1.12.0, 1.18.0
Reporter: Bo Cui
SQL: SELECT COUNT(DISTINCT `c`) FROM Table1
with state TTL set to 10s.
Flink generates the following code:
{code:java}
public final class GroupAggsHandler$15 implements
org.apache.flink.table.runtime.generated.AggsHandleFunction {
long agg0_count;
boolean agg0_countIsNull;
private transient
org.apache.flink.table.runtime.typeutils.ExternalSerializer
externalSerializer$0;
private transient
org.apache.flink.table.runtime.typeutils.ExternalSerializer
externalSerializer$1;
private org.apache.flink.table.runtime.dataview.StateMapView
distinctAcc_0_dataview;
private org.apache.flink.table.data.binary.BinaryRawValueData
distinctAcc_0_dataview_raw_value;
private org.apache.flink.table.api.dataview.MapView distinct_view_0;
org.apache.flink.table.data.GenericRowData acc$3 = new
org.apache.flink.table.data.GenericRowData(2);
org.apache.flink.table.data.GenericRowData acc$5 = new
org.apache.flink.table.data.GenericRowData(2);
org.apache.flink.table.data.GenericRowData aggValue$14 = new
org.apache.flink.table.data.GenericRowData(1);
private org.apache.flink.table.runtime.dataview.StateDataViewStore
store;
public GroupAggsHandler$15(java.lang.Object[] references) throws
Exception {
externalSerializer$0 =
(((org.apache.flink.table.runtime.typeutils.ExternalSerializer) references[0]));
externalSerializer$1 =
(((org.apache.flink.table.runtime.typeutils.ExternalSerializer) references[1]));
}
private org.apache.flink.api.common.functions.RuntimeContext
getRuntimeContext() {
return store.getRuntimeContext();
}
@Override
public void
open(org.apache.flink.table.runtime.dataview.StateDataViewStore store) throws
Exception {
this.store = store;
distinctAcc_0_dataview =
(org.apache.flink.table.runtime.dataview.StateMapView)
store.getStateMapView("distinctAcc_0", true, externalSerializer$0,
externalSerializer$1);
distinctAcc_0_dataview_raw_value =
org.apache.flink.table.data.binary.BinaryRawValueData.fromObject(distinctAcc_0_dataview);
distinct_view_0 = distinctAcc_0_dataview;
}
@Override
public void accumulate(org.apache.flink.table.data.RowData accInput)
throws Exception {
int field$7;
boolean isNull$7;
boolean isNull$9;
long result$10;
isNull$7 = accInput.isNullAt(0);
field$7 = -1;
if (!isNull$7) {
field$7 = accInput.getInt(0);
}
java.lang.Integer distinctKey$8 = (java.lang.Integer) field$7;
if (isNull$7) {
distinctKey$8 = null;
}
java.lang.Long value$12 = (java.lang.Long)
distinct_view_0.get(distinctKey$8);
if (value$12 == null) {
value$12 = 0L;
}
boolean is_distinct_value_changed_0 = false;
long existed$13 = ((long) value$12) & (1L << 0);
if (existed$13 == 0) { // not existed
value$12 = ((long) value$12) | (1L << 0);
is_distinct_value_changed_0 = true;
long result$11 = -1L;
boolean isNull$11;
if (isNull$7) {
// --- Cast section generated by
org.apache.flink.table.planner.functions.casting.IdentityCastRule
// --- End cast section
isNull$11 = agg0_countIsNull;
if (!isNull$11) {
result$11 = agg0_count;
}
}
else {
isNull$9 = agg0_countIsNull || false;
result$10 = -1L;
if (!isNull$9) {
result$10 = (long) (agg0_count + ((long) 1L));
}
// --- Cast section generated by
org.apache.flink.table.planner.functions.casting.IdentityCastRule
// --- End cast section
isNull$11 = isNull$9;
if (!isNull$11) {
result$11 = result$10;
}
}
agg0_count = result$11;;
agg0_countIsNull = isNull$11;
}
if (is_distinct_value_changed_0) {
distinct_view_0.put(distinctKey$8, value$12);
}
}
@Override
public void setAccumulators(org.apache.flink.table.data.RowData acc)
throws Exception {
long field$6;
boolean isNull$6;
isNull$6 = acc.isNullAt(0);
field$6 = -1L;
if (!isNull$6) {
field$6 = acc.getLong(0);
}
distinct_view_0 = distinctAcc_0_dataview;
agg0_count = field$6;;
agg0_countIsNull = isNull$6;
}
}
{code}
Both distinctAcc_0_dataview and GroupAggFunction#accState have a TTL of 10s.
GroupAggFunction#accState stores the `count`, while distinctAcc_0_dataview stores
the `distinct` keys.
Reproduction:
1) Input 1 record: distinctAcc_0_dataview is updated to 1 and accState is
updated to 1 (both TTLs start).
2) After 5 s, input the same record: distinctAcc_0_dataview remains unchanged
(its TTL is NOT refreshed) while accState is rewritten with 1 (its TTL IS
refreshed).
3) After 11 s, distinctAcc_0_dataview expires, but accState (refreshed at 5 s)
is still alive.
4) After 12 s, input the same record again: distinctAcc_0_dataview is
re-created with 1 and accState is incremented to 2 — an incorrect DISTINCT
COUNT, since the value was already counted.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)