val summary = rowStructText.map(s => s.split(",")).map {
  s =>
    println(s)
    Summary(formatStringAsDate(s(0)),
      s(1).replaceAll("\"", "").toLong,
      s(3).replaceAll("\"", "").toLong,
      s(4).replaceAll("\"", "").toInt,
      s(5).replaceAll("\"", ""),
      s(6).replaceAll("\"", "").toInt,
      formatStringAsDate(s(7)),
      formatStringAsDate(s(8)),
      s(9).replaceAll("\"", "").toInt,
      s(10).replaceAll("\"", "").toInt,
      s(11).replaceAll("\"", "").toFloat,
      s(12).replaceAll("\"", "").toInt,
      s(13).replaceAll("\"", "").toInt,
      s(14).replaceAll("\"", "")
    )
}
summary.count
AND
rowStructText.map(s => {
  println(s)
  s.split(",").size
}).countByValue foreach println
Neither of these prints the output from println(s).
When I open the Spark history server, I do not see any new Spark jobs launched
for countByValue. Why is that, and when does it actually start a new job?
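
For reference, a minimal sketch of where that output normally ends up (only rowStructText is taken from this thread; the rest is illustrative): a println inside a map closure runs on the executors, so its output goes to the executor stdout logs rather than the driver or notebook console, while count and countByValue are actions that should each trigger a job.

val fieldCounts = rowStructText.map { s =>
  // Runs on the executors: output lands in the executor stdout logs,
  // not in the driver/notebook console.
  println(s)
  s.split(",").size
}                                            // map is a transformation: lazy, no job yet

// Actions force evaluation and should appear as jobs in the Spark UI / history server.
fieldCounts.countByValue().foreach(println)  // the collected counts print on the driver
rowStructText.count()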
On Wed, Aug 5, 2015 at 10:19 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]> wrote:
> summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[285] at map at <console>:169
> (1,517252)
>
> What does that mean?
>
> On Wed, Aug 5, 2015 at 10:14 PM, Jeff Zhang <[email protected]> wrote:
>
>> Your data might have a format issue (fewer fields than you expect).
>>
>> Please try executing the following code to check whether all the lines have
>> 14 fields:
>> rowStructText.map(s => s.split(",").size).countByValue foreach
>> println
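
A sketch of one way to extend that check before parsing (it reuses the rowStructText RDD and the Summary parsing from Para-1 below; note that split(",") drops trailing empty fields, split(",", -1) keeps them, and the Summary constructor indexes up to s(14), so each line needs at least 15 fields):

// Distribution of field counts per line, keeping trailing empty fields.
rowStructText.map(_.split(",", -1).length).countByValue().toSeq.sorted.foreach(println)

// Skip lines that are too short instead of letting one bad row fail the whole job.
val fields = rowStructText.map(_.split(",", -1)).filter(_.length >= 15)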
>>
>> On Thu, Aug 6, 2015 at 1:01 PM, Randy Gelhausen <
>> [email protected]> wrote:
>>
>>> You likely have a problem with your parsing logic. I can’t see the data
>>> to know for sure, but since Spark is lazily evaluated, it doesn’t try to
>>> run your map until you execute the SQL that applies it to the data.
>>>
>>> That’s why your first paragraph can run (it’s only defining metadata),
>>> but paragraph 2 throws an error.
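
A small illustration of that point, as a sketch only (parseSummary is a hypothetical stand-in for the field-by-field parsing in Para-1, and toDF() assumes the same sqlContext implicits that Para-1 relies on): defining the RDD, the map, the DataFrame, and the temp table never touches the data, so a malformed line only fails once the first action forces evaluation.

val summary = rowStructText.map(s => s.split(",")).map(parseSummary) // lazy: no job, no error yet
summary.toDF().registerTempTable("summary")                          // metadata only, still lazy
sqlContext.sql("select count(*) from summary").show()                // a job runs; bad rows fail here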
>>>
>>> From: "ÐΞ€ρ@Ҝ (๏̯͡๏)"
>>> Reply-To: "[email protected]"
>>> Date: Thursday, August 6, 2015 at 12:37 AM
>>> To: "[email protected]"
>>> Subject: Re: Unable to run count(*)
>>>
>>> %sql
>>> select * from summary
>>>
>>> Throws the same error.
>>>
>>> On Wed, Aug 5, 2015 at 9:33 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <[email protected]>
>>> wrote:
>>>
>>>> Para-1
>>>> import java.text.SimpleDateFormat
>>>> import java.util.Calendar
>>>> import java.sql.Date
>>>>
>>>> def formatStringAsDate(dateStr: String) = new java.sql.Date(new
>>>> SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())
>>>>
>>>>
>>>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>>>> val rowStructText =
>>>> sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 :
>>>> String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11:
>>>> Float, f12: Integer, f13: Integer, f14: String)
>>>>
>>>> val summary = rowStructText.map(s => s.split(",")).map(
>>>> {
>>>> s =>
>>>> Summary(formatStringAsDate(s(0)),
>>>> s(1).replaceAll("\"", "").toLong,
>>>> s(3).replaceAll("\"", "").toLong,
>>>> s(4).replaceAll("\"", "").toInt,
>>>> s(5).replaceAll("\"", ""),
>>>> s(6).replaceAll("\"", "").toInt,
>>>> formatStringAsDate(s(7)),
>>>> formatStringAsDate(s(8)),
>>>> s(9).replaceAll("\"", "").toInt,
>>>> s(10).replaceAll("\"", "").toInt,
>>>> s(11).replaceAll("\"", "").toFloat,
>>>> s(12).replaceAll("\"", "").toInt,
>>>> s(13).replaceAll("\"", "").toInt,
>>>> s(14).replaceAll("\"", "")
>>>> )
>>>> }
>>>> ).toDF()
>>>> summary.registerTempTable("summary")
>>>>
>>>>
>>>>
>>>> Output:
>>>> import java.text.SimpleDateFormat import java.util.Calendar import
>>>> java.sql.Date formatStringAsDate: (dateStr: String)java.sql.Date
>>>> rowStructText: org.apache.spark.rdd.RDD[String] =
>>>> /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz
>>>> MapPartitionsRDD[152] at textFile at <console>:100 defined class Summary
>>>> summary: org.apache.spark.sql.DataFrame = [f1: date, f2: bigint, f3:
>>>> bigint, f4: int, f5: string, f6: int, f7: date, f8: date, f9: int, f10:
>>>> int, f11: float, f12: int, f13: int, f14: string]
>>>>
>>>>
>>>> Para-2 (DOES NOT WORK)
>>>> %sql select count(*) from summary
>>>>
>>>> Output
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 0 in stage 29.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>>>> 29.0 (TID 1844, datanode-6-3486.phx01.dev.ebayc3.com):
>>>> java.lang.ArrayIndexOutOfBoundsException: 1 at
>>>> $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:109)
>>>> at
>>>> $line184.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:107)
>>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
>>>> scala.collection.Iterator$$anon$1.next(Iterator.scala:853) at
>>>> scala.collection.Iterator$$anon$1.head(Iterator.scala:840) at
>>>> org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:42)
>>>> at
>>>> org.apache.spark.sql.execution.RDDConversions$$anonfun$productToRowRdd$1.apply(ExistingRDD.scala:37)
>>>> at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) at
>>>> org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:64) at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> Suggestions?
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
> Deepak
>
>
--
Deepak