Sxnan commented on PR #23521: URL: https://github.com/apache/flink/pull/23521#issuecomment-1813700885
> Hi @Sxnan! Sorry, I understand that I'm not a reviewer, but I happened to be testing the functionality from this MR recently, and I found what I believe is a bug. It concerns the logic of `RecordAttributesValve`. In my test there was very little backlog data at the source, and the source had parallelism 4 (in fact, any value > 1 reproduces the case). Because the backlog phase was so short, the interval between sending `RecordAttributes(isBacklog=true)` and `RecordAttributes(isBacklog=false)` was also very short. In addition, with parallelism > 1, one source subtask could send `RecordAttributes(isBacklog=false)` even before another subtask sent `RecordAttributes(isBacklog=true)`. As a result, a race condition occurred in `RecordAttributesValve#inputRecordAttributes`: `backlogChannelsCnt` was incremented and decremented alternately, so it never reached `numInputChannels`, and `RecordAttributesValve` emitted no `RecordAttributes` at all.
>
> I suggest using separate counters for `RecordAttributes(isBacklog=true)` and `RecordAttributes(isBacklog=false)`, so that the race condition described above no longer affects the result. Something like this:
>
> ```java
> if (recordAttributes.isBacklog()) {
>     backlogChannelsCnt += 1;
>     if (backlogChannelsCnt != numInputChannels) {
>         return;
>     }
>     backlogChannelsCnt = 0;
> } else {
>     nonBacklogChannelsCnt += 1;
>     if (nonBacklogChannelsCnt != numInputChannels) {
>         return;
>     }
>     nonBacklogChannelsCnt = 0;
> }
>
> if (lastOutputAttributes == null
>         || lastOutputAttributes.isBacklog() != recordAttributes.isBacklog()) {
>     if (lastOutputAttributes != null && !lastOutputAttributes.isBacklog()) {
>         LOG.warn(
>                 "Switching from non-backlog to backlog is currently not supported. Backlog status remains.");
>         return;
>     }
>     lastOutputAttributes = recordAttributes;
>     output.emitRecordAttributes(recordAttributes);
> }
> ```
>
> WDYT? It would also be good to have a test for the aforementioned case.

Hi @Smir. Thanks for trying this out!
The `RecordAttributesValve` combines the `RecordAttributes` from the different input channels of the same input. The input is considered to be in the backlog state if and only if all of its input channels are in backlog (`isBacklog=true`). Otherwise, some non-backlog records would be treated as backlog records. Back to your case, where there is very little backlog data and the parallelism is greater than 1: it is possible for one input channel to switch to the non-backlog state before the other input channels have switched to the backlog state. When that happens, we simply process that data as non-backlog data.
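As a rough illustration of the "backlog if and only if all channels are backlog" rule, here is a minimal Java sketch. `PerChannelBacklogSketch` and its members are hypothetical names, not Flink's `RecordAttributesValve` API. It assumes the valve remembers the latest `isBacklog` flag of each channel instead of counting messages, that a channel which has not reported yet prevents the input from being considered backlog, and it records emitted statuses in a list instead of forwarding `RecordAttributes` downstream.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; not the actual Flink RecordAttributesValve. */
public class PerChannelBacklogSketch {
    // Latest isBacklog flag per channel; null means the channel has not reported yet.
    private final Boolean[] channelBacklog;
    // Last combined status emitted downstream; null means nothing emitted yet.
    private Boolean lastOutputBacklog = null;
    // Stand-in for the downstream output: records each emitted combined status.
    final List<Boolean> emitted = new ArrayList<>();

    PerChannelBacklogSketch(int numInputChannels) {
        channelBacklog = new Boolean[numInputChannels];
    }

    void inputRecordAttributes(int channel, boolean isBacklog) {
        channelBacklog[channel] = isBacklog;

        boolean allBacklog = true;
        boolean anyNonBacklog = false;
        for (Boolean status : channelBacklog) {
            if (status == null) {
                allBacklog = false; // unknown channel: cannot claim backlog yet
            } else if (!status) {
                allBacklog = false;
                anyNonBacklog = true;
            }
        }

        Boolean combined;
        if (allBacklog) {
            combined = true; // every channel is currently in backlog
        } else if (anyNonBacklog) {
            combined = false; // at least one channel is non-backlog
        } else {
            return; // some channels have not reported; status undetermined
        }

        // Emit only when the combined status actually changes.
        if (!combined.equals(lastOutputBacklog)) {
            lastOutputBacklog = combined;
            emitted.add(combined);
        }
    }

    public static void main(String[] args) {
        // Interleaving from the report: channel 0 leaves backlog before
        // channel 1 enters it, so the channels are never all in backlog.
        PerChannelBacklogSketch racy = new PerChannelBacklogSketch(2);
        racy.inputRecordAttributes(0, true);
        racy.inputRecordAttributes(0, false);
        racy.inputRecordAttributes(1, true);
        racy.inputRecordAttributes(1, false);
        System.out.println(racy.emitted); // [false]

        // Overlapping backlog phases: both channels are in backlog at once.
        PerChannelBacklogSketch overlapping = new PerChannelBacklogSketch(2);
        overlapping.inputRecordAttributes(0, true);
        overlapping.inputRecordAttributes(1, true);
        overlapping.inputRecordAttributes(0, false);
        overlapping.inputRecordAttributes(1, false);
        System.out.println(overlapping.emitted); // [true, false]
    }
}
```

In this sketch the output depends only on which channels are currently in backlog, not on how many messages have arrived, so the short-backlog interleaving yields a single non-backlog status (the data is processed as non-backlog), while overlapping backlog phases yield backlog followed by non-backlog.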