Hi, trying to figure out how to process files under the following directory structure:
    <base_dir>/batch_dir/*.snappy

I have a base directory, and every so often a new batch gets dropped into it with a bunch of .snappy files as well as a bunch of .xml files. I'd like to process only the .snappy files, but I can't figure out how to hook that up with Spark Streaming.

I've tried this, which doesn't work:

    ssc.textFileStream("base_dir/batch_dir/*.snappy")

This works, but it isn't quite what I want, since it also picks up file formats I don't care about:

    ssc.textFileStream("base_dir/batch_dir")

And this, which is closest to what I want (watch all batch folders for new .snappy files), fails with an exception:

    ssc.textFileStream("base_dir/*/*.snappy")

    java.io.FileNotFoundException: File hdfs://cdh4-18164-nn/test/*/*.snappy does not exist.
        at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
        at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
        at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
        at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

The path itself definitely exists and is reachable both via the regular Spark context, sc.textFile(...), and via hadoop fs -ls.

Any help on how to include wildcards for streaming, or on how to filter out unwanted extensions, would be much appreciated.
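In case it clarifies what I'm after, the rough shape I had in mind is something like the sketch below, using fileStream's filter argument to keep only .snappy files. I'm not sure this is the right approach, and the directory argument and filter function here are just my assumptions, not something I know to be correct:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(sc, Seconds(60))

    // Keep only files whose names end in .snappy; skip the .xml files.
    // (Assumption on my part that a name-based filter is enough here.)
    val snappyOnly = (path: Path) => path.getName.endsWith(".snappy")

    // fileStream takes an explicit filter, unlike textFileStream.
    // "base_dir" stands in for the real HDFS directory being watched.
    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat]("base_dir", snappyOnly, newFilesOnly = true)
      .map { case (_, text) => text.toString }

    lines.print()
    ssc.start()
    ssc.awaitTermination()

Whether a filter like this is even enough when the new .snappy files land inside per-batch sub-directories is part of what I'm unsure about.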