Hello,

We work in a software development team at the UCSD CMS Tier2 Center. We would like to propose a mechanism that allows one to subclass DFSInputStream in a clean way from an external package. First we would like to give some motivation for why, and then proceed with the details.

We maintain a 3-petabyte Hadoop cluster for the CMS experiment at the LHC at CERN. Other T2 centers worldwide host mirrors of the same data. We are working on an extension to Hadoop so that, when a file is being read and no replicas of a block are available, an external interface is used to retrieve that block from another data center. The external interface is necessary because not all T2 centers involved in CMS run a Hadoop cluster as their storage backend.

To implement this functionality, we need to subclass DFSInputStream and override its read method so that we can catch IOExceptions that occur on client reads at the block level.
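
For illustration, here is a minimal sketch of such a subclass. It relies on the access-modifier changes in the attached patch; readViaExternalInterface() is a hypothetical placeholder and not part of the patch:

import java.io.IOException;

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;

public class FooFSInputStream extends DFSInputStream {

  public FooFSInputStream(DFSClient dfsClient, String src, int buffersize,
                          boolean verifyChecksum) throws IOException {
    super(dfsClient, src, buffersize, verifyChecksum);
  }

  @Override
  public synchronized int read(byte[] buf, int off, int len) throws IOException {
    try {
      // Try the normal HDFS read path first.
      return super.read(buf, off, len);
    } catch (IOException e) {
      // No usable replicas (or another block-level failure): fall back to
      // fetching the data from another data center.
      return readViaExternalInterface(buf, off, len);
    }
  }

  // Placeholder for the external retrieval logic (not shown here).
  private int readViaExternalInterface(byte[] buf, int off, int len)
      throws IOException {
    throw new IOException("external fallback omitted from this sketch");
  }
}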

The basic steps required:
1. Invent a new URI scheme for the customized "FileSystem" in core-site.xml:
  <property>
    <name>fs.foofs.impl</name>
    <value>my.package.FooFileSystem</value>
    <description>My Extended FileSystem for foofs: uris.</description>
  </property>

2. In the external package, write new classes that subclass the following (a rough sketch of the wiring follows the list):
FooFileSystem subclasses DistributedFileSystem
FooFSClient subclasses DFSClient
FooFSInputStream subclasses DFSInputStream
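
Roughly, the wiring looks like the sketch below. It is simplified: the constructor and open() signatures are the ones from hadoop-2.0.0-cdh4.1.1, and the sketch assumes the access-modifier changes in the attached patch:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Bound to the foofs:// scheme through fs.foofs.impl in core-site.xml.
public class FooFileSystem extends DistributedFileSystem {
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    // Swap in our client; 'dfs' and 'statistics' are reachable here once the
    // access modifiers in the attached patch are applied. (A real
    // implementation would also close the client created by super.initialize.)
    this.dfs = new FooFSClient(uri, conf, statistics);
  }
}

// Hands out FooFSInputStream instead of DFSInputStream on open().
class FooFSClient extends DFSClient {
  FooFSClient(URI nameNodeUri, Configuration conf, FileSystem.Statistics stats)
      throws IOException {
    super(nameNodeUri, conf, stats);
  }

  @Override
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
      throws IOException {
    checkOpen();  // made public by the attached patch
    return new FooFSInputStream(this, src, buffersize, verifyChecksum);
  }
}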

Any client command that explicitly uses the foofs:// scheme in paths to access the Hadoop cluster can then open files with a customized input stream that extends the functionality of the default Hadoop client DFSInputStream. To make this work for our use case, we had to change some access modifiers in the DistributedFileSystem, DFSClient, and DFSInputStream classes provided by Hadoop. In addition, we had to comment out the check in the NameNode code that only allows URIs of the "hdfs://" scheme.
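
For example, a client application can then read through the new stream as follows (hostname, port, and path are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FooFSReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();   // picks up fs.foofs.impl from core-site.xml
    Path p = new Path("foofs://namenode.example.org:8020/store/somefile");
    FileSystem fs = p.getFileSystem(conf);      // resolves to FooFileSystem
    FSDataInputStream in = fs.open(p);          // wraps the customized input stream
    byte[] buf = new byte[4096];
    int n = in.read(buf);                       // block failures fall back to the external interface
    System.out.println("read " + n + " bytes");
    in.close();
  }
}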

Attached is the patch file we apply to Hadoop. Note that we derived this patch by modifying the Cloudera release hadoop-2.0.0-cdh4.1.1, which can be found at:
http://archive.cloudera.com/cdh4/cdh/4/hadoop-2.0.0-cdh4.1.1.tar.gz

We would greatly appreciate any advice on whether this approach sounds reasonable, and whether you would consider accepting these modifications into the official Hadoop code base.

Thank you,
Jeff, Alja & Matevz
UCSD Physics
diff -ru orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
--- orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java  2013-08-05 13:04:11.376533999 -0700
+++ new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java  2013-08-05 13:04:11.444567994 -0700
@@ -504,7 +504,7 @@
     return dfsClientConf.connectToDnViaHostname;
   }
 
-  void checkOpen() throws IOException {
+  public void checkOpen() throws IOException {
     if (!clientRunning) {
       IOException result = new IOException("Filesystem closed");
       throw result;
@@ -2077,4 +2077,12 @@
   void disableShortCircuit() {
     shortCircuitLocalReads = false;
   }
+  
+  public Configuration getConfig() {
+    return conf;
+  }
+  
+  public FileSystem.Statistics getStats() {
+    return stats;
+  }
 }
diff -ru orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
--- orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java  2013-08-05 13:04:11.400545996 -0700
+++ new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java  2013-08-05 13:04:11.455573494 -0700
@@ -62,8 +62,8 @@
 public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
   private final SocketCache socketCache;
 
-  private final DFSClient dfsClient;
-  private boolean closed = false;
+  protected final DFSClient dfsClient;
+  protected boolean closed = false;
 
   private final String src;
   private final long prefetchSize;
@@ -87,7 +87,7 @@
    * back to the namenode to get a new list of block locations, and is
    * capped at maxBlockAcquireFailures
    */
-  private int failures = 0;
+  protected int failures = 0;
   private final int timeWindow;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
@@ -104,7 +104,7 @@
     deadNodes.put(dnInfo, dnInfo);
   }
   
-  DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
+  public DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
                  ) throws IOException, UnresolvedLinkException {
     this.dfsClient = dfsClient;
     this.verifyChecksum = verifyChecksum;
@@ -346,7 +346,7 @@
    * @return consequent segment of located blocks
    * @throws IOException
    */
-  private synchronized List<LocatedBlock> getBlockRange(long offset, 
+  protected synchronized List<LocatedBlock> getBlockRange(long offset, 
                                                         long length) 
                                                       throws IOException {
     // getFileLength(): returns total file length
@@ -766,7 +766,7 @@
     }
   } 
       
-  private void fetchBlockByteRange(LocatedBlock block, long start, long end,
+  protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1030,7 +1030,7 @@
    * @param corruptedBlockMap, map of corrupted blocks
    * @param dataNodeCount, number of data nodes who contains the block replicas
    */
-  private void reportCheckSumFailure(
+  protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 
       int dataNodeCount) {
     if (corruptedBlockMap.isEmpty()) {
diff -ru orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
--- orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java  2013-08-05 13:04:11.426558996 -0700
+++ new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java  2013-08-05 13:04:11.481586495 -0700
@@ -81,7 +81,7 @@
   private Path workingDir;
   private URI uri;
 
-  DFSClient dfs;
+  protected DFSClient dfs;
   private boolean verifyChecksum = true;
   
   static{
diff -ru orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
--- orig/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java  2013-08-05 13:04:11.415553496 -0700
+++ new/src/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java  2013-08-05 13:04:11.470580995 -0700
@@ -325,13 +325,15 @@
           "Invalid URI for NameNode address (check %s): %s has no authority.",
           FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString()));
     }
-    if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
+    // comment this out so we can use schemes other than hdfs!!
+    /* if (!HdfsConstants.HDFS_URI_SCHEME.equalsIgnoreCase(
         filesystemURI.getScheme())) {
       throw new IllegalArgumentException(String.format(
           "Invalid URI for NameNode address (check %s): %s is not of scheme 
'%s'.",
           FileSystem.FS_DEFAULT_NAME_KEY, filesystemURI.toString(),
           HdfsConstants.HDFS_URI_SCHEME));
     }
+    */
     return getAddress(authority);
   }
 
