dawidwys commented on pull request #19219:
URL: https://github.com/apache/flink/pull/19219#issuecomment-1084641245


   I second @pnowojski's opinion that serializers should be treated as not
thread-safe and used accordingly. Whenever we pass a serializer to another
thread, it should be a duplicate of the original, and once we hand it over, we
should cede ownership of that copy to the receiving thread.
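
   To illustrate the ownership rule above, here is a minimal sketch. `ScratchSerializer` and `SerializerHandoffDemo` are made-up names for this example and are not Flink classes; only the idea of a `duplicate()` method that returns an independent instance is taken from the discussion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a stateful, non-thread-safe serializer.
// It is NOT Flink's TypeSerializer; it only mirrors the duplicate() idea.
final class ScratchSerializer {
    // Mutable scratch state is what makes sharing across threads unsafe.
    private final StringBuilder buffer = new StringBuilder();

    String serialize(int value) {
        buffer.setLength(0);
        buffer.append("v=").append(value);
        return buffer.toString();
    }

    // Returns an independent instance with its own scratch state.
    ScratchSerializer duplicate() {
        return new ScratchSerializer();
    }
}

final class SerializerHandoffDemo {
    // Serializes the values on a worker thread using a duplicate that the worker owns.
    static List<String> serializeOnWorker(ScratchSerializer original, int[] values)
            throws Exception {
        // Duplicate on the calling thread, then cede the copy to the worker.
        final ScratchSerializer workerCopy = original.duplicate();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<List<String>> result = executor.submit(() -> {
                List<String> out = new ArrayList<>();
                for (int v : values) {
                    out.add(workerCopy.serialize(v));
                }
                return out;
            });
            return result.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ScratchSerializer original = new ScratchSerializer();
        List<String> fromWorker = serializeOnWorker(original, new int[] {1, 2, 3});
        // The caller keeps using the original; the two instances share no state.
        System.out.println(original.serialize(0) + " " + fromWorker);
    }
}
```

   The caller never touches `workerCopy` after submitting the task, which is what "ceding ownership" means in this sketch.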
   
   In this particular case I believe the problem is in
`org.apache.flink.runtime.operators.BatchTask#initInputLocalStrategy:1010`. We
pass a serializer to the `ExternalSorter`, which spawns an additional thread
for reading/sorting. At the same time, the implementation of
`org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory#getSerializer`
does not call `duplicate()` the first time it is called, so we pass the
original serializer to a new thread of the `ExternalSorter`. Later on, in
`BatchTask#run`, we call `Driver#prepare`, which calls `duplicate()` on the
serializer that the sorter thread is also using, and this causes the
`ConcurrentModificationException`.
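
   The factory behaviour described above can be sketched roughly as follows. This is a simplified model built only from the description in this comment: `DemoSerializer`, `FirstCallFactory`, and the `original()` accessor are hypothetical names, not the actual `RuntimeSerializerFactory` code.

```java
// Hypothetical stand-in for a serializer; not a Flink class.
final class DemoSerializer {
    DemoSerializer duplicate() {
        return new DemoSerializer();
    }
}

// Simplified model of a factory that hands out the original instance on the
// first call and duplicates only on later calls (an assumption based on the
// description above).
final class FirstCallFactory {
    private final DemoSerializer original = new DemoSerializer();
    private boolean firstCall = true;

    DemoSerializer getSerializer() {
        if (firstCall) {
            firstCall = false;
            return original; // no duplicate() on the first call
        }
        return original.duplicate();
    }

    // Exposed only so the demo can compare identities.
    DemoSerializer original() {
        return original;
    }
}

final class FactoryFirstCallDemo {
    public static void main(String[] args) {
        FirstCallFactory factory = new FirstCallFactory();
        // First caller (the sorter in the scenario above) gets the shared original.
        DemoSerializer forSorter = factory.getSerializer();
        // A later caller gets an independent duplicate.
        DemoSerializer forDriver = factory.getSerializer();
        System.out.println("sorter got original: " + (forSorter == factory.original()));
        System.out.println("driver got original: " + (forDriver == factory.original()));

        // Suggested fix: duplicate explicitly before handing the serializer to another thread.
        FirstCallFactory fixedFactory = new FirstCallFactory();
        DemoSerializer safeForSorter = fixedFactory.getSerializer().duplicate();
        System.out.println("fixed sorter got original: " + (safeForSorter == fixedFactory.original()));
    }
}
```

   With the explicit `duplicate()`, the instance handed to the other thread is never the one the factory keeps and later duplicates from.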
   
   I'd suggest fixing the issue in `BatchTask` by calling `duplicate()`
explicitly:
   ```
       private void initInputLocalStrategy(int inputNum) throws Exception {
   ....
                   case SORT:
                       @SuppressWarnings({"rawtypes", "unchecked"})
                       Sorter<?> sorter =
                               ExternalSorter.newBuilder(
                                               getMemoryManager(),
                                               this,
                                               // we must duplicate the serializer as it will be
                                               // used in a reading thread of the sorter
                                               this.inputSerializers[inputNum].getSerializer().duplicate(),
                                               getLocalStrategyComparator(inputNum))
   ....
   }
   ```

