pnowojski commented on code in PR #26051:
URL: https://github.com/apache/flink/pull/26051#discussion_r1946598188

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java:
##########
@@ -339,25 +362,39 @@ OneInputStreamOperator<RowData, RowData> createDeduplicateOperator() {
                     }
                 } else {
                     if (isAsyncStateEnabled()) {
-                        AsyncStateRowTimeDeduplicateFunction processFunction =
-                                new AsyncStateRowTimeDeduplicateFunction(
-                                        rowTypeInfo,
-                                        stateRetentionTime,
-                                        rowtimeIndex,
-                                        generateUpdateBefore,
-                                        generateInsert(),
-                                        keepLastRow);
-                        return new AsyncKeyedProcessOperator<>(processFunction);
+                        if (!keepLastRow && outputInsertOnly) {
+                            checkState(canBeInsertOnly(config, keepLastRow));

Review Comment:
   Generating plans and actually using those plans can happen quite far apart. Yes, it doesn't make much sense to `checkState` this if we assume that plan generation and job graph creation both happen with the same Flink version, but that's often not the case.

   With those `checkState` calls I wanted to add a bit of extra safety in case someone changes the code in the future. This is similar to how, for example, the dedupe operator itself asserts that its input is insert-only. Also, they don't hurt.

   But if you feel strongly about dropping them, I would be fine with that as well.
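
   To illustrate the point about re-validating at operator creation time, here is a minimal, self-contained sketch. It is not the code from this PR: `PlanGuardSketch`, `createOperator`, the simplified one-argument `canBeInsertOnly` rule, and the returned strings are all hypothetical stand-ins, and the local `checkState` only mimics a precondition helper. The assumption is that `outputInsertOnly` was decided when the plan was generated, possibly by a different version whose rule differed.

```java
// Hypothetical sketch; none of these names are taken from the Flink code base.
class PlanGuardSketch {

    // Local stand-in for a precondition helper: fail fast on an inconsistent state.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Assumed, simplified rule of the *current* version:
    // only keep-first-row deduplication may emit insert-only output.
    static boolean canBeInsertOnly(boolean keepLastRow) {
        return !keepLastRow;
    }

    // Both flags come from a previously generated plan, which may be stale.
    static String createOperator(boolean keepLastRow, boolean outputInsertOnly) {
        if (outputInsertOnly) {
            // Re-validate the plan-time decision against the current rule.
            checkState(
                    canBeInsertOnly(keepLastRow),
                    "Plan requests insert-only output, but the current rule forbids it");
            return "insert-only";
        }
        return "retracting";
    }

    public static void main(String[] args) {
        System.out.println(createOperator(false, true));
        try {
            createOperator(true, true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   In this sketch a stale or inconsistent plan fails loudly when the operator is created instead of silently producing a wrong changelog, which is the kind of extra safety I had in mind.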