SmirAlex commented on PR #23521: URL: https://github.com/apache/flink/pull/23521#issuecomment-1812134103
Hi @Sxnan! Sorry, I know I'm not a reviewer, but I happened to be testing the functionality from this PR recently and found what I believe is a bug in the logic of `RecordAttributesValve`.

In my test the source had very little backlog data and a parallelism of 4 (any value > 1 reproduces the case). Because the backlog phase was so short, the interval between a subtask sending `RecordAttributes(isBacklog=true)` and `RecordAttributes(isBacklog=false)` was also very short. With several source subtasks, one subtask could therefore send `RecordAttributes(isBacklog=false)` even before another subtask had sent its `RecordAttributes(isBacklog=true)`. As a result, a race condition occurred in `RecordAttributesValve#inputRecordAttributes`: `backlogChannelsCnt` was incremented and decremented in interleaved order, so it never reached `numInputChannels`, and `RecordAttributesValve` emitted no `RecordAttributes` at all.

I suggest using separate counters for `RecordAttributes(isBacklog=true)` and `RecordAttributes(isBacklog=false)`, so that this interleaving no longer affects the result. Something like this:

```java
// Count isBacklog=true and isBacklog=false messages separately, so that interleaved
// messages from different channels cannot cancel each other out.
if (recordAttributes.isBacklog()) {
    backlogChannelsCnt += 1;
    if (backlogChannelsCnt != numInputChannels) {
        return;
    }
    backlogChannelsCnt = 0;
} else {
    nonBacklogChannelsCnt += 1;
    if (nonBacklogChannelsCnt != numInputChannels) {
        return;
    }
    nonBacklogChannelsCnt = 0;
}

// All channels reported the same status: emit it if it differs from the last output.
if (lastOutputAttributes == null
        || lastOutputAttributes.isBacklog() != recordAttributes.isBacklog()) {
    if (lastOutputAttributes != null && !lastOutputAttributes.isBacklog()) {
        // Only backlog -> non-backlog is supported; keep the current status.
        LOG.warn(
                "Switching from non-backlog to backlog is currently not supported. Backlog status remains.");
        return;
    }
    lastOutputAttributes = recordAttributes;
    output.emitRecordAttributes(recordAttributes);
}
```

WDYT? Also, it would be good to add a test for the case described above.
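To make the failure mode concrete, here is a minimal, self-contained Java sketch. It is a hypothetical model, not Flink code: plain booleans stand in for `RecordAttributes`, a `List` stands in for the downstream output, and `SingleCounterValve` is an assumed simplification of the shared +1/-1 counter described in the comment, not the exact code in this PR. `SplitCounterValve` mirrors the proposed snippet. Replaying an interleaved sequence from four channels, the single counter never reaches `numInputChannels` and emits nothing, while the split counters emit backlog=true followed by backlog=false.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified models of backlog aggregation in RecordAttributesValve.
 * A boolean stands in for RecordAttributes(isBacklog=...) and a List records what
 * would be emitted downstream. Neither class is actual Flink code.
 */
public class BacklogValveSketch {

    /** Assumed single-counter variant: +1 for isBacklog=true, -1 for isBacklog=false. */
    static final class SingleCounterValve {
        private final int numInputChannels;
        private int backlogChannelsCnt = 0;
        private Boolean lastOutput = null;
        final List<Boolean> emitted = new ArrayList<>();

        SingleCounterValve(int numInputChannels) {
            this.numInputChannels = numInputChannels;
        }

        void input(boolean isBacklog) {
            backlogChannelsCnt += isBacklog ? 1 : -1;
            if (lastOutput == null && backlogChannelsCnt == numInputChannels) {
                emit(true); // every channel is in backlog at the same moment
            } else if (Boolean.TRUE.equals(lastOutput) && backlogChannelsCnt == 0) {
                emit(false); // every channel has left backlog
            }
        }

        private void emit(boolean isBacklog) {
            lastOutput = isBacklog;
            emitted.add(isBacklog);
        }
    }

    /** Separate counters, mirroring the snippet proposed in the comment. */
    static final class SplitCounterValve {
        private final int numInputChannels;
        private int backlogChannelsCnt = 0;
        private int nonBacklogChannelsCnt = 0;
        private Boolean lastOutput = null;
        final List<Boolean> emitted = new ArrayList<>();

        SplitCounterValve(int numInputChannels) {
            this.numInputChannels = numInputChannels;
        }

        void input(boolean isBacklog) {
            if (isBacklog) {
                if (++backlogChannelsCnt != numInputChannels) {
                    return;
                }
                backlogChannelsCnt = 0;
            } else {
                if (++nonBacklogChannelsCnt != numInputChannels) {
                    return;
                }
                nonBacklogChannelsCnt = 0;
            }
            if (lastOutput == null || lastOutput != isBacklog) {
                if (lastOutput != null && !lastOutput) {
                    return; // non-backlog -> backlog is not supported; status remains
                }
                lastOutput = isBacklog;
                emitted.add(isBacklog);
            }
        }
    }

    public static void main(String[] args) {
        // Four source subtasks with a very short backlog phase: each subtask's
        // isBacklog=false arrives before the next subtask's isBacklog=true.
        boolean[] interleaved = {true, false, true, false, true, false, true, false};
        SingleCounterValve single = new SingleCounterValve(4);
        SplitCounterValve split = new SplitCounterValve(4);
        for (boolean isBacklog : interleaved) {
            single.input(isBacklog);
            split.input(isBacklog);
        }
        System.out.println("single counter: " + single.emitted); // []
        System.out.println("split counters: " + split.emitted);  // [true, false]
    }
}
```

Replaying such an interleaved sequence is roughly what a regression test for this case could look like. Note that both models count messages rather than distinct channels, so a channel that reported the same status twice would be counted twice; tracking the last status per channel index would be one way to guard against that.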