Hi,

It seems that the problem is caused by the Cassandra Spark driver, and not by the plugin.

Since CASSANDRA-10217 <https://issues.apache.org/jira/browse/CASSANDRA-10217>, Cassandra 3.x per-row indexes no longer need to be created on a fake column. Thus, from Cassandra 3.x on, the "CREATE CUSTOM INDEX %s ON %s(%s)" column-based syntax is replaced with the new "CREATE CUSTOM INDEX %s ON %s()" row-based syntax. However, the DataStax Spark driver doesn't seem to support this new feature yet. When com.datastax.spark.connector.RDDFunctions.saveToCassandra is called, it tries to load the table schema and the schema of the index associated with a table column. Since the new index syntax no longer has the fake column, this results in a NoSuchElementException due to an empty column name.
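For reference, the two index-creation forms look roughly like this (a sketch assuming the tweets table from the example below, with the index options omitted):

    -- Cassandra 3.x row-based syntax (CASSANDRA-10217): no target column
    CREATE CUSTOM INDEX tweets_index ON tweets()
    USING 'com.stratio.cassandra.lucene.Index';

    -- Pre-3.x column-based syntax: a fake column ("lucene" here) is named
    CREATE CUSTOM INDEX tweets_index ON tweets (lucene)
    USING 'com.stratio.cassandra.lucene.Index';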
However, saveToCassandra works well if you execute the same example with the prior fake-column syntax:

    CREATE KEYSPACE demo
    WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};

    USE demo;

    CREATE TABLE tweets (
        id INT PRIMARY KEY,
        user TEXT,
        body TEXT,
        time TIMESTAMP,
        latitude FLOAT,
        longitude FLOAT,
        lucene TEXT
    );

    CREATE CUSTOM INDEX tweets_index ON tweets (lucene)
    USING 'com.stratio.cassandra.lucene.Index'
    WITH OPTIONS = {
        'refresh_seconds': '1',
        'schema': '{
            fields: {
                id:    {type: "integer"},
                user:  {type: "string"},
                body:  {type: "text", analyzer: "english"},
                time:  {type: "date", pattern: "yyyy/MM/dd", sorted: true},
                place: {type: "geo_point", latitude: "latitude", longitude: "longitude"}
            }
        }'
    };

Should we open a new JIRA about this or extend SPARKC-332 <https://datastax-oss.atlassian.net/browse/SPARKC-332>?

Regards

Eduardo Alonso
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd <https://twitter.com/StratioBD>

2016-03-29 14:02 GMT+02:00 Cleosson José Pirani de Souza <cso...@daitangroup.com>:

> Hi Eduardo,
>
> As I was not sure that it is a bug, I preferred to send the e-mail to the
> list first; it could be that something was done wrong.
>
> The versions are:
>
>    - Spark 1.6.0
>    - Cassandra 3.0.3
>    - Lucene plugin 3.0.3.1
>
> I opened the bug; the link is
> https://github.com/Stratio/cassandra-lucene-index/issues/109
>
> If it is not a bug, let me know.
>
> Thanks,
>
> Cleosson
>
> ------------------------------
> From: Eduardo Alonso <eduardoalo...@stratio.com>
> Sent: Tuesday, March 29, 2016 6:57 AM
> To: user@cassandra.apache.org
> Subject: Re: Does saveToCassandra work with Cassandra Lucene plugin ?
>
> Hi Cleosson Jose,
>
> First of all, if you think this is caused by a cassandra-lucene-index
> <https://github.com/Stratio/cassandra-lucene-index> bug, this user list
> is not the best way to report it. Please use GitHub issues
> <https://github.com/Stratio/cassandra-lucene-index/issues> for this.
>
> Second, in order to reproduce this error, I need to know which versions
> of Cassandra, cassandra-lucene-index, Spark and spark-cassandra-connector
> you are using.
>
> Regards
>
> Eduardo Alonso
> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473 // www.stratio.com // @stratiobd <https://twitter.com/StratioBD>
>
> 2016-03-28 23:43 GMT+02:00 Cleosson José Pirani de Souza <cso...@daitangroup.com>:
>
>> Hi Jack,
>>
>> Yes, I used the exact same commands in the Stratio readme.
>>
>> Thanks,
>>
>> Cleosson
>>
>> ------------------------------
>> From: Jack Krupansky <jack.krupan...@gmail.com>
>> Sent: Monday, March 28, 2016 6:06 PM
>> To: user@cassandra.apache.org
>> Subject: Re: Does saveToCassandra work with Cassandra Lucene plugin ?
>>
>> The exception message has an empty column name. Odd. Not sure if that is
>> a bug in the exception code or whether you actually have an empty column
>> name somewhere.
>>
>> Did you use the absolutely exact same commands to create the keyspace,
>> table, and custom index as in the Stratio readme?
>>
>> -- Jack Krupansky
>>
>> On Mon, Mar 28, 2016 at 4:57 PM, Cleosson José Pirani de Souza <cso...@daitangroup.com> wrote:
>>
>>> Hi,
>>>
>>> One important thing: if I remove the custom index using Lucene,
>>> saveToCassandra works.
>>>
>>> Thanks,
>>>
>>> Cleosson
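To make that workaround concrete, a rough sketch (assuming the demo schema quoted earlier in this thread) is to drop the Lucene index before the bulk load and recreate it afterwards:

    -- drop the index so the connector no longer sees the custom index metadata
    DROP INDEX demo.tweets_index;

    -- ... run the Spark job that calls saveToCassandra ...

    -- then recreate tweets_index with the same CREATE CUSTOM INDEX statement
    -- as above; Cassandra rebuilds the index over the existing rows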
>>> ------------------------------
>>> From: Anuj Wadehra <anujw_2...@yahoo.co.in>
>>> Sent: Monday, March 28, 2016 3:27 PM
>>> To: user@cassandra.apache.org; Cleosson José Pirani de Souza; user@cassandra.apache.org
>>> Subject: Re: Does saveToCassandra work with Cassandra Lucene plugin ?
>>>
>>> I used it with Java, and there every field of the POJO must map to a
>>> column name of the table. I think someone with Scala syntax knowledge can
>>> help you better.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Mon, 28 Mar, 2016 at 11:47 pm, Anuj Wadehra <anujw_2...@yahoo.co.in> wrote:
>>>
>>> With my limited experience with Spark, I can tell you that you need to
>>> make sure that all columns mentioned in SomeColumns are part of the CQL
>>> schema of the table.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Mon, 28 Mar, 2016 at 11:38 pm, Cleosson José Pirani de Souza <cso...@daitangroup.com> wrote:
>>>
>>> Hello,
>>>
>>> I am implementing the example on GitHub
>>> (https://github.com/Stratio/cassandra-lucene-index), and when I try to
>>> save the data using saveToCassandra I get the exception
>>> NoSuchElementException. If I use CassandraConnector.withSessionDo, I am
>>> able to add elements into Cassandra and no exception is raised.
>>>
>>> The code:
>>>
>>> import org.apache.spark.{SparkConf, SparkContext, Logging}
>>> import com.datastax.spark.connector.cql.CassandraConnector
>>> import com.datastax.spark.connector._
>>>
>>> object App extends Logging {
>>>   def main(args: Array[String]) {
>>>
>>>     // Get the Cassandra IP and create the Spark context
>>>     val cassandraIP = System.getenv("CASSANDRA_IP")
>>>     val sparkConf = new SparkConf(true)
>>>       .set("spark.cassandra.connection.host", cassandraIP)
>>>       .set("spark.cleaner.ttl", "3600")
>>>       .setAppName("Simple Spark Cassandra Example")
>>>
>>>     val sc = new SparkContext(sparkConf)
>>>
>>>     // Works
>>>     CassandraConnector(sparkConf).withSessionDo { session =>
>>>       session.execute("INSERT INTO demo.tweets(id, user, body, time, latitude, longitude) " +
>>>         "VALUES (19, 'Name', 'Body', '2016-03-19 09:00:00-0300', 39, 39)")
>>>     }
>>>
>>>     // Does not work
>>>     val demo = sc.parallelize(Seq((9, "Name", "Body", "2016-03-29 19:00:00-0300", 29, 29)))
>>>     // Raises the exception
>>>     demo.saveToCassandra("demo", "tweets",
>>>       SomeColumns("id", "user", "body", "time", "latitude", "longitude"))
>>>   }
>>> }
>>>
>>> The exception:
>>>
>>> 16/03/28 14:15:41 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
>>> Exception in thread "main" java.util.NoSuchElementException: Column  not found in demo.tweets
>>>   at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName$2.apply(Schema.scala:60)
>>>   at com.datastax.spark.connector.cql.StructDef$$anonfun$columnByName$2.apply(Schema.scala:60)
>>>   at scala.collection.Map$WithDefault.default(Map.scala:52)
>>>   at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>>   at scala.collection.AbstractMap.apply(Map.scala:58)
>>>   at com.datastax.spark.connector.cql.TableDef$$anonfun$9.apply(Schema.scala:153)
>>>   at com.datastax.spark.connector.cql.TableDef$$anonfun$9.apply(Schema.scala:152)
>>>   at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
>>>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>>>   at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
>>>   at com.datastax.spark.connector.cql.TableDef.<init>(Schema.scala:152)
>>>   at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:283)
>>>   at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchTables$1$2.apply(Schema.scala:271)
>>>   at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
>>>   at scala.collection.immutable.Set$Set4.foreach(Set.scala:137)
>>>   at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
>>>   at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchTables$1(Schema.scala:271)
>>>   at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:295)
>>>   at com.datastax.spark.connector.cql.Schema$$anonfun$com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1$2.apply(Schema.scala:294)
>>>   at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
>>>   at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
>>>   at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
>>>   at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
>>>   at com.datastax.spark.connector.cql.Schema$.com$datastax$spark$connector$cql$Schema$$fetchKeyspaces$1(Schema.scala:294)
>>>   at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:307)
>>>   at com.datastax.spark.connector.cql.Schema$$anonfun$fromCassandra$1.apply(Schema.scala:304)
>>>   at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:121)
>>>   at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withClusterDo$1.apply(CassandraConnector.scala:120)
>>>   at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
>>>   at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
>>>   at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
>>>   at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
>>>   at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
>>>   at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:304)
>>>   at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:275)
>>>   at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
>>>   at com.webradar.spci.spark.cassandra.App$.main(App.scala:27)
>>>   at com.webradar.spci.spark.cassandra.App.main(App.scala)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:497)
>>>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>>>   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>>>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>>>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>>>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> Regards,
>>> Cleosson