Sxnan commented on PR #23521:
URL: https://github.com/apache/flink/pull/23521#issuecomment-1813700885

   > Hi @Sxnan! Sorry, I understand that I'm not a reviewer, but I happened to be
   > testing the functionality from this MR recently, and I found what I believe is
   > a bug in the logic of `RecordAttributesValve`. In my test there was very little
   > backlog data at the source, and the source had parallelism 4 (any value > 1
   > reproduces the case). Because the backlog phase was so short, the interval
   > between sending `RecordAttributes(isBacklog=true)` and
   > `RecordAttributes(isBacklog=false)` was also very short. In addition, with the
   > higher parallelism, one source subtask could send
   > `RecordAttributes(isBacklog=false)` even before another subtask sent
   > `RecordAttributes(isBacklog=true)`. As a result, a race condition occurred in
   > `RecordAttributesValve#inputRecordAttributes`: `backlogChannelsCnt` was
   > incremented and decremented in an interleaved order, so it never reached
   > `numInputChannels`, and no RecordAttributes was emitted from
   > `RecordAttributesValve` at all.
   > 
   > I suggest using separate counters for `RecordAttributes(isBacklog=true)` and
   > `RecordAttributes(isBacklog=false)`, so that the race condition described
   > above doesn't affect the result. Something like this:
   > 
   > ```java
   >     // Count isBacklog=true and isBacklog=false separately, so attributes
   >     // interleaved across channels cannot cancel each other out.
   >     if (recordAttributes.isBacklog()) {
   >         backlogChannelsCnt += 1;
   >         if (backlogChannelsCnt != numInputChannels) {
   >             return;
   >         }
   >         backlogChannelsCnt = 0;
   >     } else {
   >         nonBacklogChannelsCnt += 1;
   >         if (nonBacklogChannelsCnt != numInputChannels) {
   >             return;
   >         }
   >         nonBacklogChannelsCnt = 0;
   >     }
   >
   >     // Emit only on a status change; non-backlog -> backlog is rejected.
   >     if (lastOutputAttributes == null
   >             || lastOutputAttributes.isBacklog() != recordAttributes.isBacklog()) {
   >         if (lastOutputAttributes != null && !lastOutputAttributes.isBacklog()) {
   >             LOG.warn(
   >                     "Switching from non-backlog to backlog is currently not supported. Backlog status remains.");
   >             return;
   >         }
   >         lastOutputAttributes = recordAttributes;
   >         output.emitRecordAttributes(recordAttributes);
   >     }
   > ```
   > 
   > WDYT? It would also be good to have a test for the aforementioned case.
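
To make the reported interleaving concrete, here is a minimal Java sketch with two hypothetical, simplified models (`CounterModels` and its methods are illustrative only, not Flink code). The first assumes, following the report, a single `backlogChannelsCnt` that `isBacklog=true` increments and `isBacklog=false` decrements; the second follows the separate-counter suggestion, minus logging. The event sequence models four channels where channel 0 leaves backlog before channels 1 to 3 enter it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified models for illustration only; not Flink classes.
class CounterModels {

    /** Assumed single-counter logic: +1 for isBacklog=true, -1 for false. Returns the peak count. */
    static int maxSharedCount(boolean[] events) {
        int backlogChannelsCnt = 0;
        int max = 0;
        for (boolean isBacklog : events) {
            backlogChannelsCnt += isBacklog ? 1 : -1;
            max = Math.max(max, backlogChannelsCnt);
        }
        return max;
    }

    /** The separate-counter suggestion, minus logging. Returns the emitted isBacklog values. */
    static List<Boolean> separateCounters(boolean[] events, int numInputChannels) {
        List<Boolean> emitted = new ArrayList<>();
        int backlogChannelsCnt = 0;
        int nonBacklogChannelsCnt = 0;
        Boolean lastOutput = null; // null means nothing has been emitted yet
        for (boolean isBacklog : events) {
            if (isBacklog) {
                if (++backlogChannelsCnt != numInputChannels) {
                    continue;
                }
                backlogChannelsCnt = 0;
            } else {
                if (++nonBacklogChannelsCnt != numInputChannels) {
                    continue;
                }
                nonBacklogChannelsCnt = 0;
            }
            if (lastOutput == null || lastOutput != isBacklog) {
                if (lastOutput != null && !lastOutput) {
                    continue; // non-backlog -> backlog is not supported
                }
                lastOutput = isBacklog;
                emitted.add(isBacklog);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Four channels; channel 0 finishes its backlog before channels 1-3 enter theirs.
        boolean[] events = {
            true, false,         // channel 0: backlog, then non-backlog
            true, true, true,    // channels 1-3: backlog
            false, false, false  // channels 1-3: non-backlog
        };
        System.out.println(maxSharedCount(events));      // 3
        System.out.println(separateCounters(events, 4)); // [true, false]
    }
}
```

In this replay the shared counter peaks at 3 and never reaches `numInputChannels = 4`. The separate-counter version does emit `isBacklog=true`, but only after the fifth event, when channel 0 has already switched to non-backlog.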
   
   Hi @Smir. Thanks for trying this out! The RecordAttributesValve combines the
   RecordAttributes from the different input channels of the same input. The input
   is considered to be in the backlog state if and only if all of its input
   channels have isBacklog = true. Otherwise, some non-backlog records would be
   treated as backlog records.
   
   As for your case, where there is very little backlog data and the parallelism
   is greater than 1: it is possible that one input channel switches to the
   non-backlog state before the other input channels switch to the backlog state.
   When that happens, we simply process that data as non-backlog data.
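
The rule described above (an input is backlog only while every channel reports `isBacklog=true`) can be sketched with per-channel state instead of counters. This is a hypothetical, simplified Java model (`PerChannelBacklogModel` is not a Flink class): it assumes every channel starts as non-backlog, emits only when the combined state changes, and does not model the restriction on switching from non-backlog back to backlog. Replaying the interleaving from the report, the combined state never becomes backlog, so the records are processed as non-backlog.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "backlog iff all channels are backlog" rule; not a Flink class.
class PerChannelBacklogModel {
    private final boolean[] channelBacklog; // assumption: every channel starts as non-backlog
    private boolean inputBacklog = false;
    final List<Boolean> emitted = new ArrayList<>();

    PerChannelBacklogModel(int numInputChannels) {
        this.channelBacklog = new boolean[numInputChannels];
    }

    /** Records a channel's latest isBacklog flag and returns the combined state of the input. */
    boolean input(int channelIdx, boolean isBacklog) {
        channelBacklog[channelIdx] = isBacklog;
        boolean allBacklog = true;
        for (boolean b : channelBacklog) {
            allBacklog &= b;
        }
        if (allBacklog != inputBacklog) { // emit only when the combined state changes
            inputBacklog = allBacklog;
            emitted.add(allBacklog);
        }
        return inputBacklog;
    }

    public static void main(String[] args) {
        PerChannelBacklogModel valve = new PerChannelBacklogModel(4);
        valve.input(0, true);
        valve.input(0, false); // channel 0 leaves backlog before the others enter it
        for (int ch = 1; ch < 4; ch++) {
            valve.input(ch, true);
        }
        for (int ch = 1; ch < 4; ch++) {
            valve.input(ch, false);
        }
        System.out.println(valve.emitted); // [] : the input never became backlog
    }
}
```

Tracking the latest flag per channel, rather than counting messages, means a repeated or late attribute from one channel cannot be mistaken for agreement from all channels.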

