Also, if you are running in local mode, try giving Spark more cores, e.g. 
local[5]. A low core count could be interfering with the tuning parameters 
listed below, which you can also experiment with. All of this matters in the 
context of how those parameters interact with the connection pool and what that 
pool is doing in terms of multithreading.
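
For illustration, a minimal sketch of giving Spark five local threads through SparkConf. The application name is a placeholder, and 5 is just the example figure from the message above, not a recommendation. For a streaming job with one receiver, local mode needs at least two threads, because the receiver occupies one of them.

```scala
import org.apache.spark.SparkConf

// Assumption: the job runs in local mode; "cassandra-streaming-test" is a
// hypothetical application name.
val conf = new SparkConf()
  .setAppName("cassandra-streaming-test")
  .setMaster("local[5]") // five worker threads instead of local or local[1]
```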

 

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md
  

 

Tuning

The following properties set in SparkConf can be used to fine-tune the saving 
process. The default values have been chosen for stability rather than 
performance. Changing them may increase performance, depending on your workload:

*       spark.cassandra.output.batch.size.rows: number of rows per single 
batch; default is 'auto' which means the connector will adjust the number of 
rows based on the amount of data in each row
*       spark.cassandra.output.batch.size.bytes: maximum total size of the 
batch in bytes; defaults to 1 kB.
*       spark.cassandra.output.batch.grouping.key: determines how insert 
statements are grouped into batches; available values are:
        *       none: a batch may contain any statements
        *       replica_set: a batch may contain only statements to be written 
        to the same replica set
        *       partition (default): a batch may contain only statements for 
        rows sharing the same partition key value
*       spark.cassandra.output.batch.buffer.size: how many batches per single 
Spark task can be stored in memory before sending to Cassandra; default 1000
*       spark.cassandra.output.concurrent.writes: maximum number of batches 
executed in parallel by a single Spark task; defaults to 5
*       spark.cassandra.output.consistency.level: consistency level for 
writing; defaults to LOCAL_ONE.
*       spark.cassandra.output.throughput_mb_per_sec: maximum write throughput 
allowed per single core in MB/s; on long (8+ hour) runs, limit this to 70% of 
the maximum throughput seen on a smaller job, for stability
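
As a rough sketch, the properties above could be set on a SparkConf like this. Most values simply restate the defaults quoted in the list (with "1 kB" written as 1024 bytes, which is an assumption about the unit), so they change nothing until you edit them. The throughput cap of 7 is a made-up example that assumes a measured peak of 10 MB/s per core.

```scala
import org.apache.spark.SparkConf

// Values mirror the defaults listed above unless noted otherwise.
val conf = new SparkConf()
  .set("spark.cassandra.output.batch.size.rows", "auto")
  .set("spark.cassandra.output.batch.size.bytes", "1024")      // assumed meaning of "1 kB"
  .set("spark.cassandra.output.batch.grouping.key", "partition")
  .set("spark.cassandra.output.batch.buffer.size", "1000")
  .set("spark.cassandra.output.concurrent.writes", "5")
  .set("spark.cassandra.output.consistency.level", "LOCAL_ONE")
  // Hypothetical cap: 70% of an assumed 10 MB/s peak seen on a smaller job.
  .set("spark.cassandra.output.throughput_mb_per_sec", "7")
```

Changing one property at a time makes it easier to see which one affects the failure.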

 

 

From: Sergio Jiménez Barrio [mailto:drarse.a...@gmail.com] 
Sent: Sunday, May 10, 2015 12:59 PM
To: Evo Eftimov
Subject: Re: Spark streaming closes with Cassandra Conector

 

How can I check this? Thanks, Evo.

 

2015-05-10 13:36 GMT+02:00 Evo Eftimov <evo.efti...@isecc.com>:

One other suggestion along the connection pool line of enquiry: check whether 
your Cassandra service is configured to allow only one session per user, for 
example.

 

I think the error is generated inside the connection pool when it tries to 
initialize a connection after the first one.

 

 


 

-------- Original message --------

From: Evo Eftimov 

Date:2015/05/10 12:02 (GMT+00:00) 

To: 'Gerard Maas' 

Cc: 'Sergio Jiménez Barrio' ,'spark users' 

Subject: RE: Spark streaming closes with Cassandra Conector 

 

Hmm, there is also a connection pool involved, and such things (especially 
while still rough around the edges) may behave erratically in a distributed 
multithreaded environment.

 

Can you try foreachPartition and foreach together? This will create a slightly 
different multithreaded execution and distribution profile, which may sidestep 
a potential error in the connection pool code.
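
One possible reading of this suggestion, sketched under assumptions: the stream carries (id, payload) string pairs, the target is the test.test table from the original post, and the column names id and payload are hypothetical. It uses the connector's CassandraConnector.withSessionDo and the driver's Session.execute rather than saveToCassandra, so each partition writes its rows one by one through a single session.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.streaming.dstream.DStream

object ForeachPartitionWriter {
  // Hypothetical helper: column names id and payload are assumptions.
  def write(messages: DStream[(String, String)]): Unit = {
    // CassandraConnector is serializable, so the closures below can capture it.
    val connector = CassandraConnector(messages.context.sparkContext.getConf)
    messages.foreachRDD { rdd =>
      rdd.foreachPartition { rows =>
        // One session per partition, reused for every row in that partition.
        connector.withSessionDo { session =>
          rows.foreach { case (id, payload) =>
            session.execute(
              "INSERT INTO test.test (id, payload) VALUES (?, ?)", id, payload)
          }
        }
      }
    }
  }
}
```

This is slower than batched saving and is meant only as a diagnostic variation, not as a replacement for saveToCassandra.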

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:56 AM
To: Evo Eftimov
Cc: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector

 

I'm familiar with the TableWriter code and that log only appears if the write 
actually succeeded. (See 
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala)

 

Thinking about infrastructure, we see that it's always trying to reach 
'localhost'. Are you running a single-node test in local mode? Otherwise, 
there's something wrong with the way you're configuring Cassandra or the 
connection to it (always tempted to say "her" :-) ).
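
If Cassandra is not on the same machine, the connector has to be told where to find it. A minimal sketch, assuming the connector's spark.cassandra.connection.host property and a placeholder address:

```scala
import org.apache.spark.SparkConf

// "10.0.0.12" is a hypothetical address; replace it with a real Cassandra node.
// When this property is not set, the connector falls back to the local host,
// which would match the 127.0.0.1 seen in the log.
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "10.0.0.12")
```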

 

-kr, Gerard.

 

On Sun, May 10, 2015 at 12:47 PM, Evo Eftimov <evo.efti...@isecc.com> wrote:

I think the message that it has written 2 rows is misleading 

 

If you look further down, you will see that it could not initialize a 
connection pool for Cassandra (presumably while trying to write the previously 
mentioned 2 rows).

 

Another confirmation of this hypothesis is the phrase “error during transport 
initialization”. All of this points in the direction of infrastructure or 
configuration issues, so check your Cassandra service and how you connect to 
it, mate.
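
A quick way to rule out basic connectivity problems is to test the native transport port directly, outside Spark. The sketch below uses only the JDK; the object and method names are made up for illustration, and 127.0.0.1:9042 is the address taken from the log further down. A successful TCP connect only shows that something is listening, not that Cassandra accepts the session.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object CassandraPortCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: IOException => false // refused, timed out, or unknown host
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val reachable = canConnect("127.0.0.1", 9042, 2000)
    println(s"127.0.0.1:9042 reachable: $reachable")
  }
}
```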

 

From: Gerard Maas [mailto:gerard.m...@gmail.com] 
Sent: Sunday, May 10, 2015 11:33 AM
To: Sergio Jiménez Barrio; spark users
Subject: Re: Spark streaming closes with Cassandra Conector

 

It successfully writes some data and fails afterwards, as if the host or 
connection went down. Weird.

 

Maybe you should post this question on the Spark-Cassandra connector group:

https://groups.google.com/a/lists.datastax.com/forum/#!forum/spark-connector-user

 

 

-kr, Gerard.

 

 

On Sun, May 10, 2015 at 12:23 PM, Sergio Jiménez Barrio <drarse.a...@gmail.com> 
wrote:

Here it is:


15/05/10 12:20:08 INFO TableWriter: Wrote 2 rows to ataques.attacks in 0,016 s.
15/05/10 12:20:08 INFO LocalNodeFirstLoadBalancingPolicy: Suspected host 127.0.0.1 (datacenter1)
15/05/10 12:20:08 ERROR Session: Error creating pool to /127.0.0.1:9042
com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: Closed channel)
    at com.datastax.driver.core.Connection.initializeTransport(Connection.java:186)
    at com.datastax.driver.core.Connection.<init>(Connection.java:116)
    at com.datastax.driver.core.PooledConnection.<init>(PooledConnection.java:32)
    at com.datastax.driver.core.Connection$Factory.open(Connection.java:586)
    at com.datastax.driver.core.DynamicConnectionPool.<init>(DynamicConnectionPool.java:74)
    at com.datastax.driver.core.HostConnectionPool.newInstance(HostConnectionPool.java:33)
    at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:231)
    at com.datastax.driver.core.SessionManager$2.call(SessionManager.java:224)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at com.google.common.util.concurrent.AbstractListeningExecutorService.submit(AbstractListeningExecutorService.java:61)
    at com.datastax.driver.core.SessionManager.forceRenewPool(SessionManager.java:224)
    at com.datastax.driver.core.Cluster$Manager.onUp(Cluster.java:1469)
    at com.datastax.driver.core.Cluster$Manager.access$1100(Cluster.java:1144)
    at com.datastax.driver.core.Cluster$Manager$4.runMayThrow(Cluster.java:1562)
    at com.datastax.driver.core.ExceptionCatchingRunnable.run(ExceptionCatchingRunnable.java:32)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Error writing: Closed channel
    at com.datastax.driver.core.Connection$1.operationComplete(Connection.java:432)
    at org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
    at org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413)
    at org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:380)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:248)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:151)
    at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    ... 3 more
15/05/10 12:20:08 ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds

Thanks!

 

2015-05-10 0:58 GMT+02:00 Gerard Maas <gerard.m...@gmail.com>:

Hola Sergio,

 

It would help if you added the error message + stack trace.

 

-kr, Gerard.

 

On Sat, May 9, 2015 at 11:32 PM, Sergio Jiménez Barrio <drarse.a...@gmail.com> 
wrote:

I am trying to save some data to Cassandra from an app that uses Spark Streaming:

 

Messages.foreachRDD {

 . . .

CassandraRDD.saveToCassandra("test","test")

}

 

When I run it, the app closes when I receive data or can't connect to 
Cassandra.

 

Any ideas? Thanks



-- 
Atte. Sergio Jiménez

 

 

 

 

 
