Hi All,
I am new to Spark and am trying to execute Spark SQL queries from Java code, as below:
package com.ce.sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;

public class SparkSQL {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Log Analyzer SQL")
                .setMaster("spark://<host>:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlContext = new JavaSQLContext(sc);

        // Parse each "name,age" line of the input file into a Person bean.
        JavaRDD<Person> people = sc.textFile("hdfs://localhost/spark/people.txt").map(
                new Function<String, Person>() {
                    public Person call(String line) throws Exception {
                        String[] parts = line.split(",");
                        Person person = new Person();
                        person.setName(parts[0]);
                        person.setAge(Integer.parseInt(parts[1].trim()));
                        return person;
                    }
                });

        // Apply a schema to an RDD of JavaBeans and register it as a table.
        JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
        schemaPeople.registerAsTable("people");

        // SQL can be run over RDDs that have been registered as tables.
        JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 10");

        // collect() triggers the actual job on the cluster.
        List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
            public String call(Row row) {
                return "Name: " + row.getString(0);
            }
        }).collect();

        for (String name : teenagerNames) {
            System.out.println(name);
        }
        sc.stop();
    }
}
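The Person class used above is not shown in the post. Assuming it is a plain JavaBean like the one in the Spark SQL programming guide, a minimal sketch would look like this (hypothetical; in the project it would live in Person.java with "package com.ce.sql;" at the top, next to SparkSQL). Spark serializes these objects when shipping them between the driver and executors, so the class implements Serializable, and applySchema() infers the table schema (name, age) from the bean's properties via reflection.

```java
import java.io.Serializable;

// Hypothetical sketch of the Person JavaBean referenced by SparkSQL.
// Serializable so instances can be shipped to executors; the getter/setter
// pairs define the bean properties that applySchema() turns into columns.
public class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private int age;

    // SparkSQL calls new Person(), so a no-argument constructor is needed.
    public Person() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
```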
I am using Hadoop 2.2, Spark 1.0.2 and Scala 2.11, with Hadoop running in pseudo-distributed mode. The Spark master and workers run on the same machine.
After executing the above code, I get the following output:
15/05/25 15:29:22 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
15/05/25 15:29:22 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
15/05/25 15:29:22 INFO Analyzer: Max iterations (2) reached for batch Check Analysis
15/05/25 15:29:22 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
15/05/25 15:29:22 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
15/05/25 15:29:27 INFO FileInputFormat: Total input paths to process : 1
15/05/25 15:29:28 INFO SparkContext: Starting job: collect at SparkSQL.java:54
15/05/25 15:29:28 INFO DAGScheduler: Got job 0 (collect at SparkSQL.java:54) with 2 output partitions (allowLocal=false)
15/05/25 15:29:28 INFO DAGScheduler: Final stage: Stage 0(collect at SparkSQL.java:54)
15/05/25 15:29:28 INFO DAGScheduler: Parents of final stage: List()
15/05/25 15:29:28 INFO DAGScheduler: Missing parents: List()
15/05/25 15:29:28 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[8] at map at JavaSchemaRDD.scala:40), which has no missing parents
15/05/25 15:29:28 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[8] at map at JavaSchemaRDD.scala:40)
15/05/25 15:29:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/05/25 15:29:37 INFO AppClient$ClientActor: Connecting to master spark://localhost:7077...
15/05/25 15:29:43 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/05/25 15:29:57 INFO AppClient$ClientActor: Connecting to master spark://localhost:7077...
15/05/25 15:29:58 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/05/25 15:30:13 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
15/05/25 15:30:17 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
15/05/25 15:30:17 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/05/25 15:30:17 INFO TaskSchedulerImpl: Cancelling stage 0
15/05/25 15:30:17 INFO DAGScheduler: Failed to run collect at SparkSQL.java:54
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/05/25 15:30:18 WARN QueuedThreadPool: 7 threads could not be stopped
15/05/25 15:30:18 INFO SparkUI: Stopped Spark web UI at http://01hw243941:4040
15/05/25 15:30:18 INFO DAGScheduler: Stopping DAGScheduler
15/05/25 15:30:18 INFO SparkDeploySchedulerBackend: Shutting down all executors
15/05/25 15:30:18 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
I can also see the following in the master's log:
15/05/25 14:27:54 ERROR remote.EndpointWriter: dropping message [class akka.actor.SelectChildName] for non-local recipient [Actor[akka.tcp://[email protected]:7077/]] arriving at [akka.tcp://[email protected]:7077] inbound addresses are [akka.tcp://sparkMaster@01hw243941:7077]
15/05/25 14:28:13 ERROR remote.EndpointWriter: dropping message [class akka.actor.SelectChildName] for non-local recipient [Actor[akka.tcp://[email protected]:7077/]] arriving at [akka.tcp://[email protected]:7077] inbound addresses are [akka.tcp://sparkMaster@01hw243941:7077]
15/05/25 14:28:33 ERROR remote.EndpointWriter: dropping message [class akka.actor.SelectChildName] for non-local recipient [Actor[akka.tcp://[email protected]:7077/]] arriving at [akka.tcp://[email protected]:7077] inbound addresses are [akka.tcp://sparkMaster@01hw243941:7077]
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got disassociated, removing it.
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got disassociated, removing it.
15/05/25 14:28:54 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%40172.17.198.77%3A55452-41#-304308467] was not delivered. [40] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
15/05/25 14:28:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@01hw243941:7077] -> [akka.tcp://spark@01hw243941:51189]: Error [Association failed with [akka.tcp://spark@01hw243941:51189]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@01hw243941:51189]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: 01hw243941/172.17.198.77:51189
]
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got disassociated, removing it.
15/05/25 14:28:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@01hw243941:7077] -> [akka.tcp://spark@01hw243941:51189]: Error [Association failed with [akka.tcp://spark@01hw243941:51189]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@01hw243941:51189]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: 01hw243941/172.17.198.77:51189
]
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got disassociated, removing it.
15/05/25 14:28:54 ERROR remote.EndpointWriter: AssociationError [akka.tcp://sparkMaster@01hw243941:7077] -> [akka.tcp://spark@01hw243941:51189]: Error [Association failed with [akka.tcp://spark@01hw243941:51189]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@01hw243941:51189]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: 01hw243941/172.17.198.77:51189
]
15/05/25 14:28:54 INFO master.Master: akka.tcp://spark@01hw243941:51189 got disassociated, removing it.
Please help me solve this issue. Thanks in advance.
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-through-java-code-facing-issue-tp23015.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.