Hello,
We are a software development team at the UCSD CMS Tier2 Center. We
would like to propose a mechanism that allows DFSInputStream to be
subclassed cleanly from an external package. We first give some
motivation and then proceed with the details.
We maintain a 3 Petabyte Hadoop cluster for the LHC experiment at CERN.
Other T2 centers worldwide hold mirrors of the same data we host. We
are working on an extension to Hadoop that handles the case where a
file is being read and no replica of a block is available: the client
then uses an external interface to retrieve that block of the file from
another data center. The external interface is necessary because not
all T2 centers involved in CMS run a Hadoop cluster as their storage
backend.
In order to implement this functionality, we need to subclass the
DFSInputStream and override the read method, so we can catch
IOExceptions that occur on client reads at the block level.
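The override described above could look roughly like the following
sketch. It does not use the Hadoop classes: LocalStream is a simplified
stand-in for DFSInputStream, and RemoteBlockSource is a hypothetical
name for the external interface. Both are assumptions made only for
illustration.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical external interface that fetches bytes from another site. */
interface RemoteBlockSource {
    int fetch(long position, byte[] buf, int off, int len) throws IOException;
}

/** Simplified stand-in for DFSInputStream (NOT the Hadoop class). */
class LocalStream extends InputStream {
    private final byte[] data;
    private final long failFrom;   // reads at or beyond this offset fail
    protected long pos = 0;        // protected so a subclass can advance it

    LocalStream(byte[] data, long failFrom) {
        this.data = data;
        this.failFrom = failFrom;
    }

    @Override
    public int read() throws IOException {
        byte[] one = new byte[1];
        int n = read(one, 0, 1);
        return n == -1 ? -1 : (one[0] & 0xff);
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (pos >= data.length) {
            return -1;             // end of file
        }
        if (pos >= failFrom) {
            // Simulates "no available replicas of a block".
            throw new IOException("no available replica at offset " + pos);
        }
        int n = (int) Math.min(len, Math.min(data.length, failFrom) - pos);
        System.arraycopy(data, (int) pos, buf, off, n);
        pos += n;
        return n;
    }
}

/** Subclass that catches the read failure and falls back to the remote source. */
class FallbackStream extends LocalStream {
    private final RemoteBlockSource remote;

    FallbackStream(byte[] data, long failFrom, RemoteBlockSource remote) {
        super(data, failFrom);
        this.remote = remote;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        try {
            return super.read(buf, off, len);
        } catch (IOException localFailure) {
            // Local read failed: ask the external interface for the same range.
            int n = remote.fetch(pos, buf, off, len);
            if (n > 0) {
                pos += n;
            }
            return n;
        }
    }
}
```

In this sketch the subclass needs access to the read position of its
parent, which is the same kind of access the modifier changes in the
patch are meant to provide.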
The basic steps required are:
1. Define a new URI scheme for the customized "FileSystem" in core-site.xml:
   <property>
     <name>fs.foofs.impl</name>
     <value>my.package.FooFileSystem</value>
     <description>My Extended FileSystem for foofs: uris.</description>
   </property>
2. Write new classes in the external package that subclass the
following Hadoop classes:
   FooFileSystem subclasses DistributedFileSystem
   FooFSClient subclasses DFSClient
   FooFSInputStream subclasses DFSInputStream
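As a rough illustration of how the three subclasses in step 2 fit
together, here is a minimal sketch. The Base* classes are simplified,
hypothetical stand-ins for DistributedFileSystem, DFSClient, and
DFSInputStream; their methods and signatures are assumptions and do not
reproduce the real Hadoop API.

```java
import java.io.IOException;
import java.io.InputStream;

// --- Hypothetical stand-ins for the Hadoop classes ---

class BaseInputStream extends InputStream {
    protected final BaseClient client;   // protected: visible to subclasses
    protected final String src;

    // Public constructor so that a subclass in another package can call it.
    public BaseInputStream(BaseClient client, String src) {
        this.client = client;
        this.src = src;
    }

    @Override
    public int read() throws IOException {
        return -1;                        // no data in this sketch
    }

    public String describe() {
        return "base:" + src;
    }
}

class BaseClient {
    public BaseInputStream open(String src) throws IOException {
        return new BaseInputStream(this, src);
    }
}

class BaseFileSystem {
    // Protected so that a subclass can swap in its own client.
    protected BaseClient client = new BaseClient();

    public BaseInputStream open(String path) throws IOException {
        return client.open(path);
    }
}

// --- Classes that would live in the external package ---

class FooInputStream extends BaseInputStream {
    public FooInputStream(BaseClient client, String src) {
        super(client, src);
    }

    @Override
    public String describe() {
        return "foo:" + src;
    }
}

class FooClient extends BaseClient {
    @Override
    public BaseInputStream open(String src) throws IOException {
        return new FooInputStream(this, src);   // hand out the custom stream
    }
}

class FooFileSystemSketch extends BaseFileSystem {
    FooFileSystemSketch() {
        this.client = new FooClient();          // replace the default client
    }
}
```

Calling open() on FooFileSystemSketch returns a FooInputStream without
the caller changing anything except the file system it uses. All
classes here share one file, so Java's access rules are not exercised;
in a separate package the protected field and the public constructor
would be required.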
Now any client command that explicitly uses the foofs:// scheme in
paths to access the Hadoop cluster can open files with a customized
InputStream that extends the functionality of the default Hadoop client
DFSInputStream. To make this work for our use case, we had to change
some access modifiers in the DistributedFileSystem, DFSClient, and
DFSInputStream classes provided by Hadoop. In addition, we had to
comment out the check in the namenode code that only allows URI schemes
of the form "hdfs://".
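To show why a foofs:// URI is rejected by that check, the following
standalone sketch mirrors its shape using only java.net.URI. The class
name SchemeCheckSketch, the shortened error message, and the host name
in the usage note are assumptions for illustration; this is not the
NameNode code itself.

```java
import java.net.URI;

class SchemeCheckSketch {
    // Same value as the scheme named in the check; redefined here so the
    // sketch compiles without Hadoop on the classpath.
    static final String HDFS_URI_SCHEME = "hdfs";

    /** Rejects any URI whose scheme is not "hdfs" (case-insensitive). */
    static void requireHdfsScheme(URI filesystemURI) {
        if (!HDFS_URI_SCHEME.equalsIgnoreCase(filesystemURI.getScheme())) {
            throw new IllegalArgumentException(String.format(
                "Invalid URI for NameNode address: %s is not of scheme '%s'.",
                filesystemURI, HDFS_URI_SCHEME));
        }
    }
}
```

With this sketch, requireHdfsScheme(URI.create("hdfs://namenode.example.org:8020"))
returns normally, while the same call with a foofs:// URI throws
IllegalArgumentException.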
Attached is the patch file we apply to Hadoop. Note that we derived this
patch by modifying the Cloudera release hadoop-2.0.0-cdh4.1.1, which can
be found at:
http://archive.cloudera.com/cdh4/cdh/4/hadoop-2.0.0-cdh4.1.1.tar.gz
We would greatly appreciate any advice on whether this approach sounds
reasonable, and on whether you would consider accepting these
modifications into the official Hadoop code base.
Thank you,
Jeff, Alja & Matevz
UCSD Physics
diff -ru orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
--- orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java	2013-08-05 13:04:11.376533999 -0700
+++ new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java	2013-08-05 13:04:11.444567994 -0700
@@ -504,7 +504,7 @@
return dfsClientConf.connectToDnViaHostname;
}
- void checkOpen() throws IOException {
+ public void checkOpen() throws IOException {
if (!clientRunning) {
IOException result = new IOException("Filesystem closed");
throw result;
@@ -2077,4 +2077,12 @@
void disableShortCircuit() {
shortCircuitLocalReads = false;
}
+
+ public Configuration getConfig() {
+ return conf;
+ }
+
+ public FileSystem.Statistics getStats() {
+ return stats;
+ }
}
diff -ru orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
--- orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java	2013-08-05 13:04:11.400545996 -0700
+++ new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java	2013-08-05 13:04:11.455573494 -0700
@@ -62,8 +62,8 @@
public class DFSInputStream extends FSInputStream implements
ByteBufferReadable {
private final SocketCache socketCache;
- private final DFSClient dfsClient;
- private boolean closed = false;
+ protected final DFSClient dfsClient;
+ protected boolean closed = false;
private final String src;
private final long prefetchSize;
@@ -87,7 +87,7 @@
* back to the namenode to get a new list of block locations, and is
* capped at maxBlockAcquireFailures
*/
- private int failures = 0;
+ protected int failures = 0;
private final int timeWindow;
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
@@ -104,7 +104,7 @@
deadNodes.put(dnInfo, dnInfo);
}
- DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
+ public DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
@@ -346,7 +346,7 @@
* @return consequent segment of located blocks
* @throws IOException
*/
- private synchronized List<LocatedBlock> getBlockRange(long offset,
+ protected synchronized List<LocatedBlock> getBlockRange(long offset,
long length)
throws IOException {
// getFileLength(): returns total file length
@@ -766,7 +766,7 @@
}
}
- private void fetchBlockByteRange(LocatedBlock block, long start, long end,
+ protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
@@ -1030,7 +1030,7 @@
* @param corruptedBlockMap, map of corrupted blocks
* @param dataNodeCount, number of data nodes who contains the block replicas
*/
- private void reportCheckSumFailure(
+ protected void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount) {
if (corruptedBlockMap.isEmpty()) {
diff -ru orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
--- orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java	2013-08-05 13:04:11.426558996 -0700
+++ new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java	2013-08-05 13:04:11.481586495 -0700
@@ -81,7 +81,7 @@
private Path workingDir;
private URI uri;
- DFSClient dfs;
+ protected DFSClient dfs;
private boolean verifyChecksum = true;
static{
diff -ru orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
--- orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java	2013-08-05 13:04:11.415553496 -0700
+++ new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java	2013-08-05 13:04:11.470580995 -0700
@@ -325,13 +325,15 @@
"Invalid URI for NameNode address (check %s): %s has no authority.",
FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString()));
}
- if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
+ // comment this out so we can use schemes other than hdfs!!
+ /* if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
filesystemURI.getScheme())) {
throw new IllegalArgumentException(String.format(
"Invalid URI for NameNode address (check %s): %s is not of scheme '%s'.",
FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString(),
HdfsConstants.HDFS_URI_SCHEME));
}
+ */
return getAddress(authority);
}