To clarify, sometimes in the world of Hadoop people freely refer to an
output 'file' when it's really a directory containing 'part-*' files which
are pieces of the file. It's imprecise but that's the meaning. I think the
scaladoc may be referring to 'the path to the file, which includes this
parent dir, is generated ...' In an inherently distributed system, you want
to distributed writes and reads, so big "files" are really made of logical
files within a directory.

There is a JIRA open to support nested dirs which has been languishing:
https://issues.apache.org/jira/browse/SPARK-3586
I'm hoping to pursue that again with help from tdas after 1.3.
That's probably the best solution.

An alternative is to not use the file system as a sort of message queue,
and instead use something like Kafka. It has a lot of other benefits but
maybe it's not feasible to add this to your architecture.

You can merge the files with HDFS APIs without much trouble. The dirs will
be named consistently according to time and are something you can also
query for.

Making 1 partition has implications for parallelism of your job.

Emre, I think I see what you're getting at but you have the map +
materialize pattern which i think doesn't have the right guarantees about
re-execution. Why not foreachRDD?

Yes you can also consider collecting the whole RDD in foreachRDD and doing
what you like, including writing to one file. But that would only work if
the data is always small in each RDD.

On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> Hello Jose,
>
> We've hit the same issue a couple of months ago. It is possible to write
> directly to files instead of creating directories, but it is not
> straightforward, and I haven't seen any clear demonstration in books,
> tutorials, etc.
>
> We do something like:
>
> SparkConf sparkConf = new SparkConf().setAppName(appName);
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
> Duration(batchInterval));
> JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
> MyModuleApp.process(stream);
>
> And then in the process method:
>
> @Override public void process(JavaDStream<String> inStream) {    
> JavaDStream<String> json = inStream.map(new MyModuleWorker(jsonSchemaName, 
> validatedJSONoutputDir, rejectedJSONoutputDir));
>     forceOutput(json);  }
>
>
> This, in turn, calls the following (I've removed the irrelevant lines to 
> focus on writing):
>
>
> public class MyModuleWorker implements Function<String,String> {
>
>   public String call(String json) {
>     // process the data and then write it
>
>     writeJSON(json, validatedJSONoutputDir_);  }}
>
> And the writeJSON method is:
>
> public static final void writeJSON(String json, String jsonDirPath) throws 
> IOException {    String jsonFileName = jsonDirPath + "/" + 
> UUID.randomUUID().toString() + ".json.tmp";    URI uri = 
> URI.create(jsonFileName);    Configuration conf = new Configuration();    
> FileSystem fileSystem = FileSystem.get(uri, conf);    FSDataOutputStream out 
> = fileSystem.create(new Path(uri));    
> out.write(json.getBytes(StandardCharsets.UTF_8));    out.close();    
> fileSystem.rename(new Path(uri),            new Path(URI.create(jsonDirPath + 
> "/" + UUID.randomUUID().toString() + ".json")));  }
>
>
> Using a similar technique you might be able to achieve your objective.
>
> Kind regards,
>
> Emre Sevinç
> http://www.bigindustries.be/
>
>
>
> On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez <jfernan...@sdl.com>
> wrote:
>
>>  Hello folks,
>>
>>
>>
>> Our intended use case is:
>>
>> -          Spark Streaming app #1 reads from RabbitMQ and output to HDFS
>>
>> -          Spark Streaming app #2 reads #1’s output and stores the data
>> into Elasticsearch
>>
>>
>>
>> The idea behind this architecture is that if Elasticsearch is down due to
>> an upgrade or system error we don’t have to stop reading messages from the
>> queue. We could also scale each process separately as needed.
>>
>>
>>
>> After a few hours research my understanding is that Spark Streaming
>> outputs files in a *directory* for which you provide the prefix and suffix.
>> This is despite the ScalaDoc for DStream saveAsObjectFiles suggesting
>> otherwise:
>>
>>
>>
>>   /**
>>
>>    * Save each RDD in this DStream as a Sequence file of serialized
>> objects.
>>
>>    * The file name at each batch interval is generated based on `prefix`
>> and
>>
>>    * `suffix`: "prefix-TIME_IN_MS.suffix".
>>
>>    */
>>
>>
>>
>> Spark Streaming can monitor an HDFS directory for files but subfolders
>> are not supported. So as far as I can tell, it is not possible to use Spark
>> Streaming output as input for a different Spark Streaming app without
>> somehow performing a separate operation in the middle.
>>
>>
>>
>> Am I missing something obvious? I’ve read some suggestions like using
>> Hadoop to merge the directories (whose names I don’t see how you would
>> know) and to reduce the partitions to 1 (which wouldn’t help).
>>
>>
>>
>> Any other suggestions? What is the expected pattern a developer would
>> follow that would make Spark Streaming’s output format usable?
>>
>>
>>
>>
>>
>> www.sdl.com
>> <http://www.sdl.com/?utm_source=Email&utm_medium=Email%2BSignature&utm_campaign=SDL%2BStandard%2BEmail%2BSignature>
>>
>>  *SDL PLC confidential, all rights reserved.* If you are not the
>> intended recipient of this mail SDL requests and requires that you delete
>> it without acting upon or copying any of its contents, and we further
>> request that you advise us.
>>
>> SDL PLC is a public limited company registered in England and Wales.
>> Registered number: 02675207.
>> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire
>> SL6 7DY, UK.
>>
>> This message has been scanned for malware by Websense. www.websense.com
>>
>
>
>
> --
> Emre Sevinc
>

Reply via email to