Please see the comments at the tail of SPARK-2356.

Cheers
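SPARK-2356 tracks the `Could not locate executable null\bin\winutils.exe` error at the top of the quoted log. The `null` is the Hadoop home directory, which is unset on the Windows client. A commonly discussed workaround is to download a `winutils.exe` that matches your Hadoop version and point `HADOOP_HOME` at the folder above its `bin` directory. The sketch below assumes that workaround. The helper name `point_hadoop_home_at_winutils` and the path `C:\hadoop` are hypothetical. PySpark starts the JVM as a child of the Python process, and that child inherits the environment. The variable must therefore be set before the first `SparkContext` in that process. In PyCharm it can also be set in the run configuration's environment variables.

```python
import os


def point_hadoop_home_at_winutils(hadoop_home):
    """Hypothetical helper: set HADOOP_HOME so Hadoop can find
    HADOOP_HOME/bin/winutils.exe.

    Call it before any SparkContext exists in this Python process, because
    the JVM that PySpark launches inherits os.environ at launch time.
    Raises OSError (leaving HADOOP_HOME untouched) if winutils.exe is missing.
    """
    winutils = os.path.join(hadoop_home, "bin", "winutils.exe")
    if not os.path.isfile(winutils):
        raise OSError("winutils.exe not found at " + winutils)
    os.environ["HADOOP_HOME"] = hadoop_home
    return winutils


# Usage sketch (C:\hadoop is an assumed location, adjust as needed):
#   point_hadoop_home_at_winutils(r"C:\hadoop")
#   from pyspark import SparkConf, SparkContext
#   sc = SparkContext(conf=SparkConf().setMaster("spark://10.210.250.400:7077"))
```

The check happens before the variable is set, so a wrong path fails loudly instead of reproducing the same `null\bin\winutils.exe` message later in the JVM.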
On Wed, Aug 5, 2015 at 6:04 PM, Ashish Dutt <[email protected]> wrote:

> *Use Case:* To automate the process of data extraction (HDFS), data
> analysis (PySpark/SparkR) and saving the data back to HDFS
> programmatically.
>
> *Prospective solutions:*
>
> 1. Create a remote server connectivity program in an IDE like PyCharm or
> RStudio and use it to retrieve the data from HDFS, or else
> 2. Create the data retrieval code in Python or R and then point the IDE to
> the remote server using TCP.
>
> *Problem:* How can I achieve either of prospective solutions 1 or 2
> above? If you have a better solution than these, please share it.
>
> *What have I tried so far?*
>
> The server and 3 namenodes already have PySpark installed, and I have
> checked that PySpark works in standalone mode on all four servers. PySpark
> also works in standalone mode on my laptop.
>
> I use the following code, but I am not able to connect to the remote server:
>
> import os
> import sys
> try:
>     from pyspark import SparkContext
>     from pyspark import SparkConf
>     print ("Pyspark sucess")
> except ImportError as e:
>     print ("Error importing Spark Modules", e)
>
> conf = SparkConf()
> conf.setMaster("spark://10.210.250.400:7077")
> conf.setAppName("First_Remote_Spark_Program")
> sc = SparkContext(conf=conf)
> print ("connection succeeded with Master",conf)
> data = [1, 2, 3, 4, 5]
> distData = sc.parallelize(data)
> print(distData)
>
> The stack trace of the error is:
>
> Pyspark sucess
> 15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
> java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
>     at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
>     at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
>     at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
>     at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
>     at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
>     at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
>     at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
>     at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
>     at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
>     at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
>     at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
>     at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
>     at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
>     at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
>     at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:214)
>     at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
>     at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
> 15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
> 15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to akka.tcp://[email protected]:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://[email protected]:7077
> 15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://[email protected]:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077
> 15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to akka.tcp://[email protected]:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://[email protected]:7077
> 15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://[email protected]:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077
> 15/08/01 14:09:06 WARN AppClient$ClientActor: Could not connect to akka.tcp://[email protected]:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://[email protected]:7077
> 15/08/01 14:09:06 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://[email protected]:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077
> 15/08/01 14:09:25 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
> 15/08/01 14:09:25 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet.
> 15/08/01 14:09:25 ERROR OneForOneStrategy:
> java.lang.NullPointerException
>     at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>     at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 15/08/01 14:09:25 ERROR SparkContext: Error initializing SparkContext.
> java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
>     at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>     at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
>     at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
>     at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:214)
>     at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
>     at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
> Traceback (most recent call last):
>   File "C:/Users/ashish dutt/PycharmProjects/KafkaToHDFS/local2Remote.py", line 26, in <module>
>     sc = SparkContext(conf=conf)
>   File "C:\spark-1.4.0\python\pyspark\context.py", line 113, in __init__
>     conf, jsc, profiler_cls)
>   File "C:\spark-1.4.0\python\pyspark\context.py", line 165, in _do_init
>     self._jsc = jsc or self._initialize_context(self._conf._jconf)
>   File "C:\spark-1.4.0\python\pyspark\context.py", line 219, in _initialize_context
>     return self._jvm.JavaSparkContext(jconf)
>   File "C:\spark-1.4.0\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 701, in __call__
>   File "C:\spark-1.4.0\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
> : java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
>     at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>     at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
>     at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
>     at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:214)
>     at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
>     at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
>
> Process finished with exit code 1
>
> The spark-defaults.conf file is configured as follows:
>
> #spark.eventLog.dir=hdfs://ABCD01:8020/user/spark/applicationHistory
> spark.eventLog.dir hdfs://10.210.250.400:8020/user/spark/eventlog
> spark.eventLog.enabled true
> spark.serializer org.apache.spark.serializer.KryoSerializer
> spark.shuffle.service.enabled true
> spark.shuffle.service.port 7337
> spark.yarn.historyServer.address http://ABCD04:18088
> spark.master spark://10.210.250.400:7077
> spark.yarn.jar local:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar
> spark.driver.extraLibraryPath /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
> spark.executor.extraLibraryPath /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
> spark.yarn.am.extraLibraryPath /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
> spark.logConf true
>
> The spark-env.sh file is configured as follows:
>
> #!/usr/bin/env bash
> ##
> # Generated by Cloudera Manager and should not be modified directly
> ##
>
> SELF="$(cd $(dirname $BASH_SOURCE) && pwd)"
> if [ -z "$SPARK_CONF_DIR" ]; then
>   export SPARK_CONF_DIR="$SELF"
> fi
>
> export SPARK_HOME=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark
> export DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop
> #export STANDALONE_SPARK_MASTER_HOST=`ABCD01`
> export SPARK_MASTER_IP=spark://10.210.250.400
> export SPARK_MASTER_PORT=7077
> export SPARK_WEBUI_PORT=18080
>
>
> ### Path of Spark assembly jar in HDFS
> export SPARK_JAR_HDFS_PATH=${SPARK_JAR_HDFS_PATH:-''}
>
> export HADOOP_HOME=${HADOOP_HOME:-$DEFAULT_HADOOP_HOME}
>
> if [ -n "$HADOOP_HOME" ]; then
>   LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${HADOOP_HOME}/lib/native
> fi
>
> SPARK_EXTRA_LIB_PATH=""
> if [ -n "$SPARK_EXTRA_LIB_PATH" ]; then
>   LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$SPARK_EXTRA_LIB_PATH
> fi
>
> export LD_LIBRARY_PATH
> export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-$SPARK_CONF_DIR/yarn-conf}
>
> # This is needed to support old CDH versions that use a forked version
> # of compute-classpath.sh.
> export SCALA_LIBRARY_PATH=${SPARK_HOME}/lib
>
> # Set distribution classpath. This is only used in CDH 5.3 and later.
> export SPARK_DIST_CLASSPATH=$(paste -sd: "$SELF/classpath.txt")
>
> And the slaves.sh file is configured as follows:
>
> 10.210.250.401
> 10.210.250.402
> 10.210.250.403
>
> Please tell me how I can connect to the remote server using PyCharm or any
> other IDE.
>
> Thank you,
>
> Ashish
>
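In the quoted log, the winutils error is reported but start-up continues (the SparkUI port is bound right afterwards). The run is ended by the repeated `Connection refused: no further information: /10.210.250.400:7077` until `All masters are unresponsive! Giving up.` Before changing the IDE setup, it may help to confirm from the laptop that anything is listening on the master's port at all. The sketch below uses only the standard-library `socket` module. `master_port_open` is a hypothetical helper name, and the host and port are simply the `spark.master` value from the quoted `spark-defaults.conf`. A related point is worth checking if the port turns out to be closed. The quoted `spark-env.sh` sets `SPARK_MASTER_IP` to a `spark://` URL, but the Spark standalone documentation describes that variable as the hostname or IP address the master binds to.

```python
import socket


def master_port_open(host, port, timeout=3.0):
    """Hypothetical helper: return True if a TCP connection to host:port
    succeeds within `timeout` seconds, False if it is refused, times out,
    or the host cannot be resolved.

    A True result only shows that something accepts connections on that
    port; it does not prove that the listener is a Spark master.
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except (socket.timeout, socket.error):
        return False
    sock.close()
    return True


# Usage sketch, with host and port taken from spark.master in the quoted config:
#   print(master_port_open("10.210.250.400", 7077))
```

If this returns False from the laptop but True when run on the master node itself, the master is likely bound only to an address the laptop cannot reach, or a firewall is in the way. In that case the fix lies on the cluster side rather than in PyCharm.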
