Hongshun Wang created FLINK-34129:
-------------------------------------
Summary: correct writing: MiniBatchGlobalGroupAggFunction will
make -D as +I then make +I as -U when state expired
Key: FLINK-34129
URL: https://issues.apache.org/jira/browse/FLINK-34129
Project: Flink
Issue Type: Improvement
Components: Table SQL / Runtime
Affects Versions: 1.18.1
Reporter: Hongshun Wang
Fix For: 1.19.0
Take SUM as an example.
Suppose the state has expired and then an update arrives from the source.
MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input, but
emits +I[1, -20] and -D[1, -20]. The sink then deletes the row from the external
database.
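The sink-side effect can be shown with a minimal Java sketch of a hypothetical upsert sink that stores rows in a map keyed by the first column. The class `UpsertSinkDemo`, its `apply` method, and the rule of upserting on +I/+U, deleting on -D and ignoring -U are assumptions for illustration. They do not describe a specific Flink connector. The sketch also assumes the external table held [1, 20] before the state expired.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical upsert sink (illustrative only, not a Flink connector):
// +I/+U upsert the row, -D deletes it by key, -U is ignored.
public class UpsertSinkDemo {

    // Simulated external table: key -> aggregated value.
    static final Map<Integer, Long> table = new LinkedHashMap<>();

    static void apply(String kind, int key, long value) {
        switch (kind) {
            case "+I":
            case "+U":
                table.put(key, value);   // upsert by key
                break;
            case "-D":
                table.remove(key);       // delete by key
                break;
            case "-U":
                break;                   // assumed: upsert sinks drop UPDATE_BEFORE
            default:
                throw new IllegalArgumentException("unknown row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        table.put(1, 20L);               // assumed row [1, 20] written before the state expired
        apply("+I", 1, -20);             // emitted for -U[1, 20]
        apply("-D", 1, -20);             // emitted for +U[1, 20]
        System.out.println(table);       // prints {} (the row for key 1 is gone)
    }
}
```

The source only updated the row, yet the sink ends up with no row for key 1.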
Let's see why this happens:
# When the state has expired and -U[1, 20] arrives,
MiniBatchGlobalGroupAggFunction creates a new sum accumulator and sets
firstRow to true.
{code:java}
// no state for this key (expired): start from a fresh accumulator
if (stateAcc == null) {
    stateAcc = globalAgg.createAccumulators();
    firstRow = true;
}
{code}
# The sum accumulator then retracts 20, so the sum becomes -20.
# Because this is the first row, MiniBatchGlobalGroupAggFunction turns the -U
into a +I and emits it downstream.
{code:java}
if (!recordCounter.recordCountIsZero(acc)) {
    // if this was not the first row and we have to emit retractions
    if (!firstRow) {
        // ... (omitted: not relevant here)
    } else {
        // update acc to state
        accState.update(acc);
        // this is the first row, output new result
        // prepare INSERT message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
        out.collect(resultRow);
    }
} // else branch: see the next step
{code}
# When the next +U[1, 20] arrives in a later mini-batch, the accumulator adds 20
back. The sum returns to 0 and the record count goes from -1 back to 0, so
RetractionRecordCounter#recordCountIsZero returns true. Because firstRow is now
false, the +U is turned into a -D and emitted downstream.
{code:java}
if (!recordCounter.recordCountIsZero(acc)) {
    // ... (omitted: shown in the previous step)
} else {
    // we retracted the last record for this key
    // if this is not the first row, send out a DELETE message
    if (!firstRow) {
        // prepare DELETE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
        out.collect(resultRow);
    }
}
{code}
As a result, the sink receives +I and then -D for a single source update, and
the row is deleted.
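The walk-through above can be condensed into a small, self-contained Java simulation. This is a simplified sketch and not Flink code. `ExpiredStateSumDemo`, `Acc`, `finishBundle` and the string row format are hypothetical. The accumulator is assumed to hold the SUM plus a COUNT(*)-style record count. The input rows are fed in directly, whereas the real global function receives accumulators from the local aggregation. The non-first-row update path is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of MiniBatchGlobalGroupAggFunction for SUM(v) on one key.
// Names, accumulator layout and output format are illustrative assumptions, not Flink's API.
public class ExpiredStateSumDemo {

    // Assumed accumulator: the SUM value plus a COUNT(*)-style record counter.
    static final class Acc {
        long sum;
        long count;
    }

    // Stand-in for the keyed accumulator state; null means expired (or never set).
    static Acc state = null;

    // Processes one mini-batch for `key`: `delta` is +v (accumulate) or -v (retract),
    // `countDelta` is +1 or -1. Emitted changelog rows are appended to `out`.
    static void finishBundle(int key, long delta, long countDelta, List<String> out) {
        boolean firstRow = false;
        Acc acc = state;
        if (acc == null) {           // state expired: start from an empty accumulator
            acc = new Acc();
            firstRow = true;
        }
        long prevSum = acc.sum;      // plays the role of prevAggValue
        acc.sum += delta;
        acc.count += countDelta;

        if (acc.count != 0) {        // corresponds to !recordCountIsZero(acc)
            state = acc;             // corresponds to accState.update(acc)
            if (firstRow) {
                out.add("+I[" + key + ", " + acc.sum + "]");
            }
            // non-first-row update path (-U/+U) omitted: not exercised here
        } else {                     // count returned to zero: "last record retracted"
            if (!firstRow) {
                out.add("-D[" + key + ", " + prevSum + "]");
            }
            state = null;            // assumed: state is cleared after the last retraction
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // key 1's state has already expired when the source update arrives
        finishBundle(1, -20, -1, out);  // mini-batch containing -U[1, 20]
        finishBundle(1, 20, 1, out);    // later mini-batch containing +U[1, 20]
        System.out.println(out);        // prints [+I[1, -20], -D[1, -20]]
    }
}
```

The record count causes the problem. After the fresh accumulator sees only a retraction, the count sits at -1. The first mini-batch is therefore treated as a non-empty first row (INSERT). The second is treated as retracting the last record (DELETE).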
--
This message was sent by Atlassian Jira
(v8.20.10#820010)