Thank you so much for the solution. I ran the code like this:
JavaRDD<User> rdd = context.parallelize(usersList);
JavaRDD<User> rdd_sorted_users = rdd.sortBy(new Function<User, String>() {
    @Override
    public String call(User usr1) throws Exception {
        return usr1.getUserName().toUpperCase();
    }
}, false, 1);
User user_top = rdd_sorted_users.first();
System.out.println("The top user is: " + user_top.getUserName());
But it is giving me this exception, even though my class already
implements java.io.Serializable:
15/07/07 08:13:07 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.io.NotSerializableException: Model.User
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:79)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:58)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/07/07 08:13:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/07/07 08:13:07 ERROR TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable
15/07/07 08:13:07 INFO TaskSchedulerImpl: Cancelling stage 0
15/07/07 08:13:07 INFO DAGScheduler: ResultStage 0 (first at HelloWorldnAddition.java:300) failed in 0.136 s
Exception - Message: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: Model.User
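A quick way to narrow this down, independent of Spark, is to round-trip a User through plain java.io serialization: if any field of Model.User (or a class it references) is not itself serializable, this fails with the same NotSerializableException, naming the offending class. The sketch below uses a hypothetical stand-in for Model.User with a single String field; the real class and its fields are assumptions here:

```java
import java.io.*;

public class SerializationCheck {

    // Hypothetical stand-in for Model.User: the class itself is
    // Serializable, and every field is a serializable type (String).
    public static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String userName;

        public User(String userName) { this.userName = userName; }
        public String getUserName() { return userName; }
    }

    // Serializes one object to bytes and reads it back; throws if
    // anything in the object graph is not serializable.
    public static User roundTrip(User user) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(user);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (User) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        User copy = roundTrip(new User("Hafsa"));
        System.out.println("Round trip OK: " + copy.getUserName());
    }
}
```

If this round trip fails for the real Model.User, the exception names the first non-serializable field; marking that field transient or making its type Serializable should clear the Spark error. A stale build on the classpath (an old Model.User compiled before Serializable was added) is another common cause.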
2015-07-07 17:07 GMT+02:00 oubrik [via Apache Spark User List] <[email protected]>:
> JavaRDD<User> rdd_sorted_users = rdd.sortBy(new Function<User, String>() {
>
>     @Override
>     public String call(User usr1) throws Exception {
>         return usr1.getUserName().toUpperCase();
>     }
>
> }, false, 1);
>
> false: descending
> true: ascending
>
>
> For the top user: User user_top = rdd_sorted_users.first();
>
> 2015-07-07 17:05 GMT+02:00 Brahim Oubrik <[hidden email]>:
>
>> Sorry, that should be User, not PPP.
>>
>> 2015-07-07 17:04 GMT+02:00 Brahim Oubrik <[hidden email]>:
>>
>>> JavaRDD<User> rdd_sorted_users= rdd.sortBy(new Function<User,String>(){
>>>
>>> @Override
>>> public String call(PPP arg0) throws Exception {
>>> String userName = usr1.getUserName().toUpperCase();
>>> return userName ;
>>> }
>>>
>>> }, false, 1);
>>>
>>> false: descending
>>> true: ascending
>>>
>>>
>>> For the top user: User user_top = rdd_sorted_users.first();
>>>
>>> 2015-07-07 16:54 GMT+02:00 Hafsa Asif [via Apache Spark User List] <[hidden email]>:
>>>
>>>> Rusty,
>>>>
>>>> I am very thankful for your help. Actually, I am having difficulty with
>>>> objects. My plan is this: I have a list of User objects. After
>>>> parallelizing it through the Spark context, I apply a comparator on
>>>> user.getUserName(). As the usernames are sorted, their related User
>>>> objects are sorted along with them.
>>>> In the end, when I apply top(), I get the whole User object.
>>>>
>>>> Something like this:
>>>>
>>>> public static Comparator<User> UserComparator = new Comparator<User>() {
>>>>
>>>>     public int compare(User usr1, User usr2) {
>>>>         String userName1 = usr1.getUserName().toUpperCase();
>>>>         String userName2 = usr2.getUserName().toUpperCase();
>>>>
>>>>         // ascending order
>>>>         return userName1.compareTo(userName2);
>>>>
>>>>         // descending order
>>>>         // return userName2.compareTo(userName1);
>>>>     }
>>>> };
>>>>
>>>> JavaRDD<User> rdd = context.parallelize(usersList);
>>>>
>>>> 2015-07-07 16:05 GMT+02:00 rusty [via Apache Spark User List] <[hidden email]>:
>>>>
>>>>> JavaRDD<String> lines2 = ctx.parallelize(
>>>>>         Arrays.asList("3", "6", "2", "5", "8", "6", "7"));
>>>>> List<String> top = lines2.top(7, new CustomComparator<String>());
>>>>> for (String s : top) {
>>>>>     System.out.println(s);
>>>>> }
>>>>>
>>>>> class CustomComparator<T> implements Serializable, Comparator<T> {
>>>>>
>>>>>     private static final long serialVersionUID = 2004092520677431781L;
>>>>>
>>>>>     @Override
>>>>>     public int compare(T o11, T o12) {
>>>>>         int o1 = Integer.parseInt(String.valueOf(o11));
>>>>>         int o2 = Integer.parseInt(String.valueOf(o12));
>>>>>         return o1 > o2 ? 1 : o1 == o2 ? 0 : -1;
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Best regards,
>>>
>>> *-------------------------------------------------------------------------------------*
>>> *Brahim OUBRIK*
>>> *Computer Science engineering student at Polytech Paris Sud.*
>>> *Cell: +33 0651137644*
>>> *-------------------------------------------------------------------------------------*
>>
>>
>>
>
>
>
>
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.