There is an option to join small files up. If you are unable to find it,
just let me know.


Regards,
Gourav

On Thu, Jul 28, 2016 at 4:58 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Pedro
>
> Thanks for the explanation. I started watching your repo. In the short
> term I think I am going to try concatenating my small files into 64MB
> chunks and using HDFS. My Spark Streaming app is implemented in Java and
> uses DataFrames; it writes to S3. My batch processing is written in Python
> and reads the data into DataFrames.
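>
> Roughly what I have in mind for the concatenation step (an untested
> sketch, plain Java, no Spark; it assumes the small files have already been
> copied into a local directory, that each file is newline-terminated JSON
> lines, and that the paths and the 64MB target are just examples):
>
> import java.io.File;
> import java.io.FileInputStream;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
>
> public class ConcatSmallFiles {
>
>     // target chunk size, roughly one HDFS block
>     static final long TARGET_BYTES = 64L * 1024 * 1024;
>
>     public static void main(String[] args) throws IOException {
>         File inputDir = new File(args[0]);  // directory of small JSON files
>         File outputDir = new File(args[1]); // where the ~64MB chunks go
>
>         int chunkIndex = 0;
>         long written = 0;
>         OutputStream out = new FileOutputStream(
>                 new File(outputDir, "chunk-" + chunkIndex + ".json"));
>         for (File f : inputDir.listFiles()) {
>             // start a new chunk once the current one is big enough
>             if (written >= TARGET_BYTES) {
>                 out.close();
>                 chunkIndex++;
>                 written = 0;
>                 out = new FileOutputStream(
>                         new File(outputDir, "chunk-" + chunkIndex + ".json"));
>             }
>             InputStream in = new FileInputStream(f);
>             byte[] buf = new byte[8192];
>             int n;
>             while ((n = in.read(buf)) != -1) {
>                 out.write(buf, 0, n);
>                 written += n;
>             }
>             in.close();
>         }
>         out.close();
>     }
> }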
>
> It's probably a lot of work to make your solution work in these other
> contexts.
>
> Here is another use case you might be interested in: writing multiple
> files to S3 is really slow, and it causes a lot of problems for my
> streaming app. Bad things happen if your processing time exceeds your
> window length. Our streaming app must save all of the input. For each mini
> batch we split the input into as many as 30 different data sets, and each
> one needs to be written to S3.
>
> As a temporary workaround I use an executor service to try to get more
> concurrent writes. Ideally the Spark framework would provide support for
> async IO, and hopefully the S3 performance issue would be improved. Here
> is my code if you are interested:
>
>
> public class StreamingKafkaGnipCollector {
>
>     static final int POOL_SIZE = 30;
>     static ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
>
>     …
>
>     private static void saveRawInput(SQLContext sqlContext,
>             JavaPairInputDStream<String, String> messages, String outputURIBase) {
>
>         // pull the message payloads out of the (key, value) tuples
>         JavaDStream<String> lines = messages.map(
>                 new Function<Tuple2<String, String>, String>() {
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public String call(Tuple2<String, String> tuple2) {
>                 //logger.warn("TODO _2:{}", tuple2._2);
>                 return tuple2._2();
>             }
>         });
>
>         lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>             @Override
>             public void call(JavaRDD<String> jsonRDD, Time time) throws Exception {
>                 …
>                 // df.write().json("s3://"); is very slow
>                 // run the saves concurrently instead
>                 List<SaveData> saveData = new ArrayList<SaveData>(100);
>                 for (String tag : tags) {
>                     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>                     String dirPath = createPath(outputURIBase, date, tag, milliSeconds);
>                     saveData.add(new SaveData(saveDF, dirPath));
>                 }
>
>                 saveImpl(saveData, executor); // concurrent writes to S3
>             }
>         });
>     }
>
>     // submit one write task per data set and block until they all finish
>     private static void saveImpl(List<SaveData> saveData, ExecutorService executor) {
>         List<Future<?>> runningThreads = new ArrayList<Future<?>>(POOL_SIZE);
>         for (SaveData data : saveData) {
>             SaveWorker worker = new SaveWorker(data);
>             Future<?> f = executor.submit(worker);
>             runningThreads.add(f);
>         }
>
>         // wait for all of the workers to complete
>         for (Future<?> worker : runningThreads) {
>             try {
>                 worker.get();
>                 logger.debug("worker completed");
>             } catch (InterruptedException e) {
>                 logger.error("", e);
>             } catch (ExecutionException e) {
>                 logger.error("", e);
>             }
>         }
>     }
>
>     // a DataFrame plus the S3 directory it should be written to
>     static class SaveData {
>         private final DataFrame df;
>         private final String path;
>
>         SaveData(DataFrame df, String path) {
>             this.df = df;
>             this.path = path;
>         }
>     }
>
>     static class SaveWorker implements Runnable {
>         SaveData data;
>
>         public SaveWorker(SaveData data) {
>             this.data = data;
>         }
>
>         @Override
>         public void run() {
>             // skip empty data sets; note count() itself triggers a Spark job
>             if (data.df.count() >= 1) {
>                 data.df.write().json(data.path);
>             }
>         }
>     }
> }
>
>
> From: Pedro Rodriguez <ski.rodrig...@gmail.com>
> Date: Wednesday, July 27, 2016 at 8:40 PM
> To: Andrew Davidson <a...@santacruzintegration.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: performance problem when reading lots of small files created
> by spark streaming.
>
> There are a few blog posts that detail one possible/likely issue, for
> example:
> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>
> TLDR: The Hadoop libraries Spark uses assume that their input comes from
> a file system (which works with HDFS); however, S3 is a key-value store,
> not a file system. Somewhere along the line, this makes things very slow.
> Below I describe their approach and a library I am working on to solve
> this problem.
>
> (Much) Longer Version (with a shiny new library in development):
> So far in my reading of the source code, Hadoop attempts to actually read
> from S3, which can be expensive, particularly since it does so from a
> single driver core (this is different from just listing the files; it
> actually reads them. I can find the source code and link it later if you
> would like). The approach explained in the post above is to instead use
> the AWS SDK to list the files, distribute the file names as a collection
> with sc.parallelize, and then read them in parallel. I found this worked,
> but it was lacking in a few ways, so I started this project:
> https://github.com/EntilZha/spark-s3
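>
> For illustration, the list-then-parallelize idea looks roughly like the
> Java sketch below. This is not my library's code; the class name, the
> readSmallJsonFiles helper, and the one-client-per-object read are made up
> for the example, and it assumes the AWS SDK v1 is on the classpath.
>
> import java.util.ArrayList;
> import java.util.List;
>
> import com.amazonaws.services.s3.AmazonS3;
> import com.amazonaws.services.s3.AmazonS3Client;
> import com.amazonaws.services.s3.model.ObjectListing;
> import com.amazonaws.services.s3.model.S3ObjectSummary;
> import com.amazonaws.util.IOUtils;
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.api.java.function.Function;
>
> public class S3ListThenParallelize {
>
>     public static JavaRDD<String> readSmallJsonFiles(JavaSparkContext sc,
>             final String bucket, String prefix, int numPartitions) {
>
>         // 1. Listing keys is cheap and happens only on the driver.
>         AmazonS3 s3 = new AmazonS3Client();
>         List<String> keys = new ArrayList<String>();
>         ObjectListing listing = s3.listObjects(bucket, prefix);
>         while (true) {
>             for (S3ObjectSummary summary : listing.getObjectSummaries()) {
>                 keys.add(summary.getKey());
>             }
>             if (!listing.isTruncated()) {
>                 break;
>             }
>             listing = s3.listNextBatchOfObjects(listing);
>         }
>
>         // 2. Distribute the key names, then read each object on the
>         //    executors. Each element of the result is one file's contents.
>         return sc.parallelize(keys, numPartitions).map(
>                 new Function<String, String>() {
>             @Override
>             public String call(String key) throws Exception {
>                 // one client per call keeps the sketch simple; a
>                 // per-partition client (mapPartitions) would be cheaper
>                 AmazonS3 client = new AmazonS3Client();
>                 return IOUtils.toString(
>                         client.getObject(bucket, key).getObjectContent());
>             }
>         });
>     }
> }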
>
> This takes that idea further by:
> 1. Rather than sc.parallelize, implement the RDD interface where each
> partition is defined by the files it needs to read (haven't gotten to
> DataFrames yet)
> 2. At the driver node, use the AWS SDK to list all the files with their
> sizes (listing is fast), then run the Longest Processing Time (LPT)
> algorithm to pack the files into roughly balanced partitions by size (see
> the sketch after this list)
> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
> "folder2").regularRDDOperationsHere or import implicits and do
> sc.s3.textFileByPrefix
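>
> As a rough illustration of the balancing step in point 2 (this is not the
> library's actual code; the class and field names are made up), greedy LPT
> just sorts the files by size, largest first, and always drops the next
> file into the currently lightest partition:
>
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.Comparator;
> import java.util.List;
> import java.util.PriorityQueue;
>
> public class LptBalancer {
>
>     static class FileInfo {
>         final String key;
>         final long size;
>         FileInfo(String key, long size) { this.key = key; this.size = size; }
>     }
>
>     static class Partition {
>         final List<String> keys = new ArrayList<String>();
>         long totalSize = 0;
>     }
>
>     public static List<Partition> balance(List<FileInfo> files, int numPartitions) {
>         // largest files first
>         List<FileInfo> sorted = new ArrayList<FileInfo>(files);
>         Collections.sort(sorted, new Comparator<FileInfo>() {
>             @Override
>             public int compare(FileInfo a, FileInfo b) {
>                 return Long.compare(b.size, a.size);
>             }
>         });
>
>         // min-heap of partitions ordered by their current total size
>         PriorityQueue<Partition> lightestFirst = new PriorityQueue<Partition>(
>                 numPartitions, new Comparator<Partition>() {
>             @Override
>             public int compare(Partition a, Partition b) {
>                 return Long.compare(a.totalSize, b.totalSize);
>             }
>         });
>         List<Partition> partitions = new ArrayList<Partition>();
>         for (int i = 0; i < numPartitions; i++) {
>             Partition p = new Partition();
>             partitions.add(p);
>             lightestFirst.add(p);
>         }
>
>         for (FileInfo f : sorted) {
>             Partition lightest = lightestFirst.poll();
>             lightest.keys.add(f.key);
>             lightest.totalSize += f.size;
>             lightestFirst.add(lightest); // re-insert with its updated load
>         }
>         return partitions;
>     }
> }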
>
> At present, I am battle-testing and benchmarking it at my current job,
> and the results are promising, with significant improvements for jobs
> dealing with many files (especially many small files) and for jobs whose
> input is unbalanced to start with. Jobs perform better because: 1) there
> isn't a long stall at the driver while Hadoop decides how to split the S3
> files, and 2) the partitions end up nearly perfectly balanced because of
> the LPT algorithm.
>
> Since I hadn't intended to advertise this quite yet, the documentation is
> not super polished, but it exists here:
> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
>
> I am completing the Sonatype process for publishing artifacts on Maven
> Central (this should be done by tomorrow, so referencing
> "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
> hear whether this library solves the problem for you; otherwise I hope the
> blog post above is illuminating.
>
> Pedro
>
> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <
> a...@santacruzintegration.com> wrote:
>
>> I have a relatively small data set; however, it is split into many small
>> JSON files, each between maybe 4K and 400K.
>> This is probably a very common issue for anyone using Spark Streaming. My
>> streaming app works fine; however, my batch application takes several
>> hours to run.
>>
>> All I am doing is calling count(). Currently I am trying to read the
>> files from S3. When I look at the app UI, it looks like Spark is blocked,
>> probably on IO. Adding additional workers and memory does not improve
>> performance.
>>
>> I am able to copy the files from S3 to a worker relatively quickly, so I
>> do not think S3 read time is the problem.
>>
>> In the past, when I had similar data sets stored on HDFS, I was able to
>> use coalesce() to reduce the number of partitions from 200K to 30. This
>> made a big improvement in processing time. However, when I read from S3,
>> coalesce() does not improve performance.
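>>
>> Roughly the equivalent in Java of what I was doing on HDFS (illustrative
>> only; it assumes an existing SQLContext named sqlContext, and the path is
>> made up):
>>
>> DataFrame df = sqlContext.read().json("hdfs:///path/to/small/json/files");
>> // collapse the ~200K tiny input partitions down to 30 before doing work
>> df = df.coalesce(30);
>> long n = df.count();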
>>
>> I tried copying the files to a normal file system and then using ‘hadoop
>> fs -put’ to copy them to HDFS; however, this takes several hours and is
>> nowhere near completion. It appears HDFS does not deal with small files
>> well.
>>
>> I am considering copying the files from S3 to a normal file system on one
>> of my workers, concatenating them into a few much larger files, and then
>> using ‘hadoop fs -put’ to move them to HDFS. Do you think this would
>> improve the Spark count() performance issue?
>>
>> Does anyone know of heuristics for determining the number or size of the
>> concatenated files?
>>
>> Thanks in advance
>>
>> Andy
>>
>
>
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>
