Thanks for the advice folks, it is much appreciated. This seems like a pretty 
unfortunate design flaw. My team was surprised by it.

I’m going to drop the two-step process and do it all in a single step until we 
get Kafka online.

From: Sean Owen [mailto:so...@cloudera.com]
Sent: Wednesday, February 18, 2015 1:53 AM
To: Emre Sevinc
Cc: Jose Fernandez; user@spark.apache.org
Subject: Re: Spark Streaming output cannot be used as input?

To clarify, sometimes in the world of Hadoop people freely refer to an output 
'file' when it's really a directory containing 'part-*' files which are pieces 
of the file. It's imprecise but that's the meaning. I think the scaladoc may be 
referring to 'the path to the file, which includes this parent dir, is 
generated ...' In an inherently distributed system, you want to distribute 
writes and reads, so big "files" are really made of logical files within a 
directory.

There is a JIRA open to support nested dirs which has been languishing: 
https://issues.apache.org/jira/browse/SPARK-3586
I'm hoping to pursue that again with help from tdas after 1.3.
That's probably the best solution.

An alternative is to not use the file system as a sort of message queue, and 
instead use something like Kafka. It has a lot of other benefits but maybe it's 
not feasible to add this to your architecture.

You can merge the files with HDFS APIs without much trouble. The dirs will be 
named consistently according to time and are something you can also query for.
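
As a rough illustration of the merge step mentioned above, the sketch below concatenates the 'part-*' files of one batch directory into a single file. It is only a local stand-in: it uses java.nio rather than the HDFS APIs, and the class name PartFileMerger and its merge method are hypothetical, not part of Spark or Hadoop. On HDFS the same loop would go through Hadoop's FileSystem API instead.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (an assumption for illustration), local file system only.
final class PartFileMerger {

    /**
     * Concatenates every part-* file found directly in batchDir, in file-name
     * order, into target. Returns the number of bytes written.
     */
    static long merge(Path batchDir, Path target) throws IOException {
        List<Path> parts = new ArrayList<>();
        // The glob skips marker files such as _SUCCESS and hidden .crc files.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(batchDir, "part-*")) {
            for (Path p : stream) {
                parts.add(p);
            }
        }
        Collections.sort(parts); // part-00000, part-00001, ...

        long bytes = 0;
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path part : parts) {
                bytes += Files.copy(part, out);
            }
        }
        return bytes;
    }
}
```

The target should live outside the batch directory, or at least not match 'part-*', so that it is never picked up as one of its own inputs.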

Making 1 partition has implications for parallelism of your job.

Emre, I think I see what you're getting at, but you have the map + materialize 
pattern, which I think doesn't have the right guarantees about re-execution. Why 
not foreachRDD?

Yes, you can also consider collecting the whole RDD in foreachRDD and doing what 
you like, including writing to one file. But that would only work if the data 
in each RDD is always small.


SDL PLC confidential, all rights reserved. If you are not the intended 
recipient of this mail SDL requests and requires that you delete it without 
acting upon or copying any of its contents, and we further request that you 
advise us.

SDL PLC is a public limited company registered in England and Wales. Registered 
number: 02675207.
Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6 7DY, 
UK.

On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc <emre.sev...@gmail.com> wrote:
Hello Jose,
We hit the same issue a couple of months ago. It is possible to write 
directly to files instead of creating directories, but it is not 
straightforward, and I haven't seen any clear demonstration in books, 
tutorials, etc.
We do something like:

SparkConf sparkConf = new SparkConf().setAppName(appName);
JavaStreamingContext ssc =
    new JavaStreamingContext(sparkConf, new Duration(batchInterval));
JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
MyModuleApp.process(stream);

And then in the process method:

@Override public void process(JavaDStream<String> inStream) {

    JavaDStream<String> json = inStream.map(
        new MyModuleWorker(jsonSchemaName,
                           validatedJSONoutputDir,
                           rejectedJSONoutputDir));

    forceOutput(json);

  }


This, in turn, calls the following (I've removed the irrelevant lines to focus 
on writing):


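public class MyModuleWorker implements Function<String,String> {

  @Override
  public String call(String json) throws Exception {

    // process the data and then write it
    writeJSON(json, validatedJSONoutputDir_);

    // call() has to return a String; passing the record through is an
    // assumption, since the original return statement was elided
    return json;

  }

}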

And the writeJSON method is:

public static final void writeJSON(String json, String jsonDirPath)
    throws IOException {

    // Write under a temporary name first, then rename to the final .json name.
    // One UUID is shared by both names so the two stay easy to correlate.
    String baseName = jsonDirPath + "/" + UUID.randomUUID().toString();
    Path tmpPath = new Path(URI.create(baseName + ".json.tmp"));
    Path finalPath = new Path(URI.create(baseName + ".json"));

    Configuration conf = new Configuration();
    FileSystem fileSystem = FileSystem.get(tmpPath.toUri(), conf);

    // try-with-resources closes the stream even if the write fails
    try (FSDataOutputStream out = fileSystem.create(tmpPath)) {
        out.write(json.getBytes(StandardCharsets.UTF_8));
    }

    fileSystem.rename(tmpPath, finalPath);

  }
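
For readers who want to try the write-then-rename idea without a Hadoop cluster, here is a minimal sketch of the same pattern on a local file system using java.nio. The class name AtomicJsonWriter and its write method are hypothetical and are not taken from the code above; they only mirror its shape.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

// Hypothetical local stand-in (an assumption for illustration) for writeJSON.
final class AtomicJsonWriter {

    /**
     * Writes json to "<uuid>.json.tmp" inside dir, then renames it to
     * "<uuid>.json". Returns the final path.
     */
    static Path write(String json, Path dir) throws IOException {
        String id = UUID.randomUUID().toString();
        Path tmp = dir.resolve(id + ".json.tmp");
        Path done = dir.resolve(id + ".json");

        Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));

        // Both names are in the same directory, so the rename can be atomic
        // on file systems that support it.
        return Files.move(tmp, done, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

A consumer that only looks for names ending in ".json" would then see a file only after it has been fully written, provided the rename itself is atomic on the file system in use.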

Using a similar technique you might be able to achieve your objective.
Kind regards,
Emre Sevinç
http://www.bigindustries.be/


On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez <jfernan...@sdl.com> wrote:
Hello folks,

Our intended use case is:

- Spark Streaming app #1 reads from RabbitMQ and outputs to HDFS

- Spark Streaming app #2 reads #1’s output and stores the data into 
Elasticsearch

The idea behind this architecture is that if Elasticsearch is down due to an 
upgrade or system error we don’t have to stop reading messages from the queue. 
We could also scale each process separately as needed.

After a few hours of research, my understanding is that Spark Streaming outputs 
files in a *directory* for which you provide the prefix and suffix. This is 
despite the ScalaDoc for DStream saveAsObjectFiles suggesting otherwise:

  /**
   * Save each RDD in this DStream as a Sequence file of serialized objects.
   * The file name at each batch interval is generated based on `prefix` and
   * `suffix`: "prefix-TIME_IN_MS.suffix".
   */

Spark Streaming can monitor an HDFS directory for files but subfolders are not 
supported. So as far as I can tell, it is not possible to use Spark Streaming 
output as input for a different Spark Streaming app without somehow performing 
a separate operation in the middle.
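
To make the "prefix-TIME_IN_MS.suffix" naming from the ScalaDoc above concrete, the following sketch builds such a name and recovers the batch time from it, which is one way an intermediate step could work out which batch directories exist. The class BatchDirName and both methods are hypothetical helpers, and the layout is taken only from the quoted ScalaDoc; how Spark behaves with an empty suffix is not covered here.

```java
import java.util.OptionalLong;

// Hypothetical helper (an assumption for illustration), not a Spark API.
final class BatchDirName {

    /** Builds "prefix-TIME_IN_MS.suffix" as described in the quoted ScalaDoc. */
    static String format(String prefix, long timeMs, String suffix) {
        return prefix + "-" + timeMs + "." + suffix;
    }

    /** Extracts TIME_IN_MS from a name, or returns empty if it does not match. */
    static OptionalLong parseTime(String name, String prefix, String suffix) {
        String head = prefix + "-";
        String tail = "." + suffix;
        if (name.length() < head.length() + tail.length()
                || !name.startsWith(head)
                || !name.endsWith(tail)) {
            return OptionalLong.empty();
        }
        String digits = name.substring(head.length(), name.length() - tail.length());
        try {
            return OptionalLong.of(Long.parseLong(digits));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }
}
```

For example, BatchDirName.format("out", 1424246000000L, "obj") gives "out-1424246000000.obj", and parseTime on that name with the same prefix and suffix gives back 1424246000000.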

Am I missing something obvious? I’ve read some suggestions like using Hadoop to 
merge the directories (whose names I don’t see how you would know) and to 
reduce the partitions to 1 (which wouldn’t help).

Any other suggestions? What is the expected pattern a developer would follow 
that would make Spark Streaming’s output format usable?


