rkhachatryan commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r371324585
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ##########
 @@ -54,39 +62,157 @@
  * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
  * which has a parallelism of 1, this operator can have DOP > 1.
  *
- * <p>As soon as a split descriptor is received, it is put in a queue, and have another
- * thread read the actual data of the split. This architecture allows the separation of the
- * reading thread from the one emitting the checkpoint barriers, thus removing any potential
- * back-pressure.
+ * <p>This implementation uses {@link MailboxExecutor} to execute each action and states:<ol>
+ *     <li>start in {@link ReaderState#IDLE IDLE}</li>
+ *     <li>upon receiving a split add it to the queue, switch to {@link ReaderState#OPENING OPENING} and enqueue a
+ *     {@link org.apache.flink.streaming.runtime.tasks.mailbox.Mail mail} with self as {@link Runnable}</li>
+ *     <li>open file, switch to {@link ReaderState#READING READING}, read one record, re-enqueue self</li>
+ *     <li>if no more records or splits available, switch back to {@link ReaderState#IDLE IDLE}</li>
+ *     </ol>
+ *     On close:
+ *     <ol>
+ *     <li>if {@link ReaderState#IDLE IDLE} then close immediately</li>
+ *     <li>otherwise switch to {@link ReaderState#CLOSING CLOSING}, call {@link MailboxExecutor#yield() yield} in a loop
+ *     until state is {@link ReaderState#CLOSED CLOSED}</li>
+ *     <li>{@link MailboxExecutor#yield() yield()} causes remaining records (and splits) to be processed in the same way as above</li>
+ * </ol></p>
+ * <p>Using {@link MailboxExecutor} allows to avoid explicit synchronization. At most one mail should be enqueued at any
+ * given time.</p>
+ * <p>Using FSM approach allows to explicitly define states and enforce {@link ReaderState#TRANSITIONS transitions} between them.</p>
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-       implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+       implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, RunnableWithException {
 
        private static final long serialVersionUID = 1L;
 
        private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
-       private FileInputFormat<OUT> format;
-       private TypeSerializer<OUT> serializer;
+       private enum ReaderState {
+               IDLE {
+                       @Override
+                       public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+                               LOG.warn("not processing any records in IDLE state");
+                               return false;
+                       }
+               },
+               /**
+                * A message is enqueued to process split, but no split is opened.
+                */
+               OPENING { // the split was added and message to itself was enqueued to process it
+                       @Override
+                       public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
 
 Review comment:
   Using an action function and decoupling transitions from states has its
   advantages, but it also makes the logic less obvious (if I understood you
   correctly).
   The operator instance would then still have to be passed to these
   transitions, so there is no big difference here.
