You can use the "spark.cassandra.connection.local_dc" option to restrict queries to a single data center. It might just be that the remote DC is not reachable at the IP addresses returned internally when the client attempts to discover the ring topology.
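As a rough sketch of what that could look like in your spark-shell session: the data center name "BACK" below is an assumption taken from the LocalNodeFirstLoadBalancingPolicy lines in your log, so substitute whatever name your local DC actually reports. Your existing auth settings would stay as they are.

```scala
import org.apache.spark.SparkConf

// Sketch only: "BACK" is assumed to be the name of the data center that is
// local to the Spark workers (it is the name shown next to 172.17.10.45-47
// in the quoted log). Replace it with your actual local DC name.
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "172.17.10.44")
  .set("spark.cassandra.connection.local_dc", "BACK")
```

With this set, the connector should only route requests to nodes in the named DC, which would avoid the unreachable DC2 addresses if that is indeed what is failing.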
On Mon, Feb 23, 2015 at 4:52 PM, Bosung Seo <bos...@brightcloud.com> wrote:
> Hi all,
>
> I'm trying to use Spark and Cassandra.
>
> I have two datacenters in different regions on AWS, and tried to run a simple table count program.
>
> However, I'm still getting *WARN TaskSchedulerImpl: Initial job has not accepted any resources;*, and Spark can't finish the processing.
>
> The test table only has 571 rows and 2 small columns. I assume it doesn't require a lot of memory for such a small table.
>
> I also tried increasing cores and RAM in the Spark config files, but the result is still the same.
>
> ----------------------------------------------------------------------
>
> scala> import com.datastax.spark.connector._
> import com.datastax.spark.connector._
>
> scala> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.{SparkContext, SparkConf}
>
> scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "172.17.10.44").set("spark.cassandra.auth.username", "masteruser").set("spark.cassandra.auth.password", "password")
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@1cfffdf3
>
> scala> val sc = new SparkContext("spark://172.17.10.182:7077", "test", conf)
> 15/02/23 21:56:21 INFO SecurityManager: Changing view acls to: root
> 15/02/23 21:56:21 INFO SecurityManager: Changing modify acls to: root
> 15/02/23 21:56:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
> 15/02/23 21:56:21 INFO Slf4jLogger: Slf4jLogger started
> 15/02/23 21:56:21 INFO Remoting: Starting remoting
> 15/02/23 21:56:21 INFO Utils: Successfully started service 'sparkDriver' on port 41709.
> 15/02/23 21:56:21 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-172-17-10-182:41709]
> 15/02/23 21:56:21 INFO SparkEnv: Registering MapOutputTracker
> 15/02/23 21:56:21 INFO SparkEnv: Registering BlockManagerMaster
> 15/02/23 21:56:21 INFO DiskBlockManager: Created local directory at /srv/spark/tmp/spark-9f50ea1b-e8eb-4cb8-8f48-d04e3ec525a2/spark-61a2d7fa-697e-4a61-80af-c3d72149f244
> 15/02/23 21:56:21 INFO MemoryStore: MemoryStore started with capacity 534.5 MB
> 15/02/23 21:56:21 INFO HttpFileServer: HTTP File server directory is /srv/spark/tmp/spark-1c34ed81-1ea9-45b1-81dd-184f12b975f6/spark-7c001536-1b70-40ea-9013-14551ad05a29
> 15/02/23 21:56:21 INFO HttpServer: Starting HTTP Server
> 15/02/23 21:56:21 INFO Utils: Successfully started service 'HTTP file server' on port 51439.
> 15/02/23 21:56:21 INFO Utils: Successfully started service 'SparkUI' on port 4040.
> 15/02/23 21:56:21 INFO SparkUI: Started SparkUI at http://52.10.105.190:4040
> 15/02/23 21:56:21 INFO SparkContext: Added JAR file:/home/ubuntu/spark-cassandra-connector/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar at http://172.17.10.182:51439/jars/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar with timestamp 1424728581916
> 15/02/23 21:56:21 INFO AppClient$ClientActor: Connecting to master spark://172.17.10.182:7077...
> 15/02/23 21:56:21 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150223215621-0010
> 15/02/23 21:56:21 INFO NettyBlockTransferService: Server created on 45474
> 15/02/23 21:56:21 INFO BlockManagerMaster: Trying to register BlockManager
> 15/02/23 21:56:21 INFO BlockManagerMasterActor: Registering block manager ip-172-17-10-182:45474 with 534.5 MB RAM, BlockManagerId(<driver>, ip-172-17-10-182, 45474)
> 15/02/23 21:56:21 INFO BlockManagerMaster: Registered BlockManager
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor added: app-20150223215621-0010/0 on worker-20150223191054-ip-172-17-10-45-9000 (ip-172-17-10-45:9000) with 2 cores
> 15/02/23 21:56:22 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150223215621-0010/0 on hostPort ip-172-17-10-45:9000 with 2 cores, 512.0 MB RAM
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor added: app-20150223215621-0010/1 on worker-20150223191054-ip-172-17-10-47-9000 (ip-172-17-10-47:9000) with 2 cores
> 15/02/23 21:56:22 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150223215621-0010/1 on hostPort ip-172-17-10-47:9000 with 2 cores, 512.0 MB RAM
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor added: app-20150223215621-0010/2 on worker-20150223191055-ip-172-17-10-46-9000 (ip-172-17-10-46:9000) with 2 cores
> 15/02/23 21:56:22 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150223215621-0010/2 on hostPort ip-172-17-10-46:9000 with 2 cores, 512.0 MB RAM
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor added: app-20150223215621-0010/3 on worker-20150223191051-ip-172-17-10-44-9000 (ip-172-17-10-44:9000) with 2 cores
> 15/02/23 21:56:22 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150223215621-0010/3 on hostPort ip-172-17-10-44:9000 with 2 cores, 512.0 MB RAM
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/0 is now LOADING
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/2 is now LOADING
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/1 is now LOADING
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/3 is now LOADING
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/0 is now RUNNING
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/1 is now RUNNING
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/2 is now RUNNING
> 15/02/23 21:56:22 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/3 is now RUNNING
> 15/02/23 21:56:22 INFO EventLoggingListener: Logging events to file:/tmp/spark-events//app-20150223215621-0010
> 15/02/23 21:56:22 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3649aa92
>
> scala> val rdd= sc.cassandraTable("keyspace", "table")
> table: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:50
>
> scala> rdd.toArray.foreach(println)
> warning: there were 1 deprecation warning(s); re-run with -deprecation for details
> 15/02/23 21:56:40 INFO Cluster: New Cassandra host /172.17.10.44:9042 added
> 15/02/23 21:56:40 INFO Cluster: New Cassandra host /172.17.10.62:9042 added
> *15/02/23 21:56:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 172.17.10.62 (DC2) <- Datacenter in different region.*
> 15/02/23 21:56:40 INFO Cluster: New Cassandra host /172.17.10.46:9042 added
> 15/02/23 21:56:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 172.17.10.46 (BACK)
> 15/02/23 21:56:40 INFO Cluster: New Cassandra host /172.17.10.71:9042 added
> *15/02/23 21:56:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 172.17.10.71 (DC2)*
> 15/02/23 21:56:40 INFO Cluster: New Cassandra host /172.17.10.159:9042 added
> *15/02/23 21:56:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 172.17.10.159 (DC2)*
> 15/02/23 21:56:40 INFO Cluster: New Cassandra host /172.17.10.45:9042 added
> 15/02/23 21:56:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 172.17.10.45 (BACK)
> 15/02/23 21:56:40 INFO Cluster: New Cassandra host /172.17.10.72:9042 added
> *15/02/23 21:56:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 172.17.10.72 (DC2)*
> 15/02/23 21:56:40 INFO Cluster: New Cassandra host /172.17.10.47:9042 added
> 15/02/23 21:56:40 INFO LocalNodeFirstLoadBalancingPolicy: Added host 172.17.10.47 (BACK)
> *15/02/23 21:56:40 INFO CassandraConnector: Connected to Cassandra cluster: TestCassandra*
> *15/02/23 21:56:41 INFO CassandraConnector: Disconnected from Cassandra cluster: TestCassandra*
> 15/02/23 21:56:45 INFO SparkContext: Starting job: toArray at <console>:21
> 15/02/23 21:56:45 INFO DAGScheduler: Got job 0 (toArray at <console>:21) with 6 output partitions (allowLocal=false)
> 15/02/23 21:56:45 INFO DAGScheduler: Final stage: Stage 0(toArray at <console>:21)
> 15/02/23 21:56:45 INFO DAGScheduler: Parents of final stage: List()
> 15/02/23 21:56:45 INFO DAGScheduler: Missing parents: List()
> 15/02/23 21:56:45 INFO DAGScheduler: Submitting Stage 0 (CassandraRDD[0] at RDD at CassandraRDD.scala:50), which has no missing parents
> 15/02/23 21:56:45 INFO MemoryStore: ensureFreeSpace(4536) called with curMem=0, maxMem=560497950
> 15/02/23 21:56:45 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 4.4 KB, free 534.5 MB)
> 15/02/23 21:56:46 INFO MemoryStore: ensureFreeSpace(2620) called with curMem=4536, maxMem=560497950
> 15/02/23 21:56:46 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 534.5 MB)
> 15/02/23 21:56:46 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-17-10-182:45474 (size: 2.6 KB, free: 534.5 MB)
> 15/02/23 21:56:46 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
> 15/02/23 21:56:46 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838
> 15/02/23 21:56:46 INFO DAGScheduler: Submitting 6 missing tasks from Stage 0 (CassandraRDD[0] at RDD at CassandraRDD.scala:50)
> 15/02/23 21:56:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 6 tasks
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/0 is now EXITED (Command exited with code 1)
> 15/02/23 21:56:54 INFO SparkDeploySchedulerBackend: Executor app-20150223215621-0010/0 removed: Command exited with code 1
> 15/02/23 21:56:54 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 0
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor added: app-20150223215621-0010/4 on worker-20150223191054-ip-172-17-10-45-9000 (ip-172-17-10-45:9000) with 2 cores
> 15/02/23 21:56:54 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150223215621-0010/4 on hostPort ip-172-17-10-45:9000 with 2 cores, 512.0 MB RAM
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/2 is now EXITED (Command exited with code 1)
> 15/02/23 21:56:54 INFO SparkDeploySchedulerBackend: Executor app-20150223215621-0010/2 removed: Command exited with code 1
> 15/02/23 21:56:54 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 2
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor added: app-20150223215621-0010/5 on worker-20150223191055-ip-172-17-10-46-9000 (ip-172-17-10-46:9000) with 2 cores
> 15/02/23 21:56:54 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150223215621-0010/5 on hostPort ip-172-17-10-46:9000 with 2 cores, 512.0 MB RAM
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/4 is now LOADING
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/1 is now EXITED (Command exited with code 1)
> 15/02/23 21:56:54 INFO SparkDeploySchedulerBackend: Executor app-20150223215621-0010/1 removed: Command exited with code 1
> 15/02/23 21:56:54 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 1
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor added: app-20150223215621-0010/6 on worker-20150223191054-ip-172-17-10-47-9000 (ip-172-17-10-47:9000) with 2 cores
> 15/02/23 21:56:54 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150223215621-0010/6 on hostPort ip-172-17-10-47:9000 with 2 cores, 512.0 MB RAM
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/5 is now LOADING
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/3 is now EXITED (Command exited with code 1)
> 15/02/23 21:56:54 INFO SparkDeploySchedulerBackend: Executor app-20150223215621-0010/3 removed: Command exited with code 1
> 15/02/23 21:56:54 ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor 3
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor added: app-20150223215621-0010/7 on worker-20150223191051-ip-172-17-10-44-9000 (ip-172-17-10-44:9000) with 2 cores
> 15/02/23 21:56:54 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150223215621-0010/7 on hostPort ip-172-17-10-44:9000 with 2 cores, 512.0 MB RAM
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/6 is now LOADING
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/7 is now LOADING
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/4 is now RUNNING
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/5 is now RUNNING
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/6 is now RUNNING
> 15/02/23 21:56:54 INFO AppClient$ClientActor: Executor updated: app-20150223215621-0010/7 is now RUNNING
> *15/02/23 21:57:01 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory*
>
> ----------------------------------------------------------------------
> The logs show that it connected to Cassandra and then disconnected.
>
> Datacenter DC2 is in a different region, and I can't connect to it with those IP addresses.
>
> What is the problem here?
>
> I would appreciate any help.
>
> Thanks,
> Bo.
>

--
-----------------
Nate McCall
Austin, TX
@zznate

Co-Founder & Sr. Technical Consultant
Apache Cassandra Consulting
http://www.thelastpickle.com