[ 
https://issues.apache.org/jira/browse/FLINK-3466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15379247#comment-15379247
 ] 

ASF GitHub Bot commented on FLINK-3466:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2252#discussion_r70956813
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 ---
    @@ -0,0 +1,223 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.tasks;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
    +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
    +import 
org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
    +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
    +import org.apache.flink.runtime.execution.ExecutionState;
    +import 
org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.filecache.FileCache;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.io.disk.iomanager.IOManager;
    +import org.apache.flink.runtime.io.network.NetworkEnvironment;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.memory.MemoryManager;
    +import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
    +import org.apache.flink.runtime.state.StateHandle;
    +import org.apache.flink.runtime.taskmanager.Task;
    +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.apache.flink.runtime.util.SerializableObject;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.StreamSource;
    +import org.apache.flink.util.SerializedValue;
    +
    +import org.junit.Test;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.io.Serializable;
    +import java.net.URL;
    +import java.util.Collections;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.*;
    +
    +/**
    + * This test checks that task restores that get stuck in the presence of 
interrupts
    + * are handled properly.
    + *
    + * In practice, reading from HDFS is interrupt sensitive: The HDFS code 
frequently deadlocks
    + * or livelocks if it is interrupted.
    + */
    +public class InterruptSensitiveRestoreTest {
    +
    +   private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch();
    +
    +   @Test
    +   public void testRestoreWithInterrupt() throws Exception {
    +
    +           Configuration taskConfig = new Configuration();
    +           StreamConfig cfg = new StreamConfig(taskConfig);
    +           cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    +           cfg.setStreamOperator(new StreamSource<>(new TestSource()));
    +
    +           StateHandle<Serializable> lockingHandle = new 
InterruptLockingStateHandle();
    +           StreamTaskState opState = new StreamTaskState();
    +           opState.setFunctionState(lockingHandle);
    +           StreamTaskStateList taskState = new StreamTaskStateList(new 
StreamTaskState[] { opState });
    +
    +           TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(taskConfig, taskState);
    +           Task task = createTask(tdd);
    +
    +           // start the task and wait until it is in "restore"
    +           task.startTaskThread();
    +           IN_RESTORE_LATCH.await();
    +
    +           // trigger cancellation and signal to continue
    +           task.cancelExecution();
    +
    +           task.getExecutingThread().join(30000);
    --- End diff --
    
    I don't think this is a race. When the thread is interrupted, its 'interrupt' 
flag is set, so as soon as it enters `wait()`, the call should immediately throw an 
`InterruptedException`.
    
    Also, the cancellation sends periodic interrupts.


> Job might get stuck in restoreState() from HDFS due to interrupt
> ----------------------------------------------------------------
>
>                 Key: FLINK-3466
>                 URL: https://issues.apache.org/jira/browse/FLINK-3466
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.0.0, 0.10.2
>            Reporter: Robert Metzger
>            Assignee: Stephan Ewen
>
> A user reported the following issue with a failing job:
> {code}
> 10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task                  
>    - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck 
> in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> and 
> {code}
> 10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task                  
>    - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck 
> in method:
> java.lang.Throwable.fillInStackTrace(Native Method)
> java.lang.Throwable.fillInStackTrace(Throwable.java:783)
> java.lang.Throwable.<init>(Throwable.java:250)
> java.lang.Exception.<init>(Exception.java:54)
> java.lang.InterruptedException.<init>(InterruptedException.java:57)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038)
> org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> The issue is most likely that the HDFS client gets stuck in the 
> "org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read()" call when it 
> receives an interrupt.
> By putting the call into a separate thread, the TaskInterrupt would not reach 
> (and therefore could not break) the HadoopReadThread.
> The HadoopReadThread would eventually stop, either with an error or once the 
> read operation has finished.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to