Haoze Wu created HDFS-15925:
-------------------------------

             Summary: The lack of packet-level mirrorError state 
synchronization in BlockReceiver$PacketResponder can cause the HDFS client to 
hang
                 Key: HDFS-15925
                 URL: https://issues.apache.org/jira/browse/HDFS-15925
             Project: Hadoop HDFS
          Issue Type: Bug
          Components: datanode
    Affects Versions: 3.2.2
            Reporter: Haoze Wu


    When the datanode is receiving data block packets from an HDFS client and 
simultaneously forwarding these packets to a mirror (another datanode), a 
single IOException in the datanode’s forwarding path can cause the client to 
hang for 1 minute without any logging. After 1 minute, the client’s log shows 
an EOFException warning and `Slow waitForAckedSeqno took 60106ms 
(threshold=30000ms)`.

    Normally the datanode informs the client of this error state immediately, 
and the client resends the packets right away; the whole process is very fast. 
After careful analysis, we found that the above symptom is caused by the lack 
of packet-level mirrorError state synchronization in 
BlockReceiver$PacketResponder: under a certain concurrency condition, the 
BlockReceiver$PacketResponder hangs for 1 minute and then exits without 
sending the error state to the client.

*Root Cause Analysis*

 
{code:java}
//hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

class BlockReceiver implements Closeable {
  // ...

  private void handleMirrorOutError(IOException ioe) throws IOException {
    // ...
    if (Thread.interrupted()) {
      throw ioe;
    } else { // encounter an error while writing to mirror
      // continue to run even if can not write to mirror
      // notify client of the error
      // and wait for the client to shut down the pipeline
      mirrorError = true;                                            // line 461
    }
  }

  private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);                            // line 528
    // ...
    boolean lastPacketInBlock = header.isLastPacketInBlock();        // line 551
    //First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
        // ...
        packetReceiver.mirrorPacketTo(mirrorOut);                    // line 588
        // ...
      } catch (IOException e) {
        handleMirrorOutError(e);                                     // line 604
      }
    }
    // ...
    return lastPacketInBlock?-1:len;                                 // line 849
  }

  void receiveBlock(...) throws IOException {
    // ...
    try {
      if (isClient && !isTransfer) {
        responder = new Daemon(datanode.threadGroup, 
            new PacketResponder(replyOut, mirrIn, downstreams));
        responder.start();                                           // line 968
      }

      while(receivePacket() >= 0){/*Receive until the last packet*/} // line 971

      // wait for all outstanding packet responses. And then
      // indicate responder to gracefully shutdown.
      // Mark that responder has been closed for future processing
      if (responder != null) {
        ((PacketResponder)responder.getRunnable()).close();          // line 977
        responderClosed = true;
      }
      // ...
    } catch (IOException ioe) {                                      // line 1003
      // ...
    } finally {
      // ...
      if (!responderClosed) { // Data transfer was not complete.
        if (responder != null) {
          // ...
          responder.interrupt();                                     // line 1046
        }
        // ...
      }
      if (responder != null) {
        try {
          responder.interrupt();                                     // line 1053
          // ...
        } catch (InterruptedException e) {
          responder.interrupt();                                     // line 1067
          // ...
        }
        // ...
      }
    }
  }
}
{code}
    In the `BlockReceiver.receivePacket` method, if the datanode fails to 
forward a packet to the mirror (line 588) due to an IOException, the exception 
is handled at line 604, which sets the mirrorError flag at line 461. According 
to the comments, the BlockReceiver keeps running in the mirrorError state, and 
the client should be notified of the error.

 

    However, jstack shows that the datanode gets stuck in both the `DataXceiver` 
thread (which receives data block packets from the client) and the 
`BlockReceiver$PacketResponder` thread (which sends ACK packets back to the 
client). In particular, the `DataXceiver` thread is stuck in the loop at line 
971 because it blocks at line 528, meaning that the `lastPacketInBlock` packet 
has not arrived and no more packets are coming in.

 
{code:java}
//hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

class BlockReceiver implements Closeable {
  // ...

  class PacketResponder implements Runnable, Closeable {
    // ...

    public void run() {
      // ...
      while (isRunning() && !lastPacketInBlock) {
        // ...
        try {
          // ...
          PipelineAck ack = new PipelineAck();
          // ...
          try {
            if (... && !mirrorError) {                               // line 1381
              // ...
              // read an ack from downstream datanode
              ack.readFields(downstreamIn);                          // line 1384
              // ...
            }
            // ...
          } catch (InterruptedException ine) {
            isInterrupted = true;                                    // line 1434
          } catch (IOException ioe) {
            if (Thread.interrupted()) {
              isInterrupted = true;                                  // line 1437
            } else ...
          }

          if (Thread.interrupted() || isInterrupted) {               // line 1458
            // ...
            LOG.info(myString + ": Thread is interrupted.");
            running = false;
            continue;                                                // line 1472
          }
          // ...
          sendAckUpstream(ack, expected, totalAckTimeNanos,          // line 1481
            (pkt != null ? pkt.offsetInBlock : 0),
            PipelineAck.combineHeader(datanode.getECN(), myStatus));
          // ...
        } catch (IOException e) {
          // ...
        } catch (Throwable e) {
          // ...
        }
      }
      LOG.info(myString + " terminating");
    }

    private void sendAckUpstream(...) throws IOException {
      try {
        // ...

        try {
          if (!running) return;
          sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,  // line 1568
              offsetInBlock, myHeader);
        } finally {
          // ...
        }
      } catch (InterruptedException ie) {
        // ...
      }
    }

    private void sendAckUpstreamUnprotected(...) throws IOException {
      final int[] replies;
      if (ack == null) {
        // ...
        replies = new int[] { myHeader };
      } else if (mirrorError) { // ack read error
        int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
        int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
        replies = new int[] {h, h1};                                 // line 1602
      } else {
        // ...
      }
      PipelineAck replyAck = new PipelineAck(seqno, replies,
          totalAckTimeNanos);
      // ...
      replyAck.write(upstreamOut);                                   // line 1632
      // ...
    }

  }
}
{code}
    The `BlockReceiver$PacketResponder` thread checks the mirrorError flag at 
line 1381, while the `DataXceiver` thread runs concurrently. If the 
`BlockReceiver$PacketResponder` finds that mirrorError is false, it tries to 
read the ACK packet from downstream (the mirror, another datanode) at line 
1384, which is a blocking call.

    However, there is a race condition. If the mirrorError flag set by the 
`handleMirrorOutError` method is already visible at the check in line 1381, the 
`BlockReceiver$PacketResponder` thread skips the blocking network I/O call at 
line 1384 and instead proceeds to line 1481, then 1568, then 1632. As the code 
around line 1602 shows, this ACK contains `Status.ERROR`, which warns the 
client. Conversely, if the mirrorError flag is set after the check at line 
1381, the `BlockReceiver$PacketResponder` thread blocks at line 1384. In our 
scenario, the data block packet was never sent to the mirror datanode because 
of the IOException, so the mirror datanode will never send the corresponding 
ACK either. As a result, the `BlockReceiver$PacketResponder` thread stays 
blocked there for a long time.
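
    To make the race window concrete, here is a minimal standalone sketch 
(plain Java with hypothetical names, not HDFS code) of the same 
check-then-block pattern: once the responder thread passes the flag check, 
setting the flag can no longer stop the blocking read.

{code:java}
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class CheckThenBlockRace {
  // Stands in for BlockReceiver.mirrorError; it is volatile, so visibility
  // is not the problem -- the timing of the check is.
  static volatile boolean mirrorError = false;

  public static void main(String[] args) throws Exception {
    // Stands in for downstreamIn: nothing is ever written to it, just as
    // the mirror never acks a packet it never received.
    PipedOutputStream mirrorAckSource = new PipedOutputStream();
    PipedInputStream downstreamIn = new PipedInputStream(mirrorAckSource);

    Thread responder = new Thread(() -> {
      try {
        if (!mirrorError) {     // the check (analogous to line 1381)
          downstreamIn.read();  // the blocking read (analogous to line 1384)
        }
      } catch (IOException ignored) {
      }
    });
    responder.setDaemon(true);  // let the JVM exit despite the stuck thread
    responder.start();

    Thread.sleep(100);          // let the responder pass the check first
    mirrorError = true;         // too late: read() is already blocking

    responder.join(2000);
    // Prints "true": the responder is parked in read() and never returns.
    System.out.println("responder still blocked: " + responder.isAlive());
  }
}
{code}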

*Fix*

 

    The key is to avoid the problematic concurrency between 
`BlockReceiver#receivePacket` and the reading of ACK packets (from the 
downstream mirror datanode) in `BlockReceiver$PacketResponder`. The simplest 
approach is: every time `BlockReceiver#receivePacket` successfully forwards a 
packet to the downstream mirror datanode, grant `BlockReceiver$PacketResponder` 
one chance to check the mirrorError state and read the ACK with the blocking 
I/O call. This is sound because if the datanode has not sent a packet, it is 
impossible for the `BlockReceiver$PacketResponder` to receive the corresponding 
ACK.

    The implementation only needs a semaphore in 
`BlockReceiver$PacketResponder` and does not affect the other components.
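
    A minimal sketch of this idea, with hypothetical field and method names 
(the actual patch would live in `BlockReceiver`): the receiving thread 
releases one permit per successfully mirrored packet (or on a mirror error), 
and the responder acquires a permit before attempting the blocking ACK read.

{code:java}
import java.util.concurrent.Semaphore;

class PacketResponderSketch {
  // One permit per packet that actually reached the mirror.
  private final Semaphore mirroredPackets = new Semaphore(0);
  private volatile boolean mirrorError = false;

  // Called by the receiving thread right after
  // packetReceiver.mirrorPacketTo(mirrorOut) succeeds (line 588).
  void onPacketMirrored() {
    mirroredPackets.release();
  }

  // Called from the path of handleMirrorOutError (line 461): set the flag,
  // then wake the responder so it re-checks the flag instead of blocking.
  void onMirrorError() {
    mirrorError = true;
    mirroredPackets.release();
  }

  // Responder loop: only attempt the blocking ACK read once a packet has
  // actually gone out to the mirror, so a corresponding ACK can exist.
  void ackLoop() throws InterruptedException {
    while (true) {
      mirroredPackets.acquire();  // one chance per mirrored packet
      if (mirrorError) {
        // Send Status.ERROR upstream (as at line 1602) instead of
        // blocking on an ACK that can never arrive.
        break;
      }
      // Safe to block here: an ACK is expected for the mirrored packet.
      // ack.readFields(downstreamIn);
    }
  }
}
{code}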

*Reproduction*

    Start HDFS with the default configuration, then execute a client (we used 
the command `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt` in the terminal). 
For each data block packet the client sends to the datanode, the datanode 
forwards it at line 588 in `BlockReceiver.java` 
([https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java#L588]). 
Inject a single IOException there.
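
    For illustration, a hypothetical one-shot injection guard (test 
scaffolding only, not part of the fix) could be placed immediately before the 
`mirrorPacketTo` call:

{code:java}
// Hypothetical one-shot fault-injection guard for the reproduction only,
// added to BlockReceiver; names are illustrative.
private static final java.util.concurrent.atomic.AtomicBoolean INJECTED =
    new java.util.concurrent.atomic.AtomicBoolean(false);

// Inside the existing try block of receivePacket():
if (INJECTED.compareAndSet(false, true)) {
  // Thrown exactly once; caught at line 604 and routed to
  // handleMirrorOutError, which sets mirrorError at line 461.
  throw new IOException("injected mirror write fault");
}
packetReceiver.mirrorPacketTo(mirrorOut);                            // line 588
{code}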

    Most of the time, the concurrency condition needed to trigger this bug does 
not occur. A reliable way to reproduce it is to set `dfs.blocksize` to `1m` in 
`hdfs-site.xml`, then run `bin/hdfs dfs -copyFromLocal ./foo.txt /1.txt`, where 
`./foo.txt` is a 15MB file (generated with `fallocate -l 15000000 foo.txt`), 
and perform the aforementioned injection at the timing of the 12th occurrence 
of 
[https://github.com/apache/hadoop/blob/rel/release-3.2.2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java#L747].
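
    For reference, the `hdfs-site.xml` stanza for the block size setting uses 
the standard Hadoop property form (HDFS accepts size suffixes such as `1m` 
here):

{code:xml}
<property>
  <name>dfs.blocksize</name>
  <value>1m</value>
</property>
{code}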


