Hi All,

*I am new to Spark and am trying to run Spark SQL through Java code, as shown
below:*

package com.ce.sql;

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.api.java.function.Function;

public class SparkSQL {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
            .setAppName("Log Analyzer SQL")
            .setMaster("spark://<host>:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlContext = new JavaSQLContext(sc);

        // Load the text file and convert each line into a Person JavaBean.
        JavaRDD<Person> people = sc.textFile("hdfs://localhost/spark/people.txt").map(
            new Function<String, Person>() {
                public Person call(String line) throws Exception {
                    String[] parts = line.split(",");
                    Person person = new Person();
                    person.setName(parts[0]);
                    person.setAge(Integer.parseInt(parts[1].trim()));
                    return person;
                }
            });

        // Apply a schema to an RDD of JavaBeans and register it as a table.
        JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
        schemaPeople.registerAsTable("people");

        // SQL can be run over RDDs that have been registered as tables.
        JavaSchemaRDD teenagers =
            sqlContext.sql("SELECT name FROM people WHERE age >= 10");

        List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
            public String call(Row row) {
                return "Name: " + row.getString(0);
            }
        }).collect();
    }
}
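
For reference, Person is a plain JavaBean, roughly like this (trimmed to the
name and age fields used by the mapper above):

    package com.ce.sql;   // assuming Person lives in the same package as SparkSQL

    import java.io.Serializable;

    // Plain JavaBean for one "name, age" record from people.txt.
    // Serializable so Spark can ship instances to the executors, with
    // public getters/setters so applySchema can infer the schema from the bean.
    public class Person implements Serializable {
        private String name;
        private int age;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }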

I am using Hadoop 2.2, Spark 1.0.2 and Scala 2.11, with Hadoop running in
pseudo-distributed mode; the Spark master and workers run on the same machine.
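
The input file (hdfs://localhost/spark/people.txt) just has one comma-separated
name and age per line, e.g. (illustrative values only):

    Michael, 29
    Andy, 30
    Justin, 19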
After executing the above code, I get the following output:

15/05/25 15:29:22 INFO Analyzer: Max iterations (2) reached for batch
MultiInstanceRelations
15/05/25 15:29:22 INFO Analyzer: Max iterations (2) reached for batch
CaseInsensitiveAttributeReferences
15/05/25 15:29:22 INFO Analyzer: Max iterations (2) reached for batch Check
Analysis
15/05/25 15:29:22 INFO SQLContext$$anon$1: Max iterations (2) reached for
batch Add exchange
15/05/25 15:29:22 INFO SQLContext$$anon$1: Max iterations (2) reached for
batch Prepare Expressions
15/05/25 15:29:27 INFO FileInputFormat: Total input paths to process : 1
15/05/25 15:29:28 INFO SparkContext: Starting job: collect at
SparkSQL.java:54
15/05/25 15:29:28 INFO DAGScheduler: Got job 0 (collect at SparkSQL.java:54)
with 2 output partitions (allowLocal=false)
15/05/25 15:29:28 INFO DAGScheduler: Final stage: Stage 0(collect at
SparkSQL.java:54)
15/05/25 15:29:28 INFO DAGScheduler: Parents of final stage: List()
15/05/25 15:29:28 INFO DAGScheduler: Missing parents: List()
15/05/25 15:29:28 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[8] at map
at JavaSchemaRDD.scala:40), which has no missing parents
15/05/25 15:29:28 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0
(MappedRDD[8] at map at JavaSchemaRDD.scala:40)
15/05/25 15:29:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/05/25 15:29:37 INFO AppClient$ClientActor: Connecting to master
spark://localhost:7077...
15/05/25 15:29:43 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
15/05/25 15:29:57 INFO AppClient$ClientActor: Connecting to master
spark://localhost:7077...
15/05/25 15:29:58 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
15/05/25 15:30:13 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory
15/05/25 15:30:17 ERROR SparkDeploySchedulerBackend: Application has been
killed. Reason: All masters are unresponsive! Giving up.
15/05/25 15:30:17 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
have all completed, from pool 
15/05/25 15:30:17 INFO TaskSchedulerImpl: Cancelling stage 0
15/05/25 15:30:17 INFO DAGScheduler: Failed to run collect at
SparkSQL.java:54
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: All masters are unresponsive! Giving up.
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
        at scala.Option.foreach(Option.scala:236)
        at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
        at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/05/25 15:30:18 WARN QueuedThreadPool: 7 threads could not be stopped
15/05/25 15:30:18 INFO SparkUI: Stopped Spark web UI at
http://01hw243941:4040
15/05/25 15:30:18 INFO DAGScheduler: Stopping DAGScheduler
15/05/25 15:30:18 INFO SparkDeploySchedulerBackend: Shutting down all
executors
15/05/25 15:30:18 INFO SparkDeploySchedulerBackend: Asking each executor to
shut down

I can also see the following in the master's logs:

15/05/25 14:27:54 ERROR remote.EndpointWriter: dropping message [class
akka.actor.SelectChildName] for non-local recipient
[Actor[akka.tcp://sparkMaster@172.17.198.77:7077/]] arriving at
[akka.tcp://sparkMaster@172.17.198.77:7077] inbound addresses are
[akka.tcp://sparkMaster@01hw243941:7077]
15/05/25 14:28:13 ERROR remote.EndpointWriter: dropping message [class
akka.actor.SelectChildName] for non-local recipient
[Actor[akka.tcp://sparkMaster@172.17.198.77:7077/]] arriving at
[akka.tcp://sparkMaster@172.17.198.77:7077] inbound addresses are
[akka.tcp://sparkMaster@01hw243941:7077]
15/05/25 14:28:33 ERROR remote.EndpointWriter: dropping message [class
akka.actor.SelectChildName] for non-local recipient
[Actor[akka.tcp://sparkMaster@172.17.198.77:7077/]] arriving at
[akka.tcp://sparkMaster@172.17.198.77:7077] inbound addresses are
[akka.tcp://sparkMaster@01hw243941:7077]
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got
disassociated, removing it.
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got
disassociated, removing it.
15/05/25 14:28:54 INFO actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
Actor[akka://sparkMaster/deadLetters] to
Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40172.17.198.77%3A55452-41#-304308467]
was not delivered. [40] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
15/05/25 14:28:54 ERROR remote.EndpointWriter: AssociationError
[akka.tcp://sparkMaster@01hw243941:7077] ->
[akka.tcp://spark@01hw243941:51189]: Error [Association failed with
[akka.tcp://spark@01hw243941:51189]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://spark@01hw243941:51189]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: 01hw243941/172.17.198.77:51189
]
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got
disassociated, removing it.
15/05/25 14:28:54 ERROR remote.EndpointWriter: AssociationError
[akka.tcp://sparkMaster@01hw243941:7077] ->
[akka.tcp://spark@01hw243941:51189]: Error [Association failed with
[akka.tcp://spark@01hw243941:51189]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://spark@01hw243941:51189]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: 01hw243941/172.17.198.77:51189
]
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got
disassociated, removing it.
15/05/25 14:28:54 ERROR remote.EndpointWriter: AssociationError
[akka.tcp://sparkMaster@01hw243941:7077] ->
[akka.tcp://spark@01hw243941:51189]: Error [Association failed with
[akka.tcp://spark@01hw243941:51189]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://spark@01hw243941:51189]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: 01hw243941/172.17.198.77:51189
]
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got
disassociated, removing it.


Please help me solve this issue. Thanks in advance.


