Hello Jose,
We hit the same issue a couple of months ago. It is possible to write
directly to files instead of creating directories, but it is not
straightforward, and I haven't seen any clear demonstration in books,
tutorials, etc.
We do something like:
    SparkConf sparkConf = new SparkConf().setAppName(appName);
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(batchInterval));
    JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
    MyModuleApp.process(stream);
And then in the process method:
    @Override
    public void process(JavaDStream<String> inStream) {
        JavaDStream<String> json = inStream.map(
            new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
        forceOutput(json);
    }
This, in turn, calls the following (I've removed the irrelevant lines
to focus on writing):
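    public class MyModuleWorker implements Function<String, String> {
        public String call(String json) throws IOException {
            // process the data and then write it
            writeJSON(json, validatedJSONoutputDir_);
            // Assumption: the record is passed through unchanged to the next stage.
            return json;
        }
    }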
And the writeJSON method is:
    public static final void writeJSON(String json, String jsonDirPath) throws IOException {
        // Write the data under a temporary ".json.tmp" name first.
        String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
        URI uri = URI.create(jsonFileName);
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(uri, conf);
        FSDataOutputStream out = fileSystem.create(new Path(uri));
        out.write(json.getBytes(StandardCharsets.UTF_8));
        out.close();
        // Once the file is complete, rename it to its final ".json" name.
        fileSystem.rename(new Path(uri),
            new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
    }
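If you want to try the write-then-rename idea from writeJSON above without
a Hadoop cluster, here is a minimal sketch of the same pattern on a local
filesystem using only java.nio.file. The class name AtomicJsonWriter and
the method writeJson are made up for illustration and are not part of our
code. It also assumes the point of the ".json.tmp" step is to keep
half-written files from being picked up by a reader, and that the
filesystem supports an atomic move within one directory:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

// Hypothetical local-filesystem analogue of writeJSON; not the HDFS code.
public class AtomicJsonWriter {

    /**
     * Writes json to dir/<uuid>.json.tmp, then renames it to dir/<uuid>.json.
     * Returns the path of the final file.
     */
    public static Path writeJson(String json, Path dir) throws IOException {
        Files.createDirectories(dir);
        String baseName = UUID.randomUUID().toString();
        Path tmp = dir.resolve(baseName + ".json.tmp");
        Path target = dir.resolve(baseName + ".json");

        // Step 1: write the full content under the temporary name.
        Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));

        // Step 2: rename to the final name. ATOMIC_MOVE throws
        // AtomicMoveNotSupportedException if the filesystem cannot do it.
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("json-out");
        Path written = writeJson("{\"id\": 1}", dir);
        System.out.println("wrote " + written);
    }
}
```

Unlike writeJSON, this sketch reuses one UUID for the temporary and the
final name, which makes it easier to match the two when debugging.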
Using a similar technique you might be able to achieve your objective.
Kind regards,
Emre Sevinç
http://www.bigindustries.be/
On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez <[email protected]> wrote:
> Hello folks,
>
>
>
> Our intended use case is:
>
> - Spark Streaming app #1 reads from RabbitMQ and outputs to HDFS
>
> - Spark Streaming app #2 reads #1’s output and stores the data
> into Elasticsearch
>
>
>
> The idea behind this architecture is that if Elasticsearch is down due to
> an upgrade or system error we don’t have to stop reading messages from the
> queue. We could also scale each process separately as needed.
>
>
>
> After a few hours of research, my understanding is that Spark Streaming
> outputs files in a *directory* for which you provide the prefix and suffix.
> This is despite the ScalaDoc for DStream saveAsObjectFiles suggesting
> otherwise:
>
>
>
> /**
>  * Save each RDD in this DStream as a Sequence file of serialized objects.
>  * The file name at each batch interval is generated based on `prefix` and
>  * `suffix`: "prefix-TIME_IN_MS.suffix".
>  */
>
>
>
> Spark Streaming can monitor an HDFS directory for files but subfolders are
> not supported. So as far as I can tell, it is not possible to use Spark
> Streaming output as input for a different Spark Streaming app without
> somehow performing a separate operation in the middle.
>
>
>
> Am I missing something obvious? I’ve read some suggestions like using
> Hadoop to merge the directories (whose names I don’t see how you would
> know) and to reduce the partitions to 1 (which wouldn’t help).
>
>
>
> Any other suggestions? What is the expected pattern a developer would
> follow that would make Spark Streaming’s output format usable?