Hi Biao,

Thanks for the answer!

So given the multi-threaded readers, we now have two open questions:

1) How do we let the checkpoints pass through our multi-threaded reader operator?

2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source operator has a parallelism of 1 and is responsible for split discovery only.

For the first one, given also the constraints (blocking, finite queues, etc.), I do not have an answer yet.

For the second, I think that we should go with separate operators for the source and the readers, for the following reasons:

1) This is more aligned with a potential future improvement where split discovery becomes a responsibility of the JobManager and the readers pull more work from the JM.

2) The source is going to be the "single point of truth". It will know what has been processed and what has not. If the source and the readers are a single operator with parallelism > 1, or in general, if the split discovery is done by each task individually, then:
   i) we have to have a deterministic scheme for each reader to assign splits to itself (e.g. mod subtaskId). This is not necessarily trivial for all sources.
   ii) each reader would have to keep a copy of all its processed splits.
   iii) the state has to be a union state with non-trivial merging logic in order to support rescaling.

Two additional points that you raised above:

i) The point that you raised, that we need to keep all splits (processed and not processed), is I think a bit of a strong requirement. This would imply that for infinite sources the state will grow indefinitely. This problem is even more pronounced if we do not have a single source that assigns splits to readers, as each reader will have its own copy of the state.

ii) It is true that for finite sources we need to somehow not close the readers when the source/split discoverer finishes. The ContinuousFileReaderOperator has a work-around for that.
It is not elegant, and checkpoints are not emitted after closing the source, but this, I believe, is a bigger problem which requires more changes than just refactoring the source interface.

Cheers,
Kostas
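
For illustration, the "mod subtaskId" self-assignment scheme mentioned above (where each task does its own split discovery) could look roughly like the sketch below. This is only a minimal sketch under assumptions: the class and method names (SplitSelfAssignment, ownerOf, splitsFor) and the use of a String split id are hypothetical and are not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; none of these names are Flink APIs. */
public class SplitSelfAssignment {

    /** Returns the index of the subtask that owns the given split. */
    public static int ownerOf(String splitId, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes.
        return Math.floorMod(splitId.hashCode(), parallelism);
    }

    /** Every reader sees all discovered splits and keeps only the ones it owns. */
    public static List<String> splitsFor(int subtaskId, int parallelism, List<String> discovered) {
        List<String> mine = new ArrayList<>();
        for (String splitId : discovered) {
            if (ownerOf(splitId, parallelism) == subtaskId) {
                mine.add(splitId);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Assumed example split ids; a real source would derive them from files, partitions, etc.
        List<String> discovered = Arrays.asList("file-0", "file-1", "file-2", "file-3", "file-4");
        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> "
                    + splitsFor(subtask, parallelism, discovered));
        }
    }
}
```

Note that the owner of a split depends on the parallelism, so after rescaling the same split may map to a different subtask. That is one way to see why the per-reader state would need to be a union state with merging logic.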