Hexiaoqiao commented on code in PR #6926:
URL: https://github.com/apache/hadoop/pull/6926#discussion_r1730295351
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -3067,69 +3087,70 @@ public void run() {
//
// Header info
//
- Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
- EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- targetStorageTypes, targetStorageIds);
+ Token<BlockTokenIdentifier> accessToken =
+ getBlockAccessToken(target,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+ targetStorageTypes, targetStorageIds);
- long writeTimeout = dnConf.socketWriteTimeout +
- HdfsConstants.WRITE_TIMEOUT_EXTENSION *
(targets.length-1);
+ long writeTimeout =
+ dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION
* (targets.length
+ - 1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
- DataEncryptionKeyFactory keyFactory =
- getDataEncryptionKeyFactoryForBlock(b);
- IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, keyFactory, accessToken, bpReg);
+ DataEncryptionKeyFactory keyFactory =
getDataEncryptionKeyFactoryForBlock(source);
+ IOStreamPair saslStreams =
+ saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory,
accessToken, bpReg);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
-
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- DFSUtilClient.getSmallBufferSize(getConf())));
+
+ out = new DataOutputStream(
+ new BufferedOutputStream(unbufOut,
DFSUtilClient.getSmallBufferSize(getConf())));
in = new DataInputStream(unbufIn);
- blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, true, DataNode.this, null, cachingStrategy);
- DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
- .build();
-
- String storageId = targetStorageIds.length > 0 ?
- targetStorageIds[0] : null;
- new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
- clientname, targets, targetStorageTypes, srcNode,
- stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
- false, false, null, storageId,
- targetStorageIds);
+ blockSender =
+ new BlockSender(source, 0, source.getNumBytes(), false, false,
true, DataNode.this,
+ null, cachingStrategy);
+ DatanodeInfo srcNode = new
DatanodeInfoBuilder().setNodeID(bpReg).build();
+
+ String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] :
null;
+ new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken,
clientname, targets,
+ targetStorageTypes, srcNode, stage, 0, 0, 0, 0,
blockSender.getChecksum(),
+ cachingStrategy, false, false, null, storageId, targetStorageIds);
// send data & checksum
blockSender.sendBlock(out, unbufOut, throttler);
// no response necessary
- LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
- getClass().getSimpleName(), DataNode.this.getDisplayName(),
- b, b.getNumBytes(), curTarget);
+ LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
getClass().getSimpleName(),
+ DataNode.this.getDisplayName(), source, source.getNumBytes(),
curTarget);
// read ack
if (isClient) {
- DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
- PBHelperClient.vintPrefixed(in));
+ DNTransferAckProto closeAck =
+ DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in));
LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck);
if (closeAck.getStatus() != Status.SUCCESS) {
if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
- "Got access token error for connect ack, targets="
- + Arrays.asList(targets));
+ "Got access token error for connect ack, targets=" +
Arrays.asList(targets));
} else {
- throw new IOException("Bad connect ack, targets="
- + Arrays.asList(targets) + " status=" +
closeAck.getStatus());
+ throw new IOException(
+ "Bad connect ack, targets=" + Arrays.asList(targets) + "
status="
+ + closeAck.getStatus());
}
}
} else {
metrics.incrBlocksReplicated();
}
} catch (IOException ie) {
- handleBadBlock(b, ie, false);
- LOG.warn("{}:Failed to transfer {} to {} got",
- bpReg, b, targets[0], ie);
+ if (copyBlockCrossNamespace) {
+ throw new RuntimeException(ie);
+ }
+ handleBadBlock(source, ie, false);
+ LOG.warn("{}:Failed to transfer {} to {} got", bpReg, source,
targets[0], ie);
} catch (Throwable t) {
- LOG.error("Failed to transfer block {}", b, t);
+ LOG.error("Failed to transfer block {}", source, t);
+ if (copyBlockCrossNamespace) {
+ throw new RuntimeException(t);
Review Comment:
As the above comment.
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -3067,69 +3087,70 @@ public void run() {
//
// Header info
//
- Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b,
- EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
- targetStorageTypes, targetStorageIds);
+ Token<BlockTokenIdentifier> accessToken =
+ getBlockAccessToken(target,
EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
+ targetStorageTypes, targetStorageIds);
- long writeTimeout = dnConf.socketWriteTimeout +
- HdfsConstants.WRITE_TIMEOUT_EXTENSION *
(targets.length-1);
+ long writeTimeout =
+ dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION
* (targets.length
+ - 1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
- DataEncryptionKeyFactory keyFactory =
- getDataEncryptionKeyFactoryForBlock(b);
- IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
- unbufIn, keyFactory, accessToken, bpReg);
+ DataEncryptionKeyFactory keyFactory =
getDataEncryptionKeyFactoryForBlock(source);
+ IOStreamPair saslStreams =
+ saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory,
accessToken, bpReg);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
-
- out = new DataOutputStream(new BufferedOutputStream(unbufOut,
- DFSUtilClient.getSmallBufferSize(getConf())));
+
+ out = new DataOutputStream(
+ new BufferedOutputStream(unbufOut,
DFSUtilClient.getSmallBufferSize(getConf())));
in = new DataInputStream(unbufIn);
- blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, true, DataNode.this, null, cachingStrategy);
- DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg)
- .build();
-
- String storageId = targetStorageIds.length > 0 ?
- targetStorageIds[0] : null;
- new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
- clientname, targets, targetStorageTypes, srcNode,
- stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
- false, false, null, storageId,
- targetStorageIds);
+ blockSender =
+ new BlockSender(source, 0, source.getNumBytes(), false, false,
true, DataNode.this,
+ null, cachingStrategy);
+ DatanodeInfo srcNode = new
DatanodeInfoBuilder().setNodeID(bpReg).build();
+
+ String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] :
null;
+ new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken,
clientname, targets,
+ targetStorageTypes, srcNode, stage, 0, 0, 0, 0,
blockSender.getChecksum(),
+ cachingStrategy, false, false, null, storageId, targetStorageIds);
// send data & checksum
blockSender.sendBlock(out, unbufOut, throttler);
// no response necessary
- LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
- getClass().getSimpleName(), DataNode.this.getDisplayName(),
- b, b.getNumBytes(), curTarget);
+ LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
getClass().getSimpleName(),
+ DataNode.this.getDisplayName(), source, source.getNumBytes(),
curTarget);
// read ack
if (isClient) {
- DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
- PBHelperClient.vintPrefixed(in));
+ DNTransferAckProto closeAck =
+ DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in));
LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck);
if (closeAck.getStatus() != Status.SUCCESS) {
if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
- "Got access token error for connect ack, targets="
- + Arrays.asList(targets));
+ "Got access token error for connect ack, targets=" +
Arrays.asList(targets));
} else {
- throw new IOException("Bad connect ack, targets="
- + Arrays.asList(targets) + " status=" +
closeAck.getStatus());
+ throw new IOException(
+ "Bad connect ack, targets=" + Arrays.asList(targets) + "
status="
+ + closeAck.getStatus());
}
}
} else {
metrics.incrBlocksReplicated();
}
} catch (IOException ie) {
- handleBadBlock(b, ie, false);
- LOG.warn("{}:Failed to transfer {} to {} got",
- bpReg, b, targets[0], ie);
+ if (copyBlockCrossNamespace) {
+ throw new RuntimeException(ie);
Review Comment:
Be careful to throw RuntimeException directly here.
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java:
##########
@@ -3863,5 +3864,27 @@ public void setLastDirScannerFinishTime(long time) {
public long getPendingAsyncDeletions() {
return asyncDiskService.countPendingDeletions();
}
+
+ @Override
+ public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock)
throws IOException {
+ BlockLocalPathInfo blpi = getBlockLocalPathInfo(srcBlock);
+ FsVolumeImpl v = getVolume(srcBlock);
+
+ try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
dstBlock.getBlockPoolId(),
Review Comment:
Not sure if it is enough to obtain only dest block pool lock here.
##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java:
##########
@@ -131,6 +131,7 @@ public class DFSOutputStream extends FSOutputSummer
private FileEncryptionInfo fileEncryptionInfo;
private int writePacketSize;
private boolean leaseRecovered = false;
+ private ExtendedBlock userAssignmentLastBlock;
Review Comment:
Sorry I don't get the purpose to add `userAssignmentLastBlock` here.
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java:
##########
@@ -3003,46 +3010,58 @@ private class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
final StorageType[] targetStorageTypes;
final private String[] targetStorageIds;
- final ExtendedBlock b;
+ final private ExtendedBlock source;
+ private ExtendedBlock target;
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
final String clientname;
final CachingStrategy cachingStrategy;
- /** Throttle to block replication when data transfers or writes. */
+ /**
+ * Throttle to block replication when data transfers or writes.
+ */
private DataTransferThrottler throttler;
+ private boolean copyBlockCrossNamespace;
Review Comment:
+ What this attribution describe here?
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java:
##########
@@ -136,6 +136,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys
{
"dfs.datanode.ec.reconstruct.write.bandwidthPerSec";
public static final long
DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT =
0; // A value of zero indicates no limit
+ public static final String
DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY =
Review Comment:
Is this one config item same as CopyOp time out and is it necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]