Hi Adam,

Thanks for this great writeup of the issue. We (LinkedIn) also operate Observer NameNodes and have observed the same issues, but have not yet gotten around to implementing a proper fix.

To add a bit of context from our side, there is at least one other place besides the committer v1 algorithm where this can occur, specifically around RDD checkpointing. In that situation, executors write out data to HDFS and then report their status back to the driver, which then tries to gather metadata about those files on HDFS (a listing operation) and can hit the same stale read.

For the time being, we have worked around this by enabling auto-msync mode (as described in HDFS-14211 <https://issues.apache.org/jira/browse/HDFS-14211>) with dfs.client.failover.observer.auto-msync-period.<namespace>=0. We set this in our default configurations *on the driver only*, which helps make sure we still get most of the scalability benefits of observer reads. We achieve this by setting the config as a system property in spark.driver.defaultJavaOptions; a sketch is below. This can cause performance issues for operations that perform many metadata operations serially, but it's a tradeoff we find acceptable for now in terms of correctness vs. performance.
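For illustration, a driver-only setting along these lines might look like the following in spark-defaults.conf (just a sketch; the exact placement is an assumption on our part, and <namespace> is a placeholder for the actual nameservice ID):

    # Sketch: make the driver's HDFS client msync before every read by setting the
    # auto-msync period to 0 for the target nameservice (<namespace> is a placeholder).
    spark.driver.defaultJavaOptions  -Ddfs.client.failover.observer.auto-msync-period.<namespace>=0

Executors are left at the default, so their reads can still be served by the Observer without forced msyncs.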
Long-term, we believe adding appropriate msync() calls to Spark is the right way forward (the first option you mentioned). I think the documentation mentioned in your 4th option is a good short-term addition, but in the long run, targeted msync() calls will be a more performant fix that works out-of-the-box. If needed, we can hide the calls behind reflection to mitigate concerns around compatibility with older Hadoop versions; a rough sketch of the kind of thing we have in mind follows.
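To make that concrete, here is an untested sketch (not existing Spark code; the object and method names are made up for illustration) of calling msync() reflectively on the output path's FileSystem, so the call compiles and runs even against Hadoop releases that predate the Observer feature:

    import java.lang.reflect.InvocationTargetException

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hypothetical helper: best-effort msync so the driver's next metadata read
    // reflects everything the executors have already written.
    object ObserverSync {
      def msyncIfPossible(outputPath: Path, conf: Configuration): Unit = {
        val fs: FileSystem = outputPath.getFileSystem(conf)
        try {
          // msync() exists on the HDFS client in releases that ship the Observer
          // read feature; look it up reflectively to avoid a hard dependency.
          fs.getClass.getMethod("msync").invoke(fs)
        } catch {
          case _: NoSuchMethodException => // older Hadoop client: nothing to sync
          case e: InvocationTargetException => e.getCause match {
            case _: UnsupportedOperationException => // filesystem doesn't support msync
            case other => throw other
          }
        }
      }
    }

Something like this could be invoked on the driver right before the commit path that does the listing, e.g. around HadoopMapReduceCommitProtocol.commitJob before it delegates to the Hadoop committer, though exactly where to hook it in is part of what would need to be worked out.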
There is interest from our side in pursuing this work, and we would certainly be happy to collaborate if there is interest from you or others as well.

On Tue, Aug 17, 2021 at 9:40 AM Adam Binford <adam...@gmail.com> wrote:

> Hi,
>
> We ran into an interesting issue that I wanted to share, as well as get thoughts on whether anything should be done about it. We run our own Hadoop cluster and recently deployed an Observer Namenode to take some burden off of our Active Namenode. We mostly use Delta Lake as our format, and everything seemed great. But when running some one-off analytics we ran into an issue. Specifically, we did something like:
>
> "df.<do some analytic>.repartition(1).write.csv()"
>
> This is our quick way of creating a CSV we can download and do other things with when our result is some small aggregation. However, we kept getting an empty output directory (just a _SUCCESS file and nothing else), even though the Spark UI said it wrote some positive number of rows. I eventually traced it back to our update to use the ObserverReadProxyProvider in our notebook sessions, and finally figured out it was due to the behavior described in the "Maintaining Client Consistency" section of https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ObserverNameNode.html.
>
> After setting the auto msync period to a low value, the writes started working. I kept digging and realized it's due to how the FileOutputCommitter v1 algorithm works. During the commitJob phase, the AM/driver does a file system listing on the output directory to find all the finished task output files it needs to move to the top-level output directory. Since that listing is a read, the Observer can serve it, but its view can be out of date and miss the files the executors just finished writing. The auto msync fixed it because it forced the driver to do an msync before the read took place. However, frequent auto msyncs can defeat some of the performance benefits of the Observer.
>
> The v2 algorithm shouldn't have this issue because the tasks themselves copy their output to the final directory when they finish, and the driver simply adds the _SUCCESS file at the end. Hadoop's default is v2, but Spark overrides that to use v1 by default because of potential correctness issues, which is fair. While this is mostly a Hadoop issue, the fact that Spark defaults to the v1 algorithm makes it somewhat of a Spark problem. Also, things like Delta Lake (or even regular structured streaming output, I think) shouldn't have this issue because they write files directly and rely on a transaction log, so there is no file moving on the driver.
>
> So I mostly wanted to share that in case anyone else runs into this same issue, but I also wanted to get thoughts on whether anything should be done to prevent it. Several ideas, in no particular order:
>
> - Perform an msync during Spark's commitJob before calling the parent commitJob. Since msync is only available in newer APIs, this probably isn't even possible while maintaining compatibility with older Hadoop versions.
> - Attempt to get an msync added upstream in Hadoop's v1 committer's commitJob.
> - Attempt to detect the use of the ObserverReadProxyProvider and either force the v2 committer on the Spark side, or print a warning that you need to either use the v2 committer or set the auto msync period very low (or to 0) to guarantee correct output.
> - Simply add something to the Spark docs about things to know when using the ObserverReadProxyProvider.
> - Assume that anyone capable of running their own Hadoop cluster with an Observer Namenode will recognize this limitation quickly; it only took me about an hour to figure out, so that's also fair.
>
> Thanks,
>
> --
> Adam
>