SmirAlex commented on PR #23521:
URL: https://github.com/apache/flink/pull/23521#issuecomment-1812134103

Hi @Sxnan! Sorry, I know I'm not a reviewer, but I happened to be testing the functionality from this PR recently and found what I believe is a bug in the logic of `RecordAttributesValve`.
In my test, the source had very little backlog data and a parallelism of 4 (any value > 1 reproduces the case). Because the backlog phase was so short, the interval between sending `RecordAttributes(isBacklog=true)` and `RecordAttributes(isBacklog=false)` was also very short. On top of that, with parallelism > 1, one source subtask could send `RecordAttributes(isBacklog=false)` before another subtask had even sent its `RecordAttributes(isBacklog=true)`. As a result, a race condition occurred in `RecordAttributesValve#inputRecordAttributes`: increments and decrements of `backlogChannelsCnt` from different channels interleaved, so the counter never reached `numInputChannels`, and `RecordAttributesValve` emitted no `RecordAttributes` at all.
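
To make the failure mode concrete, here is a minimal Java sketch of the counting as I understand it from the description above. It uses one shared counter, incremented on `isBacklog=true` and decremented on `isBacklog=false`. It is a hypothetical simplification (`SingleCounterDemo` and `maxCount` are made-up names), not the actual `RecordAttributesValve` code, and it models only the counter, not the emission logic.

```java
// Hypothetical model of the counting described above, not Flink's code:
// one shared counter goes up on isBacklog=true and down on isBacklog=false.
class SingleCounterDemo {
    /** Returns the highest value the shared counter reaches for the given events. */
    static int maxCount(boolean[] events) {
        int backlogChannelsCnt = 0;
        int max = 0;
        for (boolean isBacklog : events) {
            backlogChannelsCnt += isBacklog ? 1 : -1;
            max = Math.max(max, backlogChannelsCnt);
        }
        return max;
    }

    public static void main(String[] args) {
        int numInputChannels = 4;
        // Each subtask's isBacklog=false arrives before the next subtask's
        // isBacklog=true, because every backlog phase is very short.
        boolean[] events = {true, false, true, false, true, false, true, false};
        int max = maxCount(events);
        // prints "max counter = 1, needed = 4"
        System.out.println("max counter = " + max + ", needed = " + numInputChannels);
    }
}
```

Since the counter peaks at 1 in this interleaving, a check of the form `backlogChannelsCnt == numInputChannels` never succeeds.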

I suggest using separate counters for `RecordAttributes(isBacklog=true)` and `RecordAttributes(isBacklog=false)`, so the race condition described above no longer affects the result. Something like this:
```java
    if (recordAttributes.isBacklog()) {
        // Count channels that have reported isBacklog=true.
        backlogChannelsCnt += 1;
        if (backlogChannelsCnt != numInputChannels) {
            return;
        }
        backlogChannelsCnt = 0;
    } else {
        // Count isBacklog=false reports independently, so interleaved
        // true/false messages cannot cancel each other out.
        nonBacklogChannelsCnt += 1;
        if (nonBacklogChannelsCnt != numInputChannels) {
            return;
        }
        nonBacklogChannelsCnt = 0;
    }

    // All channels agree: emit only if the backlog status actually changes.
    if (lastOutputAttributes == null
            || lastOutputAttributes.isBacklog() != recordAttributes.isBacklog()) {
        if (lastOutputAttributes != null && !lastOutputAttributes.isBacklog()) {
            LOG.warn(
                    "Switching from non-backlog to backlog is currently not supported. Backlog status remains.");
            return;
        }
        lastOutputAttributes = recordAttributes;
        output.emitRecordAttributes(recordAttributes);
    }
```
WDYT? It would also be good to add a test for the case described above.
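
Such a test could replay the interleaved sequence from the scenario above and check that both transitions are emitted. The sketch below is a standalone model of the proposed two-counter logic. It assumes that a `Boolean` can stand in for `RecordAttributes#isBacklog()` and that a list can stand in for `output.emitRecordAttributes(...)`. `TwoCounterValve` and `run` are hypothetical names, not Flink APIs, and a real test would drive `RecordAttributesValve` itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone model of the two-counter proposal (not Flink code).
// Boolean values stand in for RecordAttributes#isBacklog(), and the
// "emitted" list stands in for output.emitRecordAttributes(...).
class TwoCounterValve {
    private final int numInputChannels;
    private int backlogChannelsCnt = 0;
    private int nonBacklogChannelsCnt = 0;
    private Boolean lastOutputIsBacklog = null; // null until the first emission
    final List<Boolean> emitted = new ArrayList<>();

    TwoCounterValve(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    void input(boolean isBacklog) {
        // Independent counters: a "false" never cancels out a "true".
        if (isBacklog) {
            backlogChannelsCnt += 1;
            if (backlogChannelsCnt != numInputChannels) {
                return;
            }
            backlogChannelsCnt = 0;
        } else {
            nonBacklogChannelsCnt += 1;
            if (nonBacklogChannelsCnt != numInputChannels) {
                return;
            }
            nonBacklogChannelsCnt = 0;
        }

        // All channels agree: emit only on an actual status change.
        if (lastOutputIsBacklog == null || lastOutputIsBacklog != isBacklog) {
            if (lastOutputIsBacklog != null && !lastOutputIsBacklog) {
                // non-backlog -> backlog is unsupported; keep the current status
                return;
            }
            lastOutputIsBacklog = isBacklog;
            emitted.add(isBacklog);
        }
    }

    /** Feeds the events in order and returns what the valve emitted. */
    static List<Boolean> run(int numInputChannels, boolean[] events) {
        TwoCounterValve valve = new TwoCounterValve(numInputChannels);
        for (boolean isBacklog : events) {
            valve.input(isBacklog);
        }
        return valve.emitted;
    }

    public static void main(String[] args) {
        // Each subtask's isBacklog=false arrives before the next
        // subtask's isBacklog=true.
        boolean[] events = {true, false, true, false, true, false, true, false};
        System.out.println(run(4, events)); // prints [true, false]
    }
}
```

As in the proposal, the counters count messages rather than distinct channels. The model therefore assumes each channel sends each value at most once per phase, because a channel that repeats an attribute would be counted twice.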


-- 
This is an automated message from the Apache Git Service.