Hi,

The problem appears to be caused by a bug in the DataStax Spark Cassandra
connector, not in the plugin.

Since CASSANDRA-10217
<https://issues.apache.org/jira/browse/CASSANDRA-10217>, Cassandra 3.x
per-row indexes no longer need to be created on a fake column. Thus,
from Cassandra 3.x on, the column-based syntax
"*CREATE CUSTOM INDEX %s ON %s(%s)*" is replaced with the new row-based
syntax "*CREATE CUSTOM INDEX %s ON %s()*". However, the DataStax Spark
connector doesn't seem to support this new feature yet.
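
To make the difference concrete, here is a small Scala sketch that fills in
the two statement templates quoted above for the tweets table from the
example. The object and method names are only illustrative, and the USING and
WITH OPTIONS clauses are left out for brevity.

```scala
// Illustrative only: fills in the two CREATE CUSTOM INDEX templates
// quoted in this mail. Names such as IndexSyntaxSketch are hypothetical.
object IndexSyntaxSketch {
  // Column-based syntax (before CASSANDRA-10217): index declared on a fake column.
  val columnBasedTemplate = "CREATE CUSTOM INDEX %s ON %s(%s)"
  // Row-based syntax (Cassandra 3.x): no target column at all.
  val rowBasedTemplate = "CREATE CUSTOM INDEX %s ON %s()"

  def columnBased(index: String, table: String, column: String): String =
    columnBasedTemplate.format(index, table, column)

  def rowBased(index: String, table: String): String =
    rowBasedTemplate.format(index, table)

  def main(args: Array[String]): Unit = {
    println(columnBased("tweets_index", "tweets", "lucene"))
    println(rowBased("tweets_index", "tweets"))
  }
}
```

The only difference between the two statements is the target column list,
which is empty in the row-based form.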

When "com.datastax.spark.connector.RDDFunctions.saveToCassandra" is called,
it tries to load the table schema and, for each index, the table column the
index is related to. Since the new index syntax no longer has the fake
column, the lookup is done with an empty column name and results in a
NoSuchElementException.
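
As a rough illustration of that failure mode, the sketch below mimics the
lookup in plain Scala. It is not the connector's code: ColumnDef, IndexDef and
resolve are simplified, hypothetical stand-ins, and the assumption that a
row-based index surfaces with an empty target column is only inferred from
the empty column name in the exception message and from the
Map$WithDefault.default frame in the stack trace.

```scala
import scala.util.Try

// Hypothetical, simplified stand-ins for the connector's schema types.
final case class ColumnDef(name: String)
final case class IndexDef(indexName: String, targetColumn: String)

object IndexLookupSketch {
  // Columns of demo.tweets keyed by name. Unknown names throw
  // NoSuchElementException, similar to the reported error.
  val columnByName: Map[String, ColumnDef] =
    Seq("id", "user", "body", "time", "latitude", "longitude", "lucene")
      .map(n => n -> ColumnDef(n))
      .toMap
      .withDefault(n => throw new NoSuchElementException(s"Column $n not found in demo.tweets"))

  // Resolve the column an index is declared on.
  def resolve(index: IndexDef): Try[ColumnDef] =
    Try(columnByName(index.targetColumn))

  def main(args: Array[String]): Unit = {
    // Column-based syntax: ON tweets (lucene), target column is "lucene".
    println(resolve(IndexDef("tweets_index", "lucene")))
    // Row-based syntax: ON tweets (), assumed here to give an empty target.
    println(resolve(IndexDef("tweets_index", "")))
  }
}
```

The first lookup succeeds, while the second one fails with a
NoSuchElementException whose message has nothing between "Column" and
"not found", as in the reported exception.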

However, saveToCassandra works well if you run the same example with the
prior fake-column syntax:

    CREATE KEYSPACE demo
    WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1};
    USE demo;
    CREATE TABLE tweets (
        id INT PRIMARY KEY,
        user TEXT,
        body TEXT,
        time TIMESTAMP,
        latitude FLOAT,
        longitude FLOAT,
        lucene TEXT
    );

    CREATE CUSTOM INDEX tweets_index ON tweets (lucene)
    USING 'com.stratio.cassandra.lucene.Index'
    WITH OPTIONS = {
        'refresh_seconds' : '1',
        'schema' : '{
            fields : {
                id    : {type : "integer"},
                user  : {type : "string"},
                body  : {type : "text", analyzer : "english"},
                time  : {type : "date", pattern : "yyyy/MM/dd", sorted : true},
                place : {type : "geo_point", latitude:"latitude", longitude:"longitude"}
            }
        }'
    };

Should we open a new JIRA about this or extend SPARKC-332
<https://datastax-oss.atlassian.net/browse/SPARKC-332> ?

Regards

Eduardo Alonso
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
<https://twitter.com/StratioBD>*

2016-03-29 14:02 GMT+02:00 Cleosson José Pirani de Souza <
cso...@daitangroup.com>:

> Hi Eduardo,
>
>
>  As I was not sure whether this is a bug, I preferred to send the e-mail
> to the list first. I could have done something wrong.
>
>  The versions are:
>
>    - Spark 1.6.0
>    - Cassandra 3.0.3
>    - Lucene plugin 3.0.3.1
>
>  I opened the bug. The link is:
> https://github.com/Stratio/cassandra-lucene-index/issues/109
>
>  If it is not a bug, let me know.
>
>
> Thanks,
>
> Cleosson
>
>
> ------------------------------
> *From:* Eduardo Alonso <eduardoalo...@stratio.com>
> *Sent:* Tuesday, March 29, 2016 6:57 AM
>
> *To:* user@cassandra.apache.org
> *Subject:* Re: Does saveToCassandra work with Cassandra Lucene plugin ?
>
>
> Hi Cleosson Jose,
>
> First of all, if you think this is caused by a cassandra-lucene-index
> <https://github.com/Stratio/cassandra-lucene-index> bug, this user list
> is not the best place to report it. Please use GitHub issues
> <https://github.com/Stratio/cassandra-lucene-index/issues> for this.
>
> Second, in order to reproduce this error, I need to know which versions
> of cassandra, cassandra-lucene-index, spark and spark-cassandra-connector
> you are using.
>
> Regards
>
> Eduardo Alonso
> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
> <https://twitter.com/StratioBD>*
>
> 2016-03-28 23:43 GMT+02:00 Cleosson José Pirani de Souza <
> cso...@daitangroup.com>:
>
>> Hi Jack,
>>
>>
>>  Yes, I used the exact same commands in the Stratio readme.
>>
>>
>> Thanks,
>>
>> Cleosson
>>
>>
>> ------------------------------
>> *From:* Jack Krupansky <jack.krupan...@gmail.com>
>> *Sent:* Monday, March 28, 2016 6:06 PM
>> *To:* user@cassandra.apache.org
>>
>> *Subject:* Re: Does saveToCassandra work with Cassandra Lucene plugin ?
>>
>> The exception message has an empty column name. Odd. Not sure if that is
>> a bug in the exception code or whether you actually have an empty column
>> name somewhere.
>>
>> Did you use the absolutely exact same commands to create the keyspace,
>> table, and custom index as in the Stratio readme?
>>
>> -- Jack Krupansky
>>
>> On Mon, Mar 28, 2016 at 4:57 PM, Cleosson José Pirani de Souza <
>> cso...@daitangroup.com> wrote:
>>
>>> Hi,
>>>
>>>  One important thing, if I remove the custom index using Lucene,
>>> saveToCassandra works.
>>>
>>>
>>> Thanks
>>>
>>> Cleosson
>>>
>>>
>>> ------------------------------
>>> *From:* Anuj Wadehra <anujw_2...@yahoo.co.in>
>>> *Sent:* Monday, March 28, 2016 3:27 PM
>>> *To:* user@cassandra.apache.org; Cleosson José Pirani de Souza;
>>> user@cassandra.apache.org
>>> *Subject:* Re: Does saveToCassandra work with Cassandra Lucene plugin ?
>>>
>>> I used it with Java, and there every field of the POJO must map to a
>>> column name of the table. I think someone with Scala syntax knowledge
>>> can help you better.
>>>
>>>
>>> Thanks
>>> Anuj
>>>
>>> Sent from Yahoo Mail on Android
>>> <https://overview.mail.yahoo.com/mobile/?.src=Android>
>>>
>>> On Mon, 28 Mar, 2016 at 11:47 pm, Anuj Wadehra
>>> <anujw_2...@yahoo.co.in> wrote:
>>> With my limited experience with Spark, I can tell you that you need to
>>> make sure that all columns listed in SomeColumns are part of the CQL
>>> schema of the table.
>>>
>>>
>>> Thanks
>>> Anuj
>>>
>>> Sent from Yahoo Mail on Android
>>> <https://overview.mail.yahoo.com/mobile/?.src=Android>
>>>
>>> On Mon, 28 Mar, 2016 at 11:38 pm, Cleosson José Pirani de Souza
>>> <cso...@daitangroup.com> wrote:
>>>
>>>
>>>
>>> Hello,
>>>
>>>
>>>
>>> I am implementing the example from GitHub (
>>> https://github.com/Stratio/cassandra-lucene-index), and when I try to
>>> save the data using saveToCassandra I get a NoSuchElementException.
>>>  If I use CassandraConnector.withSessionDo, I am able to add elements
>>> to Cassandra and no exception is raised.
>>>
>>>
>>>  The code :
>>> import org.apache.spark.{SparkConf, SparkContext, Logging}
>>> import com.datastax.spark.connector.cql.CassandraConnector
>>> import com.datastax.spark.connector._
>>>
>>> object App extends Logging {
>>>     def main(args: Array[String]) {
>>>
>>>         // Get the Cassandra IP and create the Spark context
>>>         val cassandraIP = System.getenv("CASSANDRA_IP")
>>>         val sparkConf = new SparkConf(true)
>>>             .set("spark.cassandra.connection.host", cassandraIP)
>>>             .set("spark.cleaner.ttl", "3600")
>>>             .setAppName("Simple Spark Cassandra Example")
>>>
>>>         val sc = new SparkContext(sparkConf)
>>>
>>>         // Works
>>>         CassandraConnector(sparkConf).withSessionDo { session =>
>>>             session.execute("INSERT INTO demo.tweets(id, user, body, time, latitude, longitude) VALUES (19, 'Name', 'Body', '2016-03-19 09:00:00-0300', 39, 39)")
>>>         }
>>>
>>>         // Does not work
>>>         val demo = sc.parallelize(Seq((9, "Name", "Body", "2016-03-29 19:00:00-0300", 29, 29)))
>>>         // Raises the exception
>>>         demo.saveToCassandra("demo", "tweets", SomeColumns("id", "user", "body", "time", "latitude", "longitude"))
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>>  The exception:
>>> 16/03/28 14:15:41 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
>>> Exception in thread "main" java.util.NoSuchElementException: Column  not found in demo.tweets
>>>     at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName$2.apply(Schema.scala:60)
>>>     at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName$2.apply(Schema.scala:60)
>>>     at scala.collection.Map$WithDefault.default(Map.scala:52)
>>>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>>     at scala.collection.AbstractMap.apply(Map.scala:58)
>>>     at com.datastax.spark.connector.cql.TableDef$$anonfun$9.apply(Schema.scala:153)
>>>     at com.datastax.spark.connector.cql.TableDef$$anonfun$9.apply(Schema.scala:152)
>>>     at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
>>>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>>>     at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
>>>     at com.datastax.spark.connector.cql.TableDef.<init>(Schema.scala:152)
>>>     at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:283)
>>>     at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:271)
>>>     at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
>>>     at scala.collection.immutable.Set$Set4.foreach(Set.scala:137)
>>>     at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
>>>     at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:271)
>>>     at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:295)
>>>     at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:294)
>>>     at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
>>>     at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
>>>     at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
>>>     at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
>>>     at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:294)
>>>     at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:307)
>>>     at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:304)
>>>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
>>>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
>>>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
>>>     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
>>>     at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
>>>     at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
>>>     at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
>>>     at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:304)
>>>     at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:275)
>>>     at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
>>>     at com.webradar.spci.spark.cassandra.App$.main(App.scala:27)
>>>     at com.webradar.spci.spark.cassandra.App.main(App.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>>
>>> Regards,
>>> Cleosson
>>>
>>>
>>>
>>>
>>>
>>
>
