Hello Jose,

We hit the same issue a couple of months ago. It is possible to write directly to files instead of creating directories, but it is not straightforward, and I haven't seen any clear demonstration of it in books, tutorials, etc.

We do something like:

    SparkConf sparkConf = new SparkConf().setAppName(appName);
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval));
    JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
    MyModuleApp.process(stream);

And then in the process method:

    @Override
    public void process(JavaDStream<String> inStream) {
      JavaDStream<String> json = inStream.map(
          new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
      forceOutput(json);
    }

This, in turn, calls the following (I've removed the irrelevant lines to focus on the writing logic):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.UUID;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.spark.api.java.function.Function;

    public class MyModuleWorker implements Function<String, String> {
      public String call(String json) throws Exception {
        // process the data, write it out, and pass it downstream
        writeJSON(json, validatedJSONoutputDir_);
        return json;
      }
    }

And the writeJSON method is:

    public static final void writeJSON(String json, String jsonDirPath) throws IOException {
      // Write to a ".json.tmp" file first, then rename it to ".json" once the
      // write is complete, so a process watching the directory never picks up
      // a half-written file.
      String baseName = jsonDirPath + "/" + UUID.randomUUID().toString();
      Path tmpPath = new Path(baseName + ".json.tmp");
      Path finalPath = new Path(baseName + ".json");

      Configuration conf = new Configuration();
      FileSystem fileSystem = FileSystem.get(tmpPath.toUri(), conf);
      FSDataOutputStream out = fileSystem.create(tmpPath);
      out.write(json.getBytes(StandardCharsets.UTF_8));
      out.close();
      fileSystem.rename(tmpPath, finalPath);
    }

Using a similar technique you might be able to achieve your objective.

Kind regards,

Emre Sevinç
http://www.bigindustries.be/


On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez <jfernan...@sdl.com> wrote:

> Hello folks,
>
> Our intended use case is:
>
> - Spark Streaming app #1 reads from RabbitMQ and outputs to HDFS
> - Spark Streaming app #2 reads #1's output and stores the data into
>   Elasticsearch
>
> The idea behind this architecture is that if Elasticsearch is down due to
> an upgrade or system error, we don't have to stop reading messages from
> the queue. We could also scale each process separately as needed.
>
> After a few hours of research, my understanding is that Spark Streaming
> outputs files into a *directory* for which you provide the prefix and
> suffix. This is despite the ScalaDoc for DStream.saveAsObjectFiles
> suggesting otherwise:
>
>     /**
>      * Save each RDD in this DStream as a Sequence file of serialized objects.
>      * The file name at each batch interval is generated based on `prefix` and
>      * `suffix`: "prefix-TIME_IN_MS.suffix".
>      */
>
> Spark Streaming can monitor an HDFS directory for files, but subfolders
> are not supported. So as far as I can tell, it is not possible to use
> Spark Streaming output as input for a different Spark Streaming app
> without somehow performing a separate operation in the middle.
>
> Am I missing something obvious? I've read some suggestions like using
> Hadoop to merge the directories (whose names I don't see how you would
> know) and to reduce the partitions to 1 (which wouldn't help).
>
> Any other suggestions? What is the expected pattern a developer would
> follow that would make Spark Streaming's output format usable?
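
P.S. In case it helps, here is a rough, untested sketch of how the same write-and-rename idea could be hooked into your app #1 via foreachRDD, so that every record of every batch lands as its own .json file in one flat directory that app #2 can watch. It reuses the writeJSON method above; the class name FlatDirOutput and the method name saveToFlatDir are just placeholders:

    import java.util.Iterator;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.api.java.JavaDStream;

    public final class FlatDirOutput {

      // Writes each record of each batch as a separate, uniquely named .json
      // file under outputDir, using the write-then-rename trick from
      // writeJSON() so a second Streaming app can safely monitor outputDir.
      public static void saveToFlatDir(JavaDStream<String> stream, final String outputDir) {
        stream.foreachRDD(new Function<JavaRDD<String>, Void>() {
          @Override
          public Void call(JavaRDD<String> rdd) throws Exception {
            rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
              @Override
              public void call(Iterator<String> records) throws Exception {
                while (records.hasNext()) {
                  // Each record becomes its own file; writeJSON generates
                  // the unique name and does the atomic rename.
                  MyModuleWorker.writeJSON(records.next(), outputDir);
                }
              }
            });
            return null;
          }
        });
      }
    }

One file per record can produce a lot of small files on HDFS; buffering a whole partition and writing one file per partition with the same rename trick would reduce that, at the cost of a little more code.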