Hi Steve,
Thank you very much for the reality check! Some more answers inline ...
On 8/8/13 1:30 PM, Steve Loughran wrote:
On 7 August 2013 10:59, Jeff Dost <jd...@ucsd.edu> wrote:
Hello,
We work in a software development team at the UCSD CMS Tier2 Center. We
would like to propose a mechanism to allow one to subclass the
DFSInputStream in a clean way from an external package. First I'd like to
give some motivation for why, and then I will proceed with the details.
We have a 3 Petabyte Hadoop cluster we maintain for the LHC experiment at
CERN. There are other T2 centers worldwide that contain mirrors of the
same data we host. We are working on an extension to Hadoop so that, when
reading a file, if it is found that there are no available replicas of a
block, we use an external interface to retrieve that block of the file from
another data center. The external interface is necessary because not all
T2 centers involved in CMS are running a Hadoop cluster as their storage
backend.
You are relying on all these T2 sites being HDFS clusters with exactly the
same block sizing of a named file, so that you can pick up a copy of a
block from elsewhere?
No, it's not just HDFS. But we have a common access method for all sites, XRootd
(http://xrootd.slac.stanford.edu/), and that's what our input stream
implementation falls back to using.
Also, HDFS block sizes are not the same across sites ... that's in the hands of the T2 admins.
Why not just handle a FileNotFoundException on open() and either
relay to another site or escalate to T1/T0/Castor? This would be far easier
to implement with a front-end wrapper for any FileSystem. What you are
proposing seems far more brittle in both code and execution.
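A minimal sketch of this front-end wrapper idea, using the public FilterFileSystem API; the class name and the choice of a second FileSystem as the fallback target are just illustrative assumptions, not part of Hadoop or of the proposal:

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;

// Wraps any FileSystem; if open() fails with FileNotFoundException,
// retry the open against a fallback FileSystem (e.g. another site).
public class FallbackOnOpenFileSystem extends FilterFileSystem {
  private final FileSystem fallback;

  public FallbackOnOpenFileSystem(FileSystem primary, FileSystem fallback) {
    super(primary);
    this.fallback = fallback;
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    try {
      return super.open(f, bufferSize);
    } catch (FileNotFoundException e) {
      // Relay the request to the other site instead of failing the job.
      return fallback.open(f, bufferSize);
    }
  }
}

A wrapper like this only sees whole-file failures at open(), which is exactly why it cannot help when a single block goes missing mid-read.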
We already fall back to xrootd on open failures from our application framework
... the job gets redirected to an xrootd proxy, which downloads the whole file and
serves data as the job asks for it. The changes described by Jeff are an
optimization for cases when a single block becomes unavailable.
We have a lot of data that is replicated at several sites and also available on
tape at Fermilab. Popularity of datasets (a couple of hundred TB) varies quite a lot,
and what we would like to achieve is to be able to reduce replication down to
one for files that not many people care about at the moment. This should give us
about 1 PB of disk space at every T2 center and allow us to be more proactive
with data movement by simply observing the job queues.
In order to implement this functionality, we need to subclass the
DFSInputStream and override the read method, so we can catch IOExceptions
that occur on client reads at the block level.
Specific IOEs related to missing blocks, or any IOE, as that could swallow
other problems?
We were looking at that but couldn't quite figure out what else can go wrong ...
so the plan was to log all the exceptions and see what we can do about each of them.
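To make the intent concrete, here is a simplified sketch of that catch-and-fallback logic. It is not the actual patch: the real change overrides DFSInputStream.read() so the retry can target exactly the missing block, while this stand-in just re-reads the same byte range through a hypothetical ExternalFetcher interface representing the XRootd/JNI layer:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.BlockMissingException;

public class FallbackReader {

  // Hypothetical stand-in for the external (XRootd) access layer.
  public interface ExternalFetcher {
    int read(String path, long pos, byte[] buf, int off, int len) throws IOException;
  }

  private final FSDataInputStream in;
  private final ExternalFetcher fetcher;
  private final String path;

  public FallbackReader(FSDataInputStream in, ExternalFetcher fetcher, String path) {
    this.in = in;
    this.fetcher = fetcher;
    this.path = path;
  }

  // Positional read that falls back to the external source when HDFS
  // reports that no replica of the underlying block is available.
  public int read(long pos, byte[] buf, int off, int len) throws IOException {
    try {
      return in.read(pos, buf, off, len);   // normal HDFS read path
    } catch (BlockMissingException e) {
      // No live replica for this block: fetch the same byte range externally.
      return fetcher.read(path, pos, buf, off, len);
    }
  }
}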
Thanks also for the comments below; Jeff will appreciate them more, as he
actually went through all the HDFS code and made all the changes in the patch,
while I did the JNI + Xrootd part.
One more question ... if we converge on an acceptable solution (which I find a
bit hard to see at the moment), how long would it take for the changes to be
released, and which release cycle would they end up in?
Best,
Matevz
The basic steps required:
1. Invent a new URI scheme for the customized "FileSystem" in
core-site.xml:
  <property>
    <name>fs.foofs.impl</name>
    <value>my.package.FooFileSystem</value>
    <description>My Extended FileSystem for foofs: uris.</description>
  </property>
Assuming you only use the FS driver everywhere, you could just
overwrite the hdfs declaration.
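For example, a client could point the stock hdfs:// scheme at the extended driver instead of introducing foofs:// at all; "my.package.FooFileSystem" and the namenode address below are placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemeOverrideExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Replace the default hdfs:// implementation with the extended one.
    conf.set("fs.hdfs.impl", "my.package.FooFileSystem");
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020/"), conf);
    fs.open(new Path("/store/example.root")).close();
  }
}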
2. Write new classes included in the external package that subclass the
following:
FooFileSystem subclasses DistributedFileSystem
FooFSClient subclasses DFSClient
FooFSInputStream subclasses DFSInputStream
Now any client commands that explicitly use the foofs:// scheme in paths
to access the Hadoop cluster can open files with a customized InputStream
that extends the functionality of the default Hadoop client DFSInputStream. In
order to make this happen for our use case, we had to change some access
modifiers in the DistributedFileSystem, DFSClient, and DFSInputStream
classes provided by Hadoop.
As well as marking such things as unstable/internal, it's critical that they
don't expose any sync problems (which a boolean "closed" field may). Better to
use protected accessors.
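A toy illustration of that point, not taken from the Hadoop sources: rather than widening the visibility of a mutable field such as closed, keep it private and let subclasses see it only through a protected accessor:

public abstract class GuardedStream {
  private volatile boolean closed;        // stays private to the base class

  protected final boolean isClosed() {    // subclasses read it safely
    return closed;
  }

  protected final void markClosed() {     // single place where the flag flips
    closed = true;
  }
}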
In addition, we had to comment out the check in the namenode code that
only allows for URI schemes of the form "hdfs://".
That check is there for a reason, and commenting it out would break things. The more
elegant design would be for DistributedFileSystem to have a non-final
getSchemaName() method used in the validator; you'd just override it.
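The shape of that suggestion, as a standalone sketch rather than actual Hadoop code: the validator calls an overridable getter instead of comparing against a hard-coded "hdfs" literal, so the extension never has to touch the check itself:

import java.net.URI;

public class BaseFs {
  protected String getSchemaName() {
    return "hdfs";
  }

  // The scheme check stays in place; it just asks the subclass what to accept.
  public void checkScheme(URI uri) {
    if (!getSchemaName().equals(uri.getScheme())) {
      throw new IllegalArgumentException(
          "Unexpected scheme " + uri.getScheme() + ", expected " + getSchemaName());
    }
  }
}

class FooFs extends BaseFs {
  @Override
  protected String getSchemaName() {
    return "foofs";   // foofs:// URIs now pass the same validator
  }
}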
Attached is a patch file we apply to hadoop.
Patches and reviews should be done via JIRA
Note that we derived this patch by modifying the Cloudera release
hadoop-2.0.0-cdh4.1.1, which can be found at:
http://archive.cloudera.com/cdh4/cdh/4/hadoop-2.0.0-cdh4.1.1.tar.gz
And they have to be done against Hadoop trunk in SVN, then potentially
backported to earlier ASF releases. Vendors are free to cherry-pick
anything.
We would greatly appreciate any advice on whether or not this approach
sounds reasonable, and whether you would consider accepting these modifications
into the official Hadoop code base.
The changes look somewhat minor, but there are issues above: everyone is
worried about the stability of HDFS, and anything that would change behaviour
(which commenting out the check very much does) would be rejected.
For a patch to be considered:
1. It must not change the semantics of existing APIs (here, the DFS client
code).
2. It mustn't expose fields that could generate concurrency problems.
3. It has to be considered the right approach.
4. It has to come with tests.
These are things that could be covered in the JIRA