Maybe obvious, but what happens when you change the S3 write to a println of all the data? That should identify whether the write is the bottleneck.
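To make that comparison concrete, here is a minimal sketch in plain Java (no Spark dependency) of timing a loop with a swappable sink. The class name SinkSwap, the timeSink helper, and the sample records are all made up for illustration; in the real job the sink would be whatever replaces saveDF.write().json(dirPath).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the original job: times how long it takes
// to push every record through a given sink.
class SinkSwap {
    // Feeds each record to the sink and returns the elapsed time in milliseconds.
    static long timeSink(List<String> records, Consumer<String> sink) {
        long start = System.nanoTime();
        for (String record : records) {
            sink.accept(record);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Stand-in data; the real records would come from the mini batch.
        List<String> records = Arrays.asList("{\"tag\":\"a\"}", "{\"tag\":\"b\"}");

        // println sink: if the batch is fast with this sink and slow with the
        // S3 sink, the write is the likely culprit.
        long ms = timeSink(records, System.out::println);
        System.out.println("println sink took " + ms + " ms");
    }
}
```

Running the same batch through both sinks and comparing the two timings should show how much of the processing time the S3 write accounts for.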
count() and read.json() will involve additional tasks (running through the items in the RDD to count them, and likewise to infer the schema), but for 300 records that shouldn't be much of an issue.

On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using the
> Kafka direct stream approach. I am running into performance problems. My
> processing time is greater than my window size. Changing window sizes and
> adding cores and executor memory does not change performance. I am having a
> lot of trouble identifying the problem by looking at the metrics provided
> for streaming apps in the Spark application web UI.
>
> I think my performance problem has to do with writing the data to S3.
>
> My app receives very complicated JSON. My program is simple: it sorts the
> data into a small number of sets and writes each set as a separate S3
> object. The mini batch data has at most 300 events, so I do not think
> shuffle is an issue.
>
> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>
> … Explode tagCol …
>
> DataFrame rulesDF = activityDF.select(tagCol).distinct();
> Row[] rows = rulesDF.select(tagCol).collect();
> List<String> tags = new ArrayList<String>(100);
> for (Row row : rows) {
>     Object tag = row.get(0);
>     tags.add(tag.toString());
> }
>
> I think the for loop below is where the bottleneck is. Is write() async?
>
> If not, is there an easy way to vectorize/parallelize this for loop, or do
> I have to create the threads myself?
>
> Is creating threads in Spark a bad idea?
>
> for (String tag : tags) {
>     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>     if (saveDF.count() >= 1) { // I do not think count() is an issue; it takes about 34 ms
>         String dirPath = "s3n://myBucket" + File.separator + date + File.separator +
>             tag + File.separator + milliSeconds;
>         saveDF.write().json(dirPath);
>     }
> }
>
> Any suggestions would be greatly appreciated
>
> Andy
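
On the question of parallelizing the per-tag loop: write().json() blocks until its job finishes, so the loop above runs the writes one after another. Spark does accept jobs submitted from several driver threads, so one option is a small thread pool on the driver. Below is a rough sketch of that pattern in plain Java. The class ParallelTagWrites, the writeAll helper, and the fakeWrite function are hypothetical stand-ins, not Spark APIs; in the real job the function body would do the filter and saveDF.write().json(dirPath) for one tag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical driver-side helper: runs one blocking "write" per tag on a
// fixed-size thread pool instead of sequentially.
class ParallelTagWrites {
    // Submits one task per tag, waits for all of them, and returns the
    // results in the same order as the input tags.
    static List<String> writeAll(List<String> tags,
                                 Function<String, String> writeTag,
                                 int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String tag : tags) {
                Callable<String> task = () -> writeTag.apply(tag);
                pending.add(pool.submit(task));
            }
            List<String> written = new ArrayList<>();
            for (Future<String> f : pending) {
                written.add(f.get()); // rethrows a task failure as ExecutionException
            }
            return written;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the blocking per-tag write; it only builds a path string.
        Function<String, String> fakeWrite = tag -> "s3n://myBucket/" + tag;
        System.out.println(writeAll(Arrays.asList("a", "b", "c"), fakeWrite, 4));
    }
}
```

Whether this helps depends on where the time actually goes. If each write is dominated by S3 latency rather than by cluster work, overlapping the writes should shorten the batch; if the executors are already saturated, it probably will not.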