rkhachatryan commented on a change in pull request #10435: 
[FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox 
execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r371300913
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ##########
 @@ -96,332 +222,274 @@ public void initializeState(StateInitializationContext 
context) throws Exception
                checkpointedState = 
context.getOperatorStateStore().getSerializableListState("splits");
 
                int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-               if (context.isRestored()) {
-                       LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIdx);
+               if (!context.isRestored()) {
+                       LOG.info("No state to restore for the {} 
(taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+                       return;
+               }
 
+               if (splits != null) {
                        // this may not be null in case we migrate from a 
previous Flink version.
-                       if (restoredReaderState == null) {
-                               restoredReaderState = new ArrayList<>();
-                               for (TimestampedFileInputSplit split : 
checkpointedState.get()) {
-                                       restoredReaderState.add(split);
-                               }
+                       return;
+               }
 
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("{} (taskIdx={}) restored 
{}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
-                               }
-                       }
-               } else {
-                       LOG.info("No state to restore for the {} 
(taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+               LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIdx);
+
+               splits = new PriorityQueue<>();
+               for (TimestampedFileInputSplit split : checkpointedState.get()) 
{
+                       splits.add(split);
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("{} (taskIdx={}) restored {}.", 
getClass().getSimpleName(), subtaskIdx, splits);
                }
        }
 
        @Override
        public void open() throws Exception {
                super.open();
 
-               checkState(this.reader == null, "The reader is already 
initialized.");
                checkState(this.serializer != null, "The serializer has not 
been set. " +
                        "Probably the setOutputType() was not called. Please 
report it.");
 
                this.format.setRuntimeContext(getRuntimeContext());
                this.format.configure(new Configuration());
-               this.checkpointLock = getContainingTask().getCheckpointLock();
-
-               // set the reader context based on the time characteristic
-               final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
-               final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
-               this.readerContext = StreamSourceContexts.getSourceContext(
-                       timeCharacteristic,
-                       getProcessingTimeService(),
-                       checkpointLock,
-                       getContainingTask().getStreamStatusMaintainer(),
-                       output,
-                       watermarkInterval,
-                       -1);
-
-               // and initialize the split reading thread
-               this.reader = new SplitReader<>(format, serializer, 
readerContext, checkpointLock, restoredReaderState);
-               this.restoredReaderState = null;
-               this.reader.start();
+
+               this.sourceContext = StreamSourceContexts.getSourceContext(
+                               getOperatorConfig().getTimeCharacteristic(),
+                               getProcessingTimeService(),
+                               new Object(), // no actual locking needed
+                               getContainingTask().getStreamStatusMaintainer(),
+                               output,
+                               
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+                               -1);
+
+               this.reusedRecord = serializer.createInstance();
+               this.completedSplitsCounter = 
getMetricGroup().counter("numSplitsProcessed");
+               this.executor = 
getContainingTask().getMailboxExecutorFactory().createExecutor(getOperatorConfig().getChainIndex());
+               this.splits = this.splits == null ? new PriorityQueue<>() : 
this.splits;
+
+               if (!splits.isEmpty()) {
+                       enqueueMail();
+               }
        }
 
        @Override
        public void processElement(StreamRecord<TimestampedFileInputSplit> 
element) throws Exception {
-               reader.addSplit(element.getValue());
+               Preconditions.checkState(state.isAcceptingSplits());
+               splits.offer(element.getValue());
+               if (state == ReaderState.IDLE) {
+                       enqueueMail();
+               }
        }
 
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               // we do nothing because we emit our own watermarks if needed.
+       private void enqueueMail() {
+               Preconditions.checkState(!state.isTerminal(), "can't enqueue 
mail in terminal state %s", state);
+               executor.execute(this, "ContinuousFileReaderOperator");
+               if (state == ReaderState.IDLE) {
+                       switchState(ReaderState.OPENING);
+               }
        }
 
        @Override
-       public void dispose() throws Exception {
-               super.dispose();
-
-               // first try to cancel it properly and
-               // give it some time until it finishes
-               reader.cancel();
+       public void run() { // enqueued as Runnable into the mailbox (executor)
                try {
-                       reader.join(200);
-               } catch (InterruptedException e) {
-                       // we can ignore this
-               }
-
-               // if the above did not work, then interrupt the thread 
repeatedly
-               while (reader.isAlive()) {
-
-                       StringBuilder bld = new StringBuilder();
-                       StackTraceElement[] stack = reader.getStackTrace();
-                       for (StackTraceElement e : stack) {
-                               bld.append(e).append('\n');
-                       }
-                       LOG.warn("The reader is stuck in method:\n {}", 
bld.toString());
-
-                       reader.interrupt();
-                       try {
-                               reader.join(50);
-                       } catch (InterruptedException e) {
-                               // we can ignore this
-                       }
+                       processRecord();
+               } catch (Exception e) {
 
 Review comment:
   Agree, except that `switchState(CLOSED)` should still be called to prevent 
further processing (even though that is currently guaranteed not to happen).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to