*Use Case:* To automate the process of data extraction (HDFS), data analysis (pySpark/sparkR) and saving the data back to HDFS programmatically.
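To make the use case concrete, the workflow I have in mind looks roughly like the sketch below. It is only an illustration under assumptions: `run_job` is a made-up name, the word count merely stands in for the real analysis, and `sc` is assumed to be a SparkContext that is already connected to the cluster.

```python
from operator import add


def run_job(sc, input_path, output_path):
    """Read text from HDFS, count words, and write the counts back to HDFS.

    `sc` is an existing pyspark SparkContext; both paths are placeholders
    chosen by the caller (for example hdfs:// URIs).
    """
    # Data extraction: load the input as an RDD of lines
    lines = sc.textFile(input_path)
    # Data analysis (placeholder): a simple word count
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(add))
    # Save the result back to HDFS; this fails if output_path already exists
    counts.saveAsTextFile(output_path)
    return counts
```

With a working connection this would be called as `run_job(sc, "hdfs://<namenode>:8020/<input>", "hdfs://<namenode>:8020/<output>")`, where both paths are placeholders.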
*Prospective solutions:*
1. Create a remote server connectivity program in an IDE like PyCharm or RStudio and use it to retrieve the data from HDFS, or
2. Write the data retrieval code in Python or R and then point the IDE to the remote server over TCP.

*Problem:* How can I achieve either prospective solution 1 or 2 above? If you know a better solution than these, please share it.

*What have I tried so far?* PySpark is already installed on the server and the 3 namenodes, and I have checked that it works in standalone mode on all four servers. PySpark works in standalone mode on my laptop too. I use the following code, but I am not able to connect to the remote server.

    import os
    import sys

    # Check that the PySpark modules can be imported
    try:
        from pyspark import SparkContext
        from pyspark import SparkConf
        print ("Pyspark sucess")
    except ImportError as e:
        print ("Error importing Spark Modules", e)

    # Point the driver at the remote standalone master
    conf = SparkConf()
    conf.setMaster("spark://10.210.250.400:7077")
    conf.setAppName("First_Remote_Spark_Program")
    sc = SparkContext(conf=conf)
    print ("connection succeeded with Master",conf)

    # Smoke test: distribute a small list across the cluster
    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)
    print(distData)

The stack trace of the error is:

Pyspark sucess
15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
    at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077
15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077
15/08/01 14:09:06 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:09:06 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: no further information: /10.210.250.400:7077
15/08/01 14:09:25 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
15/08/01 14:09:25 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet.
15/08/01 14:09:25 ERROR OneForOneStrategy:
java.lang.NullPointerException
    at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/08/01 14:09:25 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
    at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
    at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
    at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
  File "C:/Users/ashish dutt/PycharmProjects/KafkaToHDFS/local2Remote.py", line 26, in <module>
    sc = SparkContext(conf=conf)
  File "C:\spark-1.4.0\python\pyspark\context.py", line 113, in __init__
    conf, jsc, profiler_cls)
  File "C:\spark-1.4.0\python\pyspark\context.py", line 165, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "C:\spark-1.4.0\python\pyspark\context.py", line 219, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)
  File "C:\spark-1.4.0\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 701, in __call__
  File "C:\spark-1.4.0\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
    at org.apache.spark.SparkContext.org$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
    at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
    at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

Process finished with exit code 1

The spark-defaults.conf file is configured as follows:

    #spark.eventLog.dir=hdfs://ABCD01:8020/user/spark/applicationHistory
    spark.eventLog.dir hdfs://10.210.250.400:8020/user/spark/eventlog
    spark.eventLog.enabled true
    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.shuffle.service.enabled true
    spark.shuffle.service.port 7337
    spark.yarn.historyServer.address http://ABCD04:18088
    spark.master spark://10.210.250.400:7077
    spark.yarn.jar local:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar
    spark.driver.extraLibraryPath /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
    spark.executor.extraLibraryPath /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
    spark.yarn.am.extraLibraryPath /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
    spark.logConf true

The spark-env.sh file is configured as follows:

    #!/usr/bin/env bash
    ##
    # Generated by Cloudera Manager and should not be modified directly
    ##

    SELF="$(cd $(dirname $BASH_SOURCE) && pwd)"
    if [ -z "$SPARK_CONF_DIR" ]; then
      export SPARK_CONF_DIR="$SELF"
    fi

    export SPARK_HOME=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark
    export DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop

    #export STANDALONE_SPARK_MASTER_HOST=`ABCD01`
    export SPARK_MASTER_IP=spark://10.210.250.400
    export SPARK_MASTER_PORT=7077
    export SPARK_WEBUI_PORT=18080

    ### Path of Spark assembly jar in HDFS
    export SPARK_JAR_HDFS_PATH=${SPARK_JAR_HDFS_PATH:-''}

    export HADOOP_HOME=${HADOOP_HOME:-$DEFAULT_HADOOP_HOME}

    if [ -n "$HADOOP_HOME" ]; then
      LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${HADOOP_HOME}/lib/native
    fi

    SPARK_EXTRA_LIB_PATH=""
    if [ -n "$SPARK_EXTRA_LIB_PATH" ]; then
      LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$SPARK_EXTRA_LIB_PATH
    fi

    export LD_LIBRARY_PATH
    export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-$SPARK_CONF_DIR/yarn-conf}

    # This is needed to support old CDH versions that use a forked version
    # of compute-classpath.sh.
    export SCALA_LIBRARY_PATH=${SPARK_HOME}/lib

    # Set distribution classpath. This is only used in CDH 5.3 and later.
    export SPARK_DIST_CLASSPATH=$(paste -sd: "$SELF/classpath.txt")

And the slaves.sh file is configured as:

    10.210.250.401
    10.210.250.402
    10.210.250.403

Please tell me how I can connect to the remote server using PyCharm or any other IDE.

Thank you,
Ashish
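P.S. Since the log reports "Connection refused" for port 7077, one check that might help separate a plain network problem from a Spark configuration problem is a TCP probe run from the laptop. Below is a minimal sketch, assuming Python 3 and only the standard library; `can_connect` is a hypothetical helper name, not part of PySpark.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host and attempts a TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable hosts
        return False
```

If `can_connect("<master-host>", 7077)` returns False from the laptop, that would suggest the issue is at the network or port level (for example a firewall, or the master not listening on that address) rather than in the PySpark code itself. `<master-host>` is a placeholder for the master's address.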