Cassandra with Spark, select a few columns with case class

2018-03-25 Thread Guillermo Ortiz
I'm trying to read a few columns from a Cassandra table with Spark and map
them into a case class. If I select all the columns my case class has, the
mapping works. But I only want to bring back a few of them, and I don't want
to keep a specific case class for every projection.

I tried overloading the constructor of the case class and also defining a
normal class, but I couldn't get either to work.

// It doesn't work, which is expected: there is no constructor matching only the selected columns.
case class Father(idPadre: Int, name: String, lastName: String,
                  children: Map[Int, Son], hobbies: Map[Int, Hobbie], lastUpdate: Date)

// This works, because it has the right constructor. I tried a companion object
// with additional constructors, but that didn't work.
case class FatherEspecifica(idFather: Int, name: String, children: Map[Int, Son])

// Compilation problems, and I don't know why.
class FatherClaseNormal(idFather: Int, name: String, lastName: String,
                        children: Map[Int, Son], hobbies: Map[Int, Hobbie], lastUpdate: Date) {
  /**
   * A secondary constructor.
   */
  def this(name: String) {
    this(0, name, "", Map(), Map(), new Date())
    println("\nNo last name or age given.")
  }
}

// I want to select only a few columns without having a case class for every
// projection; I'd like to map directly to a case class rather than work with CassandraRows.
val joinRdd = rddAvro.joinWithCassandraTable[FatherXXX]("poc_udt",
  "father", SomeColumns("id_father", "name", "children"))

CREATE TABLE IF NOT EXISTS poc_udt.father(
id_father int PRIMARY KEY,
name text,
last_name text,
children map<int, frozen<son>>,
hobbies map<int, frozen<hobbie>>,
last_update timestamp
)

When I use a normal class the error is:

Error:(57, 67) No RowReaderFactory can be found for this type
Error occurred in an application involving default arguments.
val joinRdd =
rddAvro.joinWithCassandraTable[FatherClaseNormal]("poc_udt",
"father",SomeColumns("name"))


Re: replace dead node vs remove node

2018-03-25 Thread kurt greaves
Didn't read the blog, but it's worth noting that if you replace the node and
give it a *different* IP address, repair will not be necessary, as the node will
receive writes during the replacement. This works as long as you start the
replacement node before the hinted handoff (HH) window ends.

https://issues.apache.org/jira/browse/CASSANDRA-12344 and
https://issues.apache.org/jira/browse/CASSANDRA-11559 fix this for same-address
replacements (hopefully in 4.0).

On Fri., 23 Mar. 2018, 15:11 Anthony Grasso, 
wrote:

> Hi Peng,
>
> Correct, you would want to repair in either case.
>
> Regards,
> Anthony
>
>
> On Fri, 23 Mar 2018 at 14:09, Peng Xiao <2535...@qq.com> wrote:
>
>> Hi Anthony,
>>
>> There is a problem with replacing a dead node as per the blog: if the
>> replacement process takes longer than max_hint_window_in_ms, we must run
>> repair to make the replaced node consistent again, since it missed ongoing
>> writes during bootstrapping. But for a large cluster, repair is a painful
>> process.
>>
>> Thanks,
>> Peng Xiao
>>
>>
>>
>> -- Original Message --
>> *From:* "Anthony Grasso";
>> *Sent:* Thursday, 22 March 2018, 7:13 PM
>> *To:* "user";
>> *Subject:* Re: replace dead node vs remove node
>>
>> Hi Peng,
>>
>> Depending on the hardware failure you can do one of two things:
>>
>> 1. If the disks are intact and uncorrupted you could just use the disks
>> with the current data on them in the new node. Even if the IP address
>> changes for the new node that is fine. In that case all you need to do is
>> run repair on the new node. The repair will fix any writes the node missed
>> while it was down. This process is similar to the scenario in this blog
>> post:
>> http://thelastpickle.com/blog/2018/02/21/replace-node-without-bootstrapping.html
>>
>> 2. If the disks are inaccessible or corrupted, then use the method as
>> described in the blogpost you linked to. The operation is similar to
>> bootstrapping a new node. There is no need to perform any other remove or
>> join operation on the failed or new nodes. As per the blog post, you
>> definitely want to run repair on the new node as soon as it joins the
>> cluster. In this case here, the data on the failed node is effectively lost
>> and replaced with data from other nodes in the cluster.
>>
>> Hope this helps.
>>
>> Regards,
>> Anthony
>>
>>
>> On Thu, 22 Mar 2018 at 20:52, Peng Xiao <2535...@qq.com> wrote:
>>
>>> Dear All,
>>>
>>> When one node fails with hardware errors, it will be in DN status in
>>> the cluster. Then, if we are not able to handle the error within three hours
>>> (the max hints window), we will lose data, right? We have to run repair to
>>> keep the consistency.
>>> And as per
>>> https://blog.alteroot.org/articles/2014-03-12/replace-a-dead-node-in-cassandra.html,
>>> we can replace this dead node. Is that the same as bootstrapping a new node,
>>> meaning we don't need to remove the node and rejoin?
>>> Could anyone please advise?
>>>
>>> Thanks,
>>> Peng Xiao
>>>
>>>
>>>
>>>
>>>


Deserialize Map[Int, UDT] to a case class from Spark Connector

2018-03-25 Thread Guillermo Ortiz
Hello,

I'm working with UDTs and the Spark connector with these dependencies:

<properties>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.0.2</spark.version>
    <cassandra-conector.version>2.0.7</cassandra-conector.version>
    <cassandra-driver.version>3.4.0</cassandra-driver.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>${cassandra-conector.version}</version>
</dependency>

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>${cassandra-driver.version}</version>
</dependency>


So with these dependencies I'm using Scala 2.11, but I get this error: *the
GettableToMappedTypeConverter which can't deserialize TypeTags due to Scala
2.10 TypeTag limitation. They come back as nulls and therefore you see this
NPE.*

Why do I get this error if I'm using Scala 2.11? I want to read a
Map[Int, MyUDT] from Spark with the connector. The problem is that if any
field of the UDT is not set, it doesn't work.

If all fields are set, it works.



Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException:
Requested a TypeTag of the GettableToMappedTypeConverter which can't
deserialize TypeTags due to Scala 2.10 TypeTag limitation. They come back as
nulls and therefore you see this NPE.
  at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.targetTypeTag(GettableDataToMappedTypeConverter.scala:34)
  at com.datastax.spark.connector.types.TypeConverter$AbstractMapConverter.valueTypeTag(TypeConverter.scala:707)
  at com.datastax.spark.connector.types.TypeConverter$MapConverter$$typecreator45$1.apply(TypeConverter.scala:791)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at com.datastax.spark.connector.types.TypeConverter$class.targetTypeName(TypeConverter.scala:36)
  at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.targetTypeName(TypeConverter.scala:682)
  at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.tryConvert(GettableDataToMappedTypeConverter.scala:156)
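In case it is useful, one workaround that is often suggested when a UDT value
has unset fields is to declare those fields as Option in the mapped class, so
missing values come back as None instead of failing on a null. A rough sketch
only; the UDT field names (a, b), the table name and sc (an existing
SparkContext) are placeholders, not the real schema:

import com.datastax.spark.connector._
import org.apache.spark.SparkContext

// Placeholder UDT mapping: fields that may be unset are declared as Option.
case class MyUDT(a: Option[Int], b: Option[String])

// Placeholder table mapping holding the map of UDT values.
case class MyRow(id: Int, values: Map[Int, MyUDT])

// Read the table directly into the case class above.
def readRows(sc: SparkContext) =
  sc.cassandraTable[MyRow]("poc_udt", "my_table")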


Measuring eventual consistency latency

2018-03-25 Thread Jeronimo de A. Barros
I'd like to know whether there is a reasonable method to measure how long it
takes for data to become available on all replica nodes in a multi-DC
environment when using the LOCAL_ONE or LOCAL_QUORUM consistency levels.

If there is already a study on this topic somewhere and someone could point me
in the right direction, it would be a great help.

Thanks !
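Not an authoritative method, but a crude client-side probe can give an
empirical number: write a marker row through a coordinator in one DC, then poll
a coordinator in the other DC until the row is visible, and take the elapsed
time. A sketch against the Java driver 3.x; the contact points, keyspace and
the throwaway probe table (id uuid PRIMARY KEY) are all placeholders:

import com.datastax.driver.core.{Cluster, ConsistencyLevel, SimpleStatement}

object ReplicationLagProbe extends App {
  // Placeholder contact points: one coordinator in each data centre, same keyspace.
  val dc1 = Cluster.builder().addContactPoint("10.0.1.1").build().connect("poc")
  val dc2 = Cluster.builder().addContactPoint("10.0.2.1").build().connect("poc")

  // Assumes a throwaway table: CREATE TABLE probe (id uuid PRIMARY KEY)
  val id = java.util.UUID.randomUUID()
  val write = new SimpleStatement("INSERT INTO probe (id) VALUES (?)", id)
    .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
  val read = new SimpleStatement("SELECT id FROM probe WHERE id = ?", id)
    .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)

  val t0 = System.nanoTime()
  dc1.execute(write)
  // Poll the remote DC until the row shows up there.
  while (dc2.execute(read).isExhausted) Thread.sleep(1)
  println(s"Visible in the remote DC after ${(System.nanoTime() - t0) / 1000000} ms")

  dc1.getCluster.close()
  dc2.getCluster.close()
}

The measured time includes the write itself and the client round trips, so it
is an upper bound on replication latency rather than a precise figure.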


Re: Deserialize Map[Int, UDT] to a case class from Spark Connector

2018-03-25 Thread Christophe Schmitz
Hi Guillermo

Which version of Spark are you using? Starting with version 2.0, Spark is
built with Scala 2.11 by default. If you are using a prior version (which
looks like it is the case, since your error message mentions Scala 2.10), you
might need to build Spark yourself from source with Scala 2.11 support, or
upgrade your Spark cluster to 2.x.
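If it helps narrow things down, a quick one-liner (just a sketch) to confirm
which Spark and Scala binaries are actually loaded at runtime on the driver:

// Prints e.g. "Spark 2.0.2 / Scala version 2.11.12" for the binaries actually on the classpath.
println(s"Spark ${org.apache.spark.SPARK_VERSION} / Scala ${scala.util.Properties.versionString}")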

Cheers,

Christophe


On 26 March 2018 at 09:11, Guillermo Ortiz  wrote:

> Hello,
>
> I'm working with UDTs and the Spark connector with these dependencies:
>
> <properties>
>     <scala.version>2.11.12</scala.version>
>     <spark.version>2.0.2</spark.version>
>     <cassandra-conector.version>2.0.7</cassandra-conector.version>
>     <cassandra-driver.version>3.4.0</cassandra-driver.version>
> </properties>
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_2.11</artifactId>
>     <version>${spark.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming_2.11</artifactId>
>     <version>${spark.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>com.datastax.spark</groupId>
>     <artifactId>spark-cassandra-connector_2.11</artifactId>
>     <version>${cassandra-conector.version}</version>
> </dependency>
>
> <dependency>
>     <groupId>com.datastax.cassandra</groupId>
>     <artifactId>cassandra-driver-core</artifactId>
>     <version>${cassandra-driver.version}</version>
> </dependency>
>
> So with these dependencies I'm using Scala 2.11, but I get this error: *the
> GettableToMappedTypeConverter which can't deserialize TypeTags due to Scala
> 2.10 TypeTag limitation. They come back as nulls and therefore you see this
> NPE.*
>
> Why do I get this error if I'm using Scala 2.11? I want to read a
> Map[Int, MyUDT] from Spark with the connector. The problem is that if any
> field of the UDT is not set, it doesn't work.
>
> If all fields are set, it works.
>
>
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
> Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException:
> Requested a TypeTag of the GettableToMappedTypeConverter which can't
> deserialize TypeTags due to Scala 2.10 TypeTag limitation. They come back
> as nulls and therefore you see this NPE.
>   at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.targetTypeTag(GettableDataToMappedTypeConverter.scala:34)
>   at com.datastax.spark.connector.types.TypeConverter$AbstractMapConverter.valueTypeTag(TypeConverter.scala:707)
>   at com.datastax.spark.connector.types.TypeConverter$MapConverter$$typecreator45$1.apply(TypeConverter.scala:791)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
>   at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>   at com.datastax.spark.connector.types.TypeConverter$class.targetTypeName(TypeConverter.scala:36)
>   at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.targetTypeName(TypeConverter.scala:682)
>   at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.tryConvert(GettableDataToMappedTypeConverter.scala:156)
>



-- 

Christophe Schmitz - VP Consulting

AU: +61 4 03751980 / FR: +33 7 82022899




Re: Measuring eventual consistency latency

2018-03-25 Thread Christophe Schmitz
Hi Jeronimo,

I am not sure this will address your exact request, but did you look at this
issue (resolved in 3.8), which adds a new cross-DC latency metric?
https://issues.apache.org/jira/browse/CASSANDRA-11569

Cheers,

Christophe

On 26 March 2018 at 10:01, Jeronimo de A. Barros 
wrote:

> I'd like to know whether there is a reasonable method to measure how long it
> takes for data to become available on all replica nodes in a multi-DC
> environment when using the LOCAL_ONE or LOCAL_QUORUM consistency levels.
>
> If there is already a study on this topic somewhere and someone could point
> me in the right direction, it would be a great help.
>
> Thanks !
>



-- 

Christophe Schmitz - VP Consulting

AU: +61 4 03751980 / FR: +33 7 82022899




Re: Measuring eventual consistency latency

2018-03-25 Thread Jeff Jirsa
Probably closer to https://issues.apache.org/jira/browse/CASSANDRA-13289


Will be in 4.0
-- 
Jeff Jirsa


> On Mar 25, 2018, at 4:44 PM, Christophe Schmitz  
> wrote:
> 
> Hi Jeronimo,
> 
> I am not sure this will address your exact request, but did you look at this
> issue (resolved in 3.8), which adds a new cross-DC latency metric?
> https://issues.apache.org/jira/browse/CASSANDRA-11569
> 
> Cheers,
> 
> Christophe
> 
>> On 26 March 2018 at 10:01, Jeronimo de A. Barros  
>> wrote:
>> I'd like to know whether there is a reasonable method to measure how long it
>> takes for data to become available on all replica nodes in a multi-DC
>> environment when using the LOCAL_ONE or LOCAL_QUORUM consistency levels.
>> 
>> If there is already a study on this topic somewhere and someone could point
>> me in the right direction, it would be a great help.
>> 
>> Thanks !
> 
> 
> 
> -- 
> Christophe Schmitz - VP Consulting
> AU: +61 4 03751980 / FR: +33 7 82022899
> 