rkhachatryan commented on a change in pull request #10435: 
[FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox 
execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r371395301
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ##########
 @@ -96,332 +222,274 @@ public void initializeState(StateInitializationContext 
context) throws Exception
                checkpointedState = 
context.getOperatorStateStore().getSerializableListState("splits");
 
                int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
-               if (context.isRestored()) {
-                       LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIdx);
+               if (!context.isRestored()) {
+                       LOG.info("No state to restore for the {} 
(taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+                       return;
+               }
 
+               if (splits != null) {
                        // this may not be null in case we migrate from a 
previous Flink version.
-                       if (restoredReaderState == null) {
-                               restoredReaderState = new ArrayList<>();
-                               for (TimestampedFileInputSplit split : 
checkpointedState.get()) {
-                                       restoredReaderState.add(split);
-                               }
+                       return;
+               }
 
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("{} (taskIdx={}) restored 
{}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
-                               }
-                       }
-               } else {
-                       LOG.info("No state to restore for the {} 
(taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+               LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIdx);
+
+               splits = new PriorityQueue<>();
+               for (TimestampedFileInputSplit split : checkpointedState.get()) 
{
+                       splits.add(split);
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("{} (taskIdx={}) restored {}.", 
getClass().getSimpleName(), subtaskIdx, splits);
                }
        }
 
        @Override
        public void open() throws Exception {
                super.open();
 
-               checkState(this.reader == null, "The reader is already 
initialized.");
                checkState(this.serializer != null, "The serializer has not 
been set. " +
                        "Probably the setOutputType() was not called. Please 
report it.");
 
                this.format.setRuntimeContext(getRuntimeContext());
                this.format.configure(new Configuration());
-               this.checkpointLock = getContainingTask().getCheckpointLock();
-
-               // set the reader context based on the time characteristic
-               final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
-               final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
-               this.readerContext = StreamSourceContexts.getSourceContext(
-                       timeCharacteristic,
-                       getProcessingTimeService(),
-                       checkpointLock,
-                       getContainingTask().getStreamStatusMaintainer(),
-                       output,
-                       watermarkInterval,
-                       -1);
-
-               // and initialize the split reading thread
-               this.reader = new SplitReader<>(format, serializer, 
readerContext, checkpointLock, restoredReaderState);
-               this.restoredReaderState = null;
-               this.reader.start();
+
+               this.sourceContext = StreamSourceContexts.getSourceContext(
+                               getOperatorConfig().getTimeCharacteristic(),
+                               getProcessingTimeService(),
+                               new Object(), // no actual locking needed
+                               getContainingTask().getStreamStatusMaintainer(),
+                               output,
+                               
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+                               -1);
+
+               this.reusedRecord = serializer.createInstance();
+               this.completedSplitsCounter = 
getMetricGroup().counter("numSplitsProcessed");
+               this.executor = 
getContainingTask().getMailboxExecutorFactory().createExecutor(getOperatorConfig().getChainIndex());
+               this.splits = this.splits == null ? new PriorityQueue<>() : 
this.splits;
+
+               if (!splits.isEmpty()) {
+                       enqueueMail();
+               }
        }
 
        @Override
        public void processElement(StreamRecord<TimestampedFileInputSplit> 
element) throws Exception {
-               reader.addSplit(element.getValue());
+               Preconditions.checkState(state.isAcceptingSplits());
+               splits.offer(element.getValue());
+               if (state == ReaderState.IDLE) {
+                       enqueueMail();
+               }
        }
 
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               // we do nothing because we emit our own watermarks if needed.
+       private void enqueueMail() {
+               Preconditions.checkState(!state.isTerminal(), "can't enqueue 
mail in terminal state %s", state);
+               executor.execute(this, "ContinuousFileReaderOperator");
+               if (state == ReaderState.IDLE) {
+                       switchState(ReaderState.OPENING);
+               }
        }
 
        @Override
-       public void dispose() throws Exception {
-               super.dispose();
-
-               // first try to cancel it properly and
-               // give it some time until it finishes
-               reader.cancel();
+       public void run() { // enqueued as Runnable into the mailbox (executor)
                try {
-                       reader.join(200);
-               } catch (InterruptedException e) {
-                       // we can ignore this
-               }
-
-               // if the above did not work, then interrupt the thread 
repeatedly
-               while (reader.isAlive()) {
-
-                       StringBuilder bld = new StringBuilder();
-                       StackTraceElement[] stack = reader.getStackTrace();
-                       for (StackTraceElement e : stack) {
-                               bld.append(e).append('\n');
-                       }
-                       LOG.warn("The reader is stuck in method:\n {}", 
bld.toString());
-
-                       reader.interrupt();
-                       try {
-                               reader.join(50);
-                       } catch (InterruptedException e) {
-                               // we can ignore this
-                       }
+                       processRecord();
+               } catch (Exception e) {
+                       switchState(ReaderState.CLOSED);
+                       cleanUp();
+                       throw new FlinkRuntimeException("unable to process 
record from file split", e);
                }
-               reader = null;
-               readerContext = null;
-               restoredReaderState = null;
-               format = null;
-               serializer = null;
        }
 
-       @Override
-       public void close() throws Exception {
-               super.close();
+       private void processRecord() throws IOException {
+               if (!state.prepareToProcessRecord(this)) {
+                       return;
+               }
 
-               waitSplitReaderFinished();
+               readAndCollectRecord();
 
-               output.close();
+               if (format.reachedEnd()) {
+                       onSplitProcessed();
+               } else {
+                       enqueueMail();
+               }
        }
 
-       private void waitSplitReaderFinished() throws InterruptedException {
-               // make sure that we hold the checkpointing lock
-               assert Thread.holdsLock(checkpointLock);
+       private void onSplitProcessed() throws IOException {
+               completedSplitsCounter.inc();
+               LOG.debug("split {} processed: {}", 
completedSplitsCounter.getCount(), currentSplit);
+               format.close();
+               currentSplit = null;
 
-               // close the reader to signal that no more splits will come. By 
doing this,
-               // the reader will exit as soon as it finishes processing the 
already pending splits.
-               // This method will wait until then. Further cleaning up is 
handled by the dispose().
+               if (splits.isEmpty()) {
+                       state.onNoMoreData(this);
+                       return;
+               }
 
-               while (reader != null && reader.isAlive() && 
reader.isRunning()) {
-                       reader.close();
-                       checkpointLock.wait();
+               if (state == ReaderState.READING) {
+                       switchState(ReaderState.OPENING);
                }
 
-               // finally if we are operating on event or ingestion time,
-               // emit the long-max watermark indicating the end of the stream,
-               // like a normal source would do.
+               enqueueMail();
+       }
 
-               if (readerContext != null) {
-                       readerContext.emitWatermark(Watermark.MAX_WATERMARK);
-                       readerContext.close();
-                       readerContext = null;
+       private void readAndCollectRecord() throws IOException {
+               Preconditions.checkState(state == ReaderState.READING || state 
== ReaderState.CLOSING, "can't process record in state %s", state);
+               if (format.reachedEnd()) {
+                       return;
+               }
+               OUT out = format.nextRecord(this.reusedRecord);
+               if (out != null) {
+                       sourceContext.collect(out);
                }
        }
 
-       private class SplitReader<OT> extends Thread {
-
-               private volatile boolean shouldClose;
-
-               private volatile boolean isRunning;
-
-               private final FileInputFormat<OT> format;
-               private final TypeSerializer<OT> serializer;
-
-               private final Object checkpointLock;
-               private final SourceFunction.SourceContext<OT> readerContext;
-
-               private final Queue<TimestampedFileInputSplit> pendingSplits;
-
-               private TimestampedFileInputSplit currentSplit;
-
-               private volatile boolean isSplitOpen;
-
-               private SplitReader(FileInputFormat<OT> format,
-                                       TypeSerializer<OT> serializer,
-                                       SourceFunction.SourceContext<OT> 
readerContext,
-                                       Object checkpointLock,
-                                       List<TimestampedFileInputSplit> 
restoredState) {
-
-                       this.format = checkNotNull(format, "Unspecified 
FileInputFormat.");
-                       this.serializer = checkNotNull(serializer, "Unspecified 
Serializer.");
-                       this.readerContext = checkNotNull(readerContext, 
"Unspecified Reader Context.");
-                       this.checkpointLock = checkNotNull(checkpointLock, 
"Unspecified checkpoint lock.");
-
-                       this.shouldClose = false;
-                       this.isRunning = true;
+       private void loadSplit(TimestampedFileInputSplit split) throws 
IOException {
+               Preconditions.checkState(state != ReaderState.READING && state 
!= ReaderState.CLOSED, "can't load split in state %s", state);
+               Preconditions.checkNotNull(split, "split is null");
+               LOG.debug("load split: {}", split);
+               currentSplit = split;
+               format.openInputFormat();
+               if (format instanceof CheckpointableInputFormat && 
currentSplit.getSplitState() != null) {
+                       // recovering after a node failure with an input
+                       // format that supports resetting the offset
+                       ((CheckpointableInputFormat<TimestampedFileInputSplit, 
Serializable>) format).
+                                       reopen(currentSplit, 
currentSplit.getSplitState());
+               } else {
+                       // we either have a new split, or we recovered from a 
node
+                       // failure but the input format does not support 
resetting the offset.
+                       format.open(currentSplit);
+               }
 
-                       this.pendingSplits = new PriorityQueue<>();
+               // reset the restored state to null for the next iteration
+               currentSplit.resetSplitState();
+       }
 
-                       // this is the case where a task recovers from a 
previous failed attempt
-                       if (restoredState != null) {
-                               this.pendingSplits.addAll(restoredState);
-                       }
+       private void switchState(ReaderState newState) {
+               if (state != newState) {
+                       Preconditions.checkState(state.canSwitchTo(newState), 
"can't switch state from terminal state %s to %s", state, newState);
+                       LOG.debug("switch state: {} -> {}", state, newState);
+                       state = newState;
                }
+       }
 
-               private void addSplit(TimestampedFileInputSplit split) {
-                       checkNotNull(split, "Cannot insert a null value in the 
pending splits queue.");
-                       synchronized (checkpointLock) {
-                               this.pendingSplits.add(split);
-                       }
-               }
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               // we do nothing because we emit our own watermarks if needed.
+       }
 
-               public boolean isRunning() {
-                       return this.isRunning;
-               }
+       @Override
+       public void dispose() throws Exception {
+               super.dispose();
+               checkpointedState = null;
+               completedSplitsCounter = null;
+               currentSplit = null;
+               executor = null;
+               format = null;
+               sourceContext = null;
+               reusedRecord = null;
+               serializer = null;
+               splits = null;
+       }
 
-               @Override
-               public void run() {
-                       try {
+       @Override
+       public void close() throws Exception {
+               LOG.debug("closing");
+               super.close();
 
-                               Counter completedSplitsCounter = 
getMetricGroup().counter("numSplitsProcessed");
-                               this.format.openInputFormat();
-
-                               while (this.isRunning) {
-
-                                       synchronized (checkpointLock) {
-
-                                               if (currentSplit == null) {
-                                                       currentSplit = 
this.pendingSplits.poll();
-
-                                                       // if the list of 
pending splits is empty (currentSplit == null) then:
-                                                       //   1) if close() was 
called on the operator then exit the while loop
-                                                       //   2) if not wait 50 
ms and try again to fetch a new split to read
-
-                                                       if (currentSplit == 
null) {
-                                                               if 
(this.shouldClose) {
-                                                                       
isRunning = false;
-                                                               } else {
-                                                                       
checkpointLock.wait(50);
-                                                               }
-                                                               continue;
-                                                       }
-                                               }
-
-                                               if (this.format instanceof 
CheckpointableInputFormat && currentSplit.getSplitState() != null) {
-                                                       // recovering after a 
node failure with an input
-                                                       // format that supports 
resetting the offset
-                                                       
((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) 
this.format).
-                                                               
reopen(currentSplit, currentSplit.getSplitState());
-                                               } else {
-                                                       // we either have a new 
split, or we recovered from a node
-                                                       // failure but the 
input format does not support resetting the offset.
-                                                       
this.format.open(currentSplit);
-                                               }
-
-                                               // reset the restored state to 
null for the next iteration
-                                               
this.currentSplit.resetSplitState();
-                                               this.isSplitOpen = true;
-                                       }
-
-                                       LOG.debug("Reading split: " + 
currentSplit);
-
-                                       try {
-                                               OT nextElement = 
serializer.createInstance();
-                                               while (!format.reachedEnd()) {
-                                                       synchronized 
(checkpointLock) {
-                                                               nextElement = 
format.nextRecord(nextElement);
-                                                               if (nextElement 
!= null) {
-                                                                       
readerContext.collect(nextElement);
-                                                               } else {
-                                                                       break;
-                                                               }
-                                                       }
-                                               }
-                                               completedSplitsCounter.inc();
-
-                                       } finally {
-                                               // close and prepare for the 
next iteration
-                                               synchronized (checkpointLock) {
-                                                       this.format.close();
-                                                       this.isSplitOpen = 
false;
-                                                       this.currentSplit = 
null;
-                                               }
-                                       }
+               switch (state) {
+                       case IDLE:
+                               switchState(ReaderState.CLOSED);
+                               break;
+                       case CLOSED:
+                               LOG.warn("operator is already closed, doing 
nothing");
+                               return;
+                       default:
+                               switchState(ReaderState.CLOSING);
+                               while (!state.isTerminal()) {
+                                       executor.yield();
                                }
+               }
 
-                       } catch (Throwable e) {
-
-                               
getContainingTask().handleAsyncException("Caught exception when processing 
split: " + currentSplit, e);
+               try {
+                       sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+               } catch (Exception e) {
+                       LOG.warn("unable to emit watermark while closing", e);
+               }
 
-                       } finally {
-                               synchronized (checkpointLock) {
-                                       LOG.debug("Reader terminated, and 
exiting...");
+               cleanUp();
+       }
 
-                                       try {
-                                               this.format.closeInputFormat();
-                                       } catch (IOException e) {
-                                               
getContainingTask().handleAsyncException(
-                                                       "Caught exception from 
" + this.format.getClass().getName() + ".closeInputFormat() : " + 
e.getMessage(), e);
-                                       }
-                                       this.isSplitOpen = false;
-                                       this.currentSplit = null;
-                                       this.isRunning = false;
+       private void cleanUp() {
+               LOG.debug("cleanup, state={}", state);
 
-                                       checkpointLock.notifyAll();
-                               }
-                       }
-               }
+               RunnableWithException[] runClose = {
 
 Review comment:
   Do you mean its shaded version?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to