Bo Cui created FLINK-32017: ------------------------------ Summary: DISTINCT COUNT result is incorrect with ttl Key: FLINK-32017 URL: https://issues.apache.org/jira/browse/FLINK-32017 Project: Flink Issue Type: Bug Affects Versions: 1.17.0, 1.15.0, 1.12.0, 1.18.0 Reporter: Bo Cui
SQL: SELECT COUNT(DISTINCT `c`) FROM Table1 and set ttl to 10s and Flink will generate code: {code:java} public final class GroupAggsHandler$15 implements org.apache.flink.table.runtime.generated.AggsHandleFunction { long agg0_count; boolean agg0_countIsNull; private transient org.apache.flink.table.runtime.typeutils.ExternalSerializer externalSerializer$0; private transient org.apache.flink.table.runtime.typeutils.ExternalSerializer externalSerializer$1; private org.apache.flink.table.runtime.dataview.StateMapView distinctAcc_0_dataview; private org.apache.flink.table.data.binary.BinaryRawValueData distinctAcc_0_dataview_raw_value; private org.apache.flink.table.api.dataview.MapView distinct_view_0; org.apache.flink.table.data.GenericRowData acc$3 = new org.apache.flink.table.data.GenericRowData(2); org.apache.flink.table.data.GenericRowData acc$5 = new org.apache.flink.table.data.GenericRowData(2); org.apache.flink.table.data.GenericRowData aggValue$14 = new org.apache.flink.table.data.GenericRowData(1); private org.apache.flink.table.runtime.dataview.StateDataViewStore store; public GroupAggsHandler$15(java.lang.Object[] references) throws Exception { externalSerializer$0 = (((org.apache.flink.table.runtime.typeutils.ExternalSerializer) references[0])); externalSerializer$1 = (((org.apache.flink.table.runtime.typeutils.ExternalSerializer) references[1])); } private org.apache.flink.api.common.functions.RuntimeContext getRuntimeContext() { return store.getRuntimeContext(); } @Override public void open(org.apache.flink.table.runtime.dataview.StateDataViewStore store) throws Exception { this.store = store; distinctAcc_0_dataview = (org.apache.flink.table.runtime.dataview.StateMapView) store.getStateMapView("distinctAcc_0", true, externalSerializer$0, externalSerializer$1); distinctAcc_0_dataview_raw_value = org.apache.flink.table.data.binary.BinaryRawValueData.fromObject(distinctAcc_0_dataview); distinct_view_0 = distinctAcc_0_dataview; } @Override public void 
accumulate(org.apache.flink.table.data.RowData accInput) throws Exception { int field$7; boolean isNull$7; boolean isNull$9; long result$10; isNull$7 = accInput.isNullAt(0); field$7 = -1; if (!isNull$7) { field$7 = accInput.getInt(0); } java.lang.Integer distinctKey$8 = (java.lang.Integer) field$7; if (isNull$7) { distinctKey$8 = null; } java.lang.Long value$12 = (java.lang.Long) distinct_view_0.get(distinctKey$8); if (value$12 == null) { value$12 = 0L; } boolean is_distinct_value_changed_0 = false; long existed$13 = ((long) value$12) & (1L << 0); if (existed$13 == 0) { // not existed value$12 = ((long) value$12) | (1L << 0); is_distinct_value_changed_0 = true; long result$11 = -1L; boolean isNull$11; if (isNull$7) { // --- Cast section generated by org.apache.flink.table.planner.functions.casting.IdentityCastRule // --- End cast section isNull$11 = agg0_countIsNull; if (!isNull$11) { result$11 = agg0_count; } } else { isNull$9 = agg0_countIsNull || false; result$10 = -1L; if (!isNull$9) { result$10 = (long) (agg0_count + ((long) 1L)); } // --- Cast section generated by org.apache.flink.table.planner.functions.casting.IdentityCastRule // --- End cast section isNull$11 = isNull$9; if (!isNull$11) { result$11 = result$10; } } agg0_count = result$11;; agg0_countIsNull = isNull$11; } if (is_distinct_value_changed_0) { distinct_view_0.put(distinctKey$8, value$12); } } @Override public void setAccumulators(org.apache.flink.table.data.RowData acc) throws Exception { long field$6; boolean isNull$6; isNull$6 = acc.isNullAt(0); field$6 = -1L; if (!isNull$6) { field$6 = acc.getLong(0); } distinct_view_0 = distinctAcc_0_dataview; agg0_count = field$6;; agg0_countIsNull = isNull$6; } } {code} and distinctAcc_0_dataview and GroupAggFunctionttl#accState are the 10s, GroupAggFunctionttl#accState save `Count`, distinctAcc_0_dataview save `distinct`. 
Reproduction: 1) Input 1 record: distinctAcc_0_dataview is updated to 1 and accState is updated to 1. 2) After 5 s, input the same record: distinctAcc_0_dataview remains unchanged (the distinct key already exists, so no put occurs and its TTL is not refreshed), while accState is updated to 1 (refreshing its TTL). 3) After 11 s, distinctAcc_0_dataview expires. 4) After 12 s, input the same record: distinctAcc_0_dataview is updated to 1 again, and accState is incorrectly updated to 2. -- This message was sent by Atlassian Jira (v8.20.10#820010)