[ https://issues.apache.org/jira/browse/FLINK-3466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15379247#comment-15379247 ]
ASF GitHub Bot commented on FLINK-3466: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2252#discussion_r70956813 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.util.SerializedValue; + +import org.junit.Test; + +import scala.concurrent.duration.FiniteDuration; + +import 
java.io.IOException; +import java.io.Serializable; +import java.net.URL; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +/** + * This test checks that task restores that get stuck in the presence of interrupts + * are handled properly. + * + * In practice, reading from HDFS is interrupt sensitive: The HDFS code frequently deadlocks + * or livelocks if it is interrupted. + */ +public class InterruptSensitiveRestoreTest { + + private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch(); + + @Test + public void testRestoreWithInterrupt() throws Exception { + + Configuration taskConfig = new Configuration(); + StreamConfig cfg = new StreamConfig(taskConfig); + cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + cfg.setStreamOperator(new StreamSource<>(new TestSource())); + + StateHandle<Serializable> lockingHandle = new InterruptLockingStateHandle(); + StreamTaskState opState = new StreamTaskState(); + opState.setFunctionState(lockingHandle); + StreamTaskStateList taskState = new StreamTaskStateList(new StreamTaskState[] { opState }); + + TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, taskState); + Task task = createTask(tdd); + + // start the task and wait until it is in "restore" + task.startTaskThread(); + IN_RESTORE_LATCH.await(); + + // trigger cancellation and signal to continue + task.cancelExecution(); + + task.getExecutingThread().join(30000); --- End diff -- I think it should not be a race. On interruption, the thread's 'interrupt' flag will be set and, upon entering `wait()`, it should immediately throw an `InterruptedException`. Also, the cancellation sends periodic interrupts. 
> Job might get stuck in restoreState() from HDFS due to interrupt > ---------------------------------------------------------------- > > Key: FLINK-3466 > URL: https://issues.apache.org/jira/browse/FLINK-3466 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.0.0, 0.10.2 > Reporter: Robert Metzger > Assignee: Stephan Ewen > > A user reported the following issue with a failing job: > {code} > 10:46:09,223 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck > in method: > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979) > org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255) > org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434) > org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016) > org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477) > org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783) > org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717) > org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421) > org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332) > org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576) > org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800) > org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848) > java.io.DataInputStream.read(DataInputStream.java:149) > org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69) > 
java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310) > java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323) > java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794) > java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801) > java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55) > org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52) > org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35) > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162) > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440) > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208) > org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > java.lang.Thread.run(Thread.java:745) > {code} > and > {code} > 10:46:09,223 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck > in method: > java.lang.Throwable.fillInStackTrace(Native Method) > java.lang.Throwable.fillInStackTrace(Throwable.java:783) > java.lang.Throwable.<init>(Throwable.java:250) > java.lang.Exception.<init>(Exception.java:54) > java.lang.InterruptedException.<init>(InterruptedException.java:57) > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038) > org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325) > org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266) > org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434) > 
org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016) > org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477) > org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783) > org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717) > org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421) > org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332) > org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576) > org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800) > org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848) > java.io.DataInputStream.read(DataInputStream.java:149) > org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69) > java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310) > java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323) > java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794) > java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801) > java.io.ObjectInputStream.<init>(ObjectInputStream.java:299) > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55) > org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52) > org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35) > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162) > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440) > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208) > 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > java.lang.Thread.run(Thread.java:745) > {code} > The issue is most likely that the HDFS client gets stuck in the > "org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read()" call when it > receives an interrupt. > By moving the call into a separate thread, the task's interrupt would no longer reach (and break) > the HadoopReadThread. > The HadoopReadThread would eventually stop on its own, either with an error or once the read > operation has finished. -- This message was sent by Atlassian JIRA (v6.3.4#6332)