Hello Jose,

We hit the same issue a couple of months ago. It is possible to write
directly to files instead of creating directories, but it is not
straightforward, and I haven't seen it clearly demonstrated in books,
tutorials, etc.

We do something like:

SparkConf sparkConf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc =
    new JavaStreamingContext(sparkConf, new Duration(batchInterval));
JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
MyModuleApp.process(stream);

And then in the process method:

@Override
public void process(JavaDStream<String> inStream) {
  JavaDStream<String> json = inStream.map(new MyModuleWorker(
      jsonSchemaName, validatedJSONoutputDir, rejectedJSONoutputDir));
  forceOutput(json);
}
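
(In case you are wondering about forceOutput: it is just a small helper
of our own, not a Spark API. Transformations such as map are lazy, so
some output action is needed to make Spark actually run the pipeline
every batch interval. A minimal sketch, assuming you don't care about
the results themselves, would be:)

// Our own helper, not a Spark API: run a cheap action on each RDD so
// that the lazy map() above (and hence the writing) actually executes.
private static void forceOutput(JavaDStream<String> stream) {
  stream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) {
      rdd.count(); // any action will do; we only need the side effect
      return null;
    }
  });
}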


The map, in turn, calls the following worker (I've removed the
irrelevant lines to focus on the writing):


public class MyModuleWorker implements Function<String, String> {

  public String call(String json) throws IOException {
    // process the data and then write it
    writeJSON(json, validatedJSONoutputDir_);
    return json;
  }
}

And the writeJSON method is:

public static final void writeJSON(String json, String jsonDirPath)
    throws IOException {
  // Write under a temporary ".tmp" name first, then rename it into
  // place, so that a reader never sees a partially written file.
  String jsonFileName =
      jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
  URI uri = URI.create(jsonFileName);
  Configuration conf = new Configuration();
  FileSystem fileSystem = FileSystem.get(uri, conf);
  FSDataOutputStream out = fileSystem.create(new Path(uri));
  out.write(json.getBytes(StandardCharsets.UTF_8));
  out.close();
  fileSystem.rename(new Path(uri), new Path(URI.create(
      jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
}

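The rename is the important part: a rename is atomic on HDFS, so a
second Spark Streaming application monitoring the directory never picks
up a half-written file. A minimal sketch of the consuming side (your
app #2; sparkConf, batchInterval and validatedJSONoutputDir are
placeholders for your own values) might be:

JavaStreamingContext ssc2 =
    new JavaStreamingContext(sparkConf, new Duration(batchInterval));
// textFileStream picks up files that newly appear in the directory.
JavaDStream<String> validated = ssc2.textFileStream(validatedJSONoutputDir);
// ... index the 'validated' stream into Elasticsearch here ...
ssc2.start();
ssc2.awaitTermination();

One caveat: as far as I know the file stream considers *any* new file
in the monitored directory, so it may be safer to write the .tmp files
into a separate scratch directory and rename them into the monitored
one.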

Using a similar technique you might be able to achieve your objective.

Kind regards,

Emre Sevinç
http://www.bigindustries.be/



On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez <jfernan...@sdl.com> wrote:

>  Hello folks,
>
>
>
> Our intended use case is:
>
> - Spark Streaming app #1 reads from RabbitMQ and outputs to HDFS
>
> - Spark Streaming app #2 reads #1’s output and stores the data into
> Elasticsearch
>
>
>
> The idea behind this architecture is that if Elasticsearch is down due to
> an upgrade or system error we don’t have to stop reading messages from the
> queue. We could also scale each process separately as needed.
>
>
>
> After a few hours research my understanding is that Spark Streaming
> outputs files in a *directory* for which you provide the prefix and suffix.
> This is despite the ScalaDoc for DStream saveAsObjectFiles suggesting
> otherwise:
>
>
>
>   /**
>    * Save each RDD in this DStream as a Sequence file of serialized objects.
>    * The file name at each batch interval is generated based on `prefix` and
>    * `suffix`: "prefix-TIME_IN_MS.suffix".
>    */
>
>
>
> Spark Streaming can monitor an HDFS directory for files but subfolders are
> not supported. So as far as I can tell, it is not possible to use Spark
> Streaming output as input for a different Spark Streaming app without
> somehow performing a separate operation in the middle.
>
>
>
> Am I missing something obvious? I’ve read some suggestions like using
> Hadoop to merge the directories (whose names I don’t see how you would
> know) and to reduce the partitions to 1 (which wouldn’t help).
>
>
>
> Any other suggestions? What is the expected pattern a developer would
> follow that would make Spark Streaming’s output format usable?



-- 
Emre Sevinc
