Merge branch 'cassandra-2.0' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57e959d8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57e959d8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57e959d8

Branch: refs/heads/trunk
Commit: 57e959d88af5f1f88b7c2c00173721947ddcc9bf
Parents: 41325c3 ef33f95
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed Nov 27 10:44:21 2013 -0600
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed Nov 27 10:44:21 2013 -0600

----------------------------------------------------------------------
 build.xml                                       |  12 +-
 test/Test.iml                                   | 214 +++++
 .../cassandra/pig/CqlTableDataTypeTest.java     | 461 +++++++++++
 .../org/apache/cassandra/pig/CqlTableTest.java  | 254 ++++++
 .../org/apache/cassandra/pig/PigTestBase.java   | 185 +++++
 .../pig/ThriftColumnFamilyDataTypeTest.java     | 220 +++++
 .../cassandra/pig/ThriftColumnFamilyTest.java   | 822 +++++++++++++++++++
 test/pig/org/apache/pig/test/MiniCluster.java   |  82 ++
 .../org/apache/pig/test/MiniGenericCluster.java | 122 +++
 .../cassandra/pig/CqlTableDataTypeTest.java     | 461 -----------
 .../org/apache/cassandra/pig/CqlTableTest.java  | 254 ------
 .../org/apache/cassandra/pig/PigTestBase.java   | 185 -----
 .../pig/ThriftColumnFamilyDataTypeTest.java     | 220 -----
 .../cassandra/pig/ThriftColumnFamilyTest.java   | 822 -------------------
 test/unit/org/apache/pig/test/MiniCluster.java  |  82 --
 .../org/apache/pig/test/MiniGenericCluster.java | 122 ---
 16 files changed, 2367 insertions(+), 2151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e959d8/build.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e959d8/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --cc test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
index 0000000,223cbf4..9369a18
mode 000000,100644..100644
--- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
+++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
@@@ -1,0 -1,827 +1,822 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.cassandra.pig;
+ 
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.nio.charset.CharacterCodingException;
+ import java.util.Iterator;
+ 
+ import org.apache.cassandra.cli.CliMain;
 -import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.thrift.AuthenticationException;
+ import org.apache.cassandra.thrift.AuthorizationException;
+ import org.apache.cassandra.thrift.Cassandra;
+ import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 -import org.apache.cassandra.thrift.ColumnParent;
+ import org.apache.cassandra.thrift.ColumnPath;
+ import org.apache.cassandra.thrift.ConsistencyLevel;
+ import org.apache.cassandra.thrift.InvalidRequestException;
+ import org.apache.cassandra.thrift.NotFoundException;
+ import org.apache.cassandra.thrift.TimedOutException;
+ import org.apache.cassandra.thrift.UnavailableException;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.pig.data.DataBag;
+ import org.apache.pig.data.DataByteArray;
+ import org.apache.pig.data.Tuple;
+ import org.apache.thrift.TException;
+ import org.apache.thrift.transport.TTransportException;
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ public class ThriftColumnFamilyTest extends PigTestBase
+ {    
+     private static String[] statements = {
+             "create keyspace thriftKs with placement_strategy = 
'org.apache.cassandra.locator.SimpleStrategy' and" +
+             " strategy_options={replication_factor:1};",
+             "use thriftKs;",
+ 
+             "create column family SomeApp " +
+                     " with comparator = UTF8Type " +
+                     " and default_validation_class = UTF8Type " +
+                     " and key_validation_class = UTF8Type " +
+                     " and column_metadata = [{column_name: name, 
validation_class: UTF8Type, index_type: KEYS}, " +
+                     "{column_name: vote_type, validation_class: UTF8Type}, " +
+                     "{column_name: rating, validation_class: Int32Type}, " +
+                     "{column_name: score, validation_class: LongType}, " +
+                     "{column_name: percent, validation_class: FloatType}, " +
+                     "{column_name: atomic_weight, validation_class: 
DoubleType}, " +
+                     "{column_name: created, validation_class: DateType},]; ",
+ 
+              "create column family CopyOfSomeApp " +
+                     "with key_validation_class = UTF8Type " +
+                     "and default_validation_class = UTF8Type " +
+                     "and comparator = UTF8Type " +
+                     "and column_metadata = " +
+                     "[ " +
+                         "{column_name: name, validation_class: UTF8Type, 
index_type: KEYS}, " +
+                         "{column_name: vote_type, validation_class: 
UTF8Type}, " +
+                         "{column_name: rating, validation_class: Int32Type}, 
" +
+                         "{column_name: score, validation_class: LongType}, " +
+                         "{column_name: percent, validation_class: FloatType}, 
" +
+                         "{column_name: atomic_weight, validation_class: 
DoubleType}, " +
+                         "{column_name: created, validation_class: DateType}, 
" +
+                     "];",
+ 
+              "set SomeApp['foo']['name'] = 'User Foo';",
+              "set SomeApp['foo']['vote_type'] = 'like';",
+              "set SomeApp['foo']['rating'] = 8;",
+              "set SomeApp['foo']['score'] = 125000;",
+              "set SomeApp['foo']['percent'] = '85.0';",
+              "set SomeApp['foo']['atomic_weight'] = '2.7182818284590451';",
+              "set SomeApp['foo']['created'] = 1335890877;",
+ 
+              "set SomeApp['bar']['name'] = 'User Bar';",
+              "set SomeApp['bar']['vote_type'] = 'like';",
+              "set SomeApp['bar']['rating'] = 9;",
+              "set SomeApp['bar']['score'] = 15000;",
+              "set SomeApp['bar']['percent'] = '35.0';",
+              "set SomeApp['bar']['atomic_weight'] = '3.1415926535897931';",
+              "set SomeApp['bar']['created'] = 1335890877;",
+ 
+              "set SomeApp['baz']['name'] = 'User Baz';",
+              "set SomeApp['baz']['vote_type'] = 'dislike';",
+              "set SomeApp['baz']['rating'] = 3;",
+              "set SomeApp['baz']['score'] = 512000;",
+              "set SomeApp['baz']['percent'] = '95.3';",
+              "set SomeApp['baz']['atomic_weight'] = '1.61803399';",
+              "set SomeApp['baz']['created'] = 1335890877;",
+              "set SomeApp['baz']['extra1'] = 'extra1';",
+              "set SomeApp['baz']['extra2'] = 'extra2';",
+              "set SomeApp['baz']['extra3'] = 'extra3';",
+ 
+              "set SomeApp['qux']['name'] = 'User Qux';",
+              "set SomeApp['qux']['vote_type'] = 'dislike';",
+              "set SomeApp['qux']['rating'] = 2;",
+              "set SomeApp['qux']['score'] = 12000;",
+              "set SomeApp['qux']['percent'] = '64.7';",
+              "set SomeApp['qux']['atomic_weight'] = '0.660161815846869';",
+              "set SomeApp['qux']['created'] = 1335890877;",
+              "set SomeApp['qux']['extra1'] = 'extra1';",
+              "set SomeApp['qux']['extra2'] = 'extra2';",
+              "set SomeApp['qux']['extra3'] = 'extra3';",
+              "set SomeApp['qux']['extra4'] = 'extra4';",
+              "set SomeApp['qux']['extra5'] = 'extra5';",
+              "set SomeApp['qux']['extra6'] = 'extra6';",
+              "set SomeApp['qux']['extra7'] = 'extra7';",
+ 
+              "create column family U8 with " +
+                      "key_validation_class = UTF8Type and " +
+                      "comparator = UTF8Type;",
+                      
+              "create column family Bytes with " +
+                       "key_validation_class = BytesType and " +
+                       "comparator = UTF8Type;",
+ 
+              "set U8['foo']['x'] = ascii('Z');",
+              "set Bytes[ascii('foo')]['x'] = ascii('Z');",
+ 
+              "create column family CC with " +
+                        "key_validation_class = UTF8Type and " +
+                        "default_validation_class=CounterColumnType " +
+                        "and comparator=UTF8Type;",
+ 
+              "incr CC['chuck']['kick'];",
+              "incr CC['chuck']['kick'];",
+              "incr CC['chuck']['kick'];",
+              "incr CC['chuck']['fist'];",
+ 
+              "create column family Compo " +
+                        "with key_validation_class = UTF8Type " +
+                        "and default_validation_class = UTF8Type " +
+                        "and comparator = 'CompositeType(UTF8Type,UTF8Type)';",
+ 
+              "set Compo['punch']['bruce:lee'] = 'ouch';",
+              "set Compo['punch']['bruce:bruce'] = 'hunh?';",
+              "set Compo['kick']['bruce:lee'] = 'oww';",
+              "set Compo['kick']['bruce:bruce'] = 'watch it, mate';",
+ 
+              "create column family CompoInt " +
+                        "with key_validation_class = UTF8Type " +
+                        "and default_validation_class = UTF8Type " +
+                        "and comparator = 'CompositeType(LongType,LongType)';",
+ 
+             "set CompoInt['clock']['1:0'] = 'z';",
+             "set CompoInt['clock']['1:30'] = 'zzzz';",
+             "set CompoInt['clock']['2:30'] = 'daddy?';",
+             "set CompoInt['clock']['6:30'] = 'coffee...';",
+ 
+             "create column family CompoIntCopy " +
+                         "with key_validation_class = UTF8Type " +
+                         "and default_validation_class = UTF8Type " +
+                         "and comparator = 
'CompositeType(LongType,LongType)';",
+ 
+             "create column family CompoKey " +
+                         "with key_validation_class = 
'CompositeType(UTF8Type,LongType)' " +
+                         "and default_validation_class = UTF8Type " +
+                         "and comparator = LongType;",
+ 
+             "set CompoKey['clock:10']['1'] = 'z';",
+             "set CompoKey['clock:20']['1'] = 'zzzz';",
+             "set CompoKey['clock:30']['2'] = 'daddy?';",
+             "set CompoKey['clock:40']['6'] = 'coffee...';",
+ 
+             "create column family CompoKeyCopy " +
+                         "with key_validation_class = 
'CompositeType(UTF8Type,LongType)' " +
+                         "and default_validation_class = UTF8Type " +
+                         "and comparator = LongType;"
+     };
+ 
+     @BeforeClass
 -    public static void setup() throws TTransportException, IOException, 
InterruptedException, ConfigurationException,
++    public static void setup() throws TTransportException, IOException,
+                                       AuthenticationException, 
AuthorizationException, InvalidRequestException, UnavailableException, 
TimedOutException, TException, NotFoundException, CharacterCodingException, 
ClassNotFoundException, NoSuchFieldException, IllegalAccessException, 
InstantiationException
+     {
+         startCassandra();
+         setupDataByCli(statements);
+         startHadoopCluster();
+     }
+ 
+     @Test
 -    public void testCqlStorage() throws IOException, ClassNotFoundException, 
TException, TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException, AuthenticationException, AuthorizationException
++    public void testCqlStorage() throws IOException
+     {
+         //regular thrift column families
+         pig.registerQuery("data = load 'cql://thriftKs/SomeApp?" + 
defaultParameters + "' using CqlStorage();");
+ 
+         //(bar,3.141592653589793,1335890877,User Bar,35.0,9,15000,like)
+         //(baz,1.61803399,1335890877,User Baz,95.3,3,512000,dislike)
+         //(foo,2.718281828459045,1335890877,User Foo,85.0,8,125000,like)
+         //(qux,0.660161815846869,1335890877,User Qux,64.7,2,12000,dislike)
+ 
+         //{key: chararray,atomic_weight: double,created: long,name: 
chararray,percent: float,rating: int,score: long,vote_type: chararray}
+         Iterator<Tuple> it = pig.openIterator("data");
+         int count = 0;
+         while (it.hasNext()) {
+             count ++;
+             Tuple t = it.next();
+             if (count == 1)
+             {
+                 Assert.assertEquals(t.get(0), "bar");
+                 Assert.assertEquals(t.get(1), 3.141592653589793d);
+                 Assert.assertEquals(t.get(3), "User Bar");
+                 Assert.assertEquals(t.get(4), 35.0f);
+                 Assert.assertEquals(t.get(5), 9);
+                 Assert.assertEquals(t.get(6), 15000L);
+                 Assert.assertEquals(t.get(7), "like");
+             }
+             else if (count == 2)
+             {
+                 Assert.assertEquals(t.get(0), "baz");
+                 Assert.assertEquals(t.get(1), 1.61803399d);
+                 Assert.assertEquals(t.get(3), "User Baz");
+                 Assert.assertEquals(t.get(4), 95.3f);
+                 Assert.assertEquals(t.get(5), 3);
+                 Assert.assertEquals(t.get(6), 512000L);
+                 Assert.assertEquals(t.get(7), "dislike");
+             }else if (count == 3)
+             {
+                 Assert.assertEquals(t.get(0), "foo");
+                 Assert.assertEquals(t.get(1), 2.718281828459045d);
+                 Assert.assertEquals(t.get(3), "User Foo");
+                 Assert.assertEquals(t.get(4), 85.0f);
+                 Assert.assertEquals(t.get(5), 8);
+                 Assert.assertEquals(t.get(6), 125000L);
+                 Assert.assertEquals(t.get(7), "like");
+             }
+             else if (count == 4)
+             {
+                 Assert.assertEquals(t.get(0), "qux");
+                 Assert.assertEquals(t.get(1), 0.660161815846869d);
+                 Assert.assertEquals(t.get(3), "User Qux");
+                 Assert.assertEquals(t.get(4), 64.7f);
+                 Assert.assertEquals(t.get(5), 2);
+                 Assert.assertEquals(t.get(6), 12000L);
+                 Assert.assertEquals(t.get(7), "dislike");
+             }
+         }
+         Assert.assertEquals(count, 4);
+ 
+         //Test counter colun family
+         pig.registerQuery("cc_data = load 'cql://thriftKs/CC?" + 
defaultParameters + "' using CqlStorage();");
+ 
+         //(chuck,fist,1)
+         //(chuck,kick,3)
+ 
+         // {key: chararray,column1: chararray,value: long}
+         it = pig.openIterator("cc_data");
+         count = 0;
+         while (it.hasNext()) {
+             count ++;
+             Tuple t = it.next();
+             if (count == 1)
+             {
+                 Assert.assertEquals(t.get(0), "chuck");
+                 Assert.assertEquals(t.get(1), "fist");
+                 Assert.assertEquals(t.get(2), 1L);
+             }
+             else if (count == 2)
+             {
+                 Assert.assertEquals(t.get(0), "chuck");
+                 Assert.assertEquals(t.get(1), "kick");
+                 Assert.assertEquals(t.get(2), 3L);
+             }
+         }
+         Assert.assertEquals(count, 2);
+ 
+         //Test composite column family
+         pig.registerQuery("compo_data = load 'cql://thriftKs/Compo?" + 
defaultParameters + "' using CqlStorage();");
+ 
+         //(kick,bruce,bruce,watch it, mate)
+         //(kick,bruce,lee,oww)
+         //(punch,bruce,bruce,hunh?)
+         //(punch,bruce,lee,ouch)
+ 
+         //{key: chararray,column1: chararray,column2: chararray,value: 
chararray}
+         it = pig.openIterator("compo_data");
+         count = 0;
+         while (it.hasNext()) {
+             count ++;
+             Tuple t = it.next();
+             if (count == 1)
+             {
+                 Assert.assertEquals(t.get(0), "kick");
+                 Assert.assertEquals(t.get(1), "bruce");
+                 Assert.assertEquals(t.get(2), "bruce");
+                 Assert.assertEquals(t.get(3), "watch it, mate");
+             }
+             else if (count == 2)
+             {
+                 Assert.assertEquals(t.get(0), "kick");
+                 Assert.assertEquals(t.get(1), "bruce");
+                 Assert.assertEquals(t.get(2), "lee");
+                 Assert.assertEquals(t.get(3), "oww");
+             }
+             else if (count == 3)
+             {
+                 Assert.assertEquals(t.get(0), "punch");
+                 Assert.assertEquals(t.get(1), "bruce");
+                 Assert.assertEquals(t.get(2), "bruce");
+                 Assert.assertEquals(t.get(3), "hunh?");
+             }
+             else if (count == 4)
+             {
+                 Assert.assertEquals(t.get(0), "punch");
+                 Assert.assertEquals(t.get(1), "bruce");
+                 Assert.assertEquals(t.get(2), "lee");
+                 Assert.assertEquals(t.get(3), "ouch");
+             }
+         }
+         Assert.assertEquals(count, 4);
+     }
+ 
+     @Test
 -    public void testCassandraStorageSchema() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException
++    public void testCassandraStorageSchema() throws IOException
+     {
+         //results: 
(qux,(atomic_weight,0.660161815846869),(created,1335890877),(name,User 
Qux),(percent,64.7),
+         //(rating,2),(score,12000),(vote_type,dislike),{(extra1,extra1),
+         //(extra2,extra2),(extra3,extra3),
+         //(extra4,extra4),(extra5,extra5),
+         //(extra6,extra6),(extra7,extra7)})
+         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
+ 
+         //schema: {key: chararray,atomic_weight: (name: chararray,value: 
double),created: (name: chararray,value: long),
+         //name: (name: chararray,value: chararray),percent: (name: 
chararray,value: float),
+         //rating: (name: chararray,value: int),score: (name: chararray,value: 
long),
+         //vote_type: (name: chararray,value: chararray),columns: {(name: 
chararray,value: chararray)}}
+         Iterator<Tuple> it = pig.openIterator("rows");
 -        int count = 0;
+         if (it.hasNext()) {
+             Tuple t = it.next();
+             String rowKey =  t.get(0).toString();
+             if ("qux".equals(rowKey))
+             {
+                 Tuple column = (Tuple) t.get(1);
+                 Assert.assertEquals(column.get(0), "atomic_weight");
+                 Assert.assertEquals(column.get(1), 0.660161815846869d);
+                 column = (Tuple) t.get(3);
+                 Assert.assertEquals(column.get(0), "name");
+                 Assert.assertEquals(column.get(1), "User Qux");
+                 column = (Tuple) t.get(4);
+                 Assert.assertEquals(column.get(0), "percent");
+                 Assert.assertEquals(column.get(1), 64.7f);
+                 column = (Tuple) t.get(5);
+                 Assert.assertEquals(column.get(0), "rating");
+                 Assert.assertEquals(column.get(1), 2);
+                 column = (Tuple) t.get(6);
+                 Assert.assertEquals(column.get(0), "score");
+                 Assert.assertEquals(column.get(1), 12000L);
+                 column = (Tuple) t.get(7);
+                 Assert.assertEquals(column.get(0), "vote_type");
+                 Assert.assertEquals(column.get(1), "dislike");
+                 DataBag columns = (DataBag) t.get(8);
+                 Iterator<Tuple> iter = columns.iterator();
+                 int i = 0;
+                 while(iter.hasNext())
+                 {
+                     i++;
+                     column = iter.next();
+                     Assert.assertEquals(column.get(0), "extra"+i);
+                 }
+                 Assert.assertEquals(7, columns.size());
+             }
+ 
+         }
+     }
+ 
+     @Test
 -    public void testCassandraStorageFullCopy() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
++    public void testCassandraStorageFullCopy() throws IOException, 
TException, TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException, AuthenticationException, AuthorizationException
+     {
+         createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+         pig.setBatchOn();
+         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
+         //full copy
+         pig.registerQuery("STORE rows INTO 
'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING 
CassandraStorage();");
+         pig.executeBatch();
+         Assert.assertEquals("User Qux", getColumnValue("thriftKs", 
"CopyOfSomeApp", "name", "qux", "UTF8Type"));
+         Assert.assertEquals("dislike", getColumnValue("thriftKs", 
"CopyOfSomeApp", "vote_type", "qux", "UTF8Type"));
+         Assert.assertEquals("64.7", getColumnValue("thriftKs", 
"CopyOfSomeApp", "percent", "qux", "FloatType"));
+     }
+ 
+     @Test
 -    public void testCassandraStorageSigleTupleCopy() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
++    public void testCassandraStorageSigleTupleCopy() throws IOException, 
TException, TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException, AuthenticationException, AuthorizationException
+     {
+         createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+         pig.setBatchOn();
+         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
+         //sigle tuple
+         pig.registerQuery("onecol = FOREACH rows GENERATE key, percent;");
+         pig.registerQuery("STORE onecol INTO 
'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING 
CassandraStorage();");
+         pig.executeBatch();
+         String value = null;
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "name", 
"qux", "UTF8Type");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", 
"qux", "UTF8Type");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+         Assert.assertEquals("64.7", getColumnValue("thriftKs", 
"CopyOfSomeApp", "percent", "qux", "FloatType"));
+     }
+ 
+     @Test
 -    public void testCassandraStorageBagOnlyCopy() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
++    public void testCassandraStorageBagOnlyCopy() throws IOException, 
TException, TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException, AuthenticationException, AuthorizationException
+     {
+         createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+         pig.setBatchOn();
+         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
+         //bag only
+         pig.registerQuery("other = FOREACH rows GENERATE key, columns;");
+         pig.registerQuery("STORE other INTO 
'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING 
CassandraStorage();");
+         pig.executeBatch();
+         String value = null;
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "name", 
"qux", "UTF8Type");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", 
"qux", "UTF8Type");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "percent", 
"qux", "FloatType");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+         Assert.assertEquals("extra1", getColumnValue("thriftKs", 
"CopyOfSomeApp", "extra1", "qux", "UTF8Type"));
+     }
+ 
+     @Test
 -    public void testCassandraStorageFilter() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
++    public void testCassandraStorageFilter() throws IOException, TException, 
TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException, AuthenticationException, AuthorizationException
+     {
+         createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+         pig.setBatchOn();
+         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
+ 
+         //filter
+         pig.registerQuery("likes = FILTER rows by vote_type.value eq 'like' 
and rating.value > 5;");
+         pig.registerQuery("STORE likes INTO 
'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING 
CassandraStorage();");
+         pig.executeBatch();
+ 
+         Assert.assertEquals("like", getColumnValue("thriftKs", 
"CopyOfSomeApp", "vote_type", "bar", "UTF8Type"));
+         Assert.assertEquals("like", getColumnValue("thriftKs", 
"CopyOfSomeApp", "vote_type", "foo", "UTF8Type"));
+         String value = null;
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", 
"qux", "UTF8Type");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", 
"baz", "UTF8Type");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+ 
+         createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+         pig.setBatchOn();
+         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
+         pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value 
eq 'dislike' AND COUNT(columns) > 0;");
+         pig.registerQuery("STORE dislikes_extras INTO 
'cassandra://thriftKs/CopyOfSomeApp?" + defaultParameters + "' USING 
CassandraStorage();");
+         pig.registerQuery("visible = FILTER rows BY COUNT(columns) == 0;");
+         pig.executeBatch();
+         Assert.assertEquals("dislike", getColumnValue("thriftKs", 
"CopyOfSomeApp", "vote_type", "baz", "UTF8Type"));
+         Assert.assertEquals("dislike", getColumnValue("thriftKs", 
"CopyOfSomeApp", "vote_type", "qux", "UTF8Type"));
+         value = null;
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", 
"bar", "UTF8Type");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+         try
+         {
+             value = getColumnValue("thriftKs", "CopyOfSomeApp", "vote_type", 
"foo", "UTF8Type");
+         }
+         catch (NotFoundException e)
+         {
+             Assert.assertTrue(true);
+         }
+         if (value != null)
+             Assert.fail();
+     }
+ 
+     @Test
 -    public void testCassandraStorageJoin() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
++    public void testCassandraStorageJoin() throws IOException
+     {
+         //test key types with a join
+         pig.registerQuery("U8 = load 'cassandra://thriftKs/U8?" + 
defaultParameters + "' using CassandraStorage();");
+         pig.registerQuery("Bytes = load 'cassandra://thriftKs/Bytes?" + 
defaultParameters + "' using CassandraStorage();");
+ 
+         //cast key to chararray
+         pig.registerQuery("b = foreach Bytes generate (chararray)key, 
columns;");
+ 
+         //key in Bytes is a bytearray, U8 chararray
+         //(foo,{(x,Z)},foo,{(x,Z)})
+         pig.registerQuery("a = join Bytes by key, U8 by key;");
+         Iterator<Tuple> it = pig.openIterator("a");
+         if (it.hasNext()) {
+             Tuple t = it.next();
+             Assert.assertEquals(t.get(0), new 
DataByteArray("foo".getBytes()));
+             DataBag columns = (DataBag) t.get(1);
+             Iterator<Tuple> iter = columns.iterator();
+             Tuple t1 = iter.next();
+             Assert.assertEquals(t1.get(0), "x");
+             Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
+             String column = (String) t.get(2);
+             Assert.assertEquals(column, "foo");
+             columns = (DataBag) t.get(3);
+             iter = columns.iterator();
+             Tuple t2 = iter.next();
+             Assert.assertEquals(t2.get(0), "x");
+             Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
+         }
+         //key should now be cast into a chararray
+         //(foo,{(x,Z)},foo,{(x,Z)})
+         pig.registerQuery("c = join b by (chararray)key, U8 by 
(chararray)key;");
+         it = pig.openIterator("c");
+         if (it.hasNext()) {
+             Tuple t = it.next();
+             Assert.assertEquals(t.get(0), "foo");
+             DataBag columns = (DataBag) t.get(1);
+             Iterator<Tuple> iter = columns.iterator();
+             Tuple t1 = iter.next();
+             Assert.assertEquals(t1.get(0), "x");
+             Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
+             String column = (String) t.get(2);
+             Assert.assertEquals(column, "foo");
+             columns = (DataBag) t.get(3);
+             iter = columns.iterator();
+             Tuple t2 = iter.next();
+             Assert.assertEquals(t2.get(0), "x");
+             Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
+         }
+     }
+ 
+     @Test
 -    public void testCassandraStorageCounterCF() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
++    public void testCassandraStorageCounterCF() throws IOException
+     {
+         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
+ 
+         //Test counter column family support
+         pig.registerQuery("CC = load 'cassandra://thriftKs/CC?" + 
defaultParameters + "' using CassandraStorage();");
+         pig.registerQuery("total_hits = foreach CC generate key, 
SUM(columns.value);");
+         //(chuck,4)
+         Iterator<Tuple> it = pig.openIterator("total_hits");
+         if (it.hasNext()) {
+             Tuple t = it.next();
+             Assert.assertEquals(t.get(0), "chuck");
+             Assert.assertEquals(t.get(1), 4l);
+         }
+     }
+ 
+     @Test
 -    public void testCassandraStorageCompositeColumnCF() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
++    public void testCassandraStorageCompositeColumnCF() throws IOException
+     {
+         //Test CompositeType
+         pig.registerQuery("compo = load 'cassandra://thriftKs/Compo?" + 
defaultParameters + "' using CassandraStorage();");
+         pig.registerQuery("compo = foreach compo generate key as method, 
flatten(columns);");
+         pig.registerQuery("lee = filter compo by columns::name == 
('bruce','lee');");
+ 
+         //(kick,(bruce,lee),oww)
+         //(punch,(bruce,lee),ouch)
+         Iterator<Tuple> it = pig.openIterator("lee");
+         int count = 0;
+         while (it.hasNext()) {
+             count ++;
+             Tuple t = it.next();
+             if (count == 1)
+                 Assert.assertEquals(t.get(0), "kick");
+             else
+                 Assert.assertEquals(t.get(0), "punch");
+             Tuple t1 = (Tuple) t.get(1);
+             Assert.assertEquals(t1.get(0), "bruce");
+             Assert.assertEquals(t1.get(1), "lee");
+             if (count == 1)
+                 Assert.assertEquals(t.get(2), "oww");
+             else
+                 Assert.assertEquals(t.get(2), "ouch");
+         }
+         Assert.assertEquals(count, 2);
+         pig.registerQuery("night = load 'cassandra://thriftKs/CompoInt?" + 
defaultParameters + "' using CassandraStorage();");
+         pig.registerQuery("night = foreach night generate flatten(columns);");
+         pig.registerQuery("night = foreach night generate 
(int)columns::name.$0+(double)columns::name.$1/60 as hour, columns::value as 
noise;");
+ 
+         //What happens at the darkest hour?
+         pig.registerQuery("darkest = filter night by hour > 2 and hour < 5;");
+ 
+         //(2.5,daddy?)
+         it = pig.openIterator("darkest");
+         if (it.hasNext()) {
+             Tuple t = it.next();
+             Assert.assertEquals(t.get(0), 2.5d);
+             Assert.assertEquals(t.get(1), "daddy?");
+         }
+         pig.setBatchOn();
+         pig.registerQuery("compo_int_rows = LOAD 
'cassandra://thriftKs/CompoInt?" + defaultParameters + "' using 
CassandraStorage();");
+         pig.registerQuery("STORE compo_int_rows INTO 
'cassandra://thriftKs/CompoIntCopy?" + defaultParameters + "' using 
CassandraStorage();");
+         pig.executeBatch();
+         pig.registerQuery("compocopy_int_rows = LOAD 
'cassandra://thriftKs/CompoIntCopy?" + defaultParameters + "' using 
CassandraStorage();");
+         //(clock,{((1,0),z),((1,30),zzzz),((2,30),daddy?),((6,30),coffee...)})
+         it = pig.openIterator("compocopy_int_rows");
+         count = 0;
+         if (it.hasNext()) {
+             Tuple t = it.next();
+             Assert.assertEquals(t.get(0), "clock");
+             DataBag columns = (DataBag) t.get(1);
+             Iterator<Tuple> iter = columns.iterator();
+             while (iter.hasNext())
+             {
+                 count ++;
+                 Tuple t1 = iter.next();
+                 Tuple inner = (Tuple) t1.get(0);
+                 if (count == 1)
+                 {
+                     Assert.assertEquals(inner.get(0), 1L);
+                     Assert.assertEquals(inner.get(1), 0L);
+                     Assert.assertEquals(t1.get(1), "z");
+                 }
+                 else if (count == 2)
+                 {
+                     Assert.assertEquals(inner.get(0), 1L);
+                     Assert.assertEquals(inner.get(1), 30L);
+                     Assert.assertEquals(t1.get(1), "zzzz");
+                 }
+                 else if (count == 3)
+                 {
+                     Assert.assertEquals(inner.get(0), 2L);
+                     Assert.assertEquals(inner.get(1), 30L);
+                     Assert.assertEquals(t1.get(1), "daddy?");
+                 }
+                 else if (count == 4)
+                 {
+                     Assert.assertEquals(inner.get(0), 6L);
+                     Assert.assertEquals(inner.get(1), 30L);
+                     Assert.assertEquals(t1.get(1), "coffee...");
+                 }
+             }
+             Assert.assertEquals(count, 4);
+         }
+     }
+ 
+     @Test
 -    public void testCassandraStorageCompositeKeyCF() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
++    public void testCassandraStorageCompositeKeyCF() throws IOException
+     {
+         //Test CompositeKey
+         pig.registerQuery("compokeys = load 'cassandra://thriftKs/CompoKey?" 
+ defaultParameters + "' using CassandraStorage();");
+         pig.registerQuery("compokeys = filter compokeys by key.$1 == 40;");
+         //((clock,40),{(6,coffee...)})
+         Iterator<Tuple> it = pig.openIterator("compokeys");
+         if (it.hasNext()) {
+             Tuple t = it.next();
+             Tuple key = (Tuple) t.get(0); 
+             Assert.assertEquals(key.get(0), "clock");
+             Assert.assertEquals(key.get(1), 40L);
+             DataBag columns = (DataBag) t.get(1);
+             Iterator<Tuple> iter = columns.iterator();
+             if (iter.hasNext())
+             {
+                 Tuple t1 = iter.next();
+                 Assert.assertEquals(t1.get(0), 6L);
+                 Assert.assertEquals(t1.get(1), "coffee...");
+             }
+         }
+         pig.setBatchOn();
+         pig.registerQuery("compo_key_rows = LOAD 
'cassandra://thriftKs/CompoKey?" + defaultParameters + "' using 
CassandraStorage();");
+         pig.registerQuery("STORE compo_key_rows INTO 
'cassandra://thriftKs/CompoKeyCopy?" + defaultParameters + "' using 
CassandraStorage();");
+         pig.executeBatch();
+         pig.registerQuery("compo_key_copy_rows = LOAD 
'cassandra://thriftKs/CompoKeyCopy?" + defaultParameters + "' using 
CassandraStorage();");
+         //((clock,10),{(1,z)})
+         //((clock,20),{(1,zzzz)})
+         //((clock,30),{(2,daddy?)})
+         //((clock,40),{(6,coffee...)})
+         it = pig.openIterator("compo_key_copy_rows");
+         int count = 0;
+         while (it.hasNext()) {
+             Tuple t = it.next();
+             count ++;
+             if (count == 1)
+             {
+                 Tuple key = (Tuple) t.get(0); 
+                 Assert.assertEquals(key.get(0), "clock");
+                 Assert.assertEquals(key.get(1), 10L);
+                 DataBag columns = (DataBag) t.get(1);
+                 Iterator<Tuple> iter = columns.iterator();
+                 if (iter.hasNext())
+                 {
+                     Tuple t1 = iter.next();
+                     Assert.assertEquals(t1.get(0), 1L);
+                     Assert.assertEquals(t1.get(1), "z");
+                 }
+             }
+             else if (count == 2)
+             {
+                 Tuple key = (Tuple) t.get(0); 
+                 Assert.assertEquals(key.get(0), "clock");
+                 Assert.assertEquals(key.get(1), 20L);
+                 DataBag columns = (DataBag) t.get(1);
+                 Iterator<Tuple> iter = columns.iterator();
+                 if (iter.hasNext())
+                 {
+                     Tuple t1 = iter.next();
+                     Assert.assertEquals(t1.get(0), 1L);
+                     Assert.assertEquals(t1.get(1), "zzzz");
+                 }
+             }
+             else if (count == 3)
+             {
+                 Tuple key = (Tuple) t.get(0); 
+                 Assert.assertEquals(key.get(0), "clock");
+                 Assert.assertEquals(key.get(1), 30L);
+                 DataBag columns = (DataBag) t.get(1);
+                 Iterator<Tuple> iter = columns.iterator();
+                 if (iter.hasNext())
+                 {
+                     Tuple t1 = iter.next();
+                     Assert.assertEquals(t1.get(0), 2L);
+                     Assert.assertEquals(t1.get(1), "daddy?");
+                 }
+             }
+             else if (count == 4)
+             {
+                 Tuple key = (Tuple) t.get(0); 
+                 Assert.assertEquals(key.get(0), "clock");
+                 Assert.assertEquals(key.get(1), 40L);
+                 DataBag columns = (DataBag) t.get(1);
+                 Iterator<Tuple> iter = columns.iterator();
+                 if (iter.hasNext())
+                 {
+                     Tuple t1 = iter.next();
+                     Assert.assertEquals(t1.get(0), 6L);
+                     Assert.assertEquals(t1.get(1), "coffee...");
+                 }
+             }
+         }
+         Assert.assertEquals(count, 4);
+     }
+ 
+     private String getColumnValue(String ks, String cf, String colName, 
String key, String validator)
+     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, IOException
+     {
+         Cassandra.Client client = getClient();
+         client.set_keyspace(ks);
+ 
+         ByteBuffer key_user_id = ByteBufferUtil.bytes(key);
+ 
 -        long timestamp = System.currentTimeMillis();
+         ColumnPath cp = new ColumnPath(cf);
 -        ColumnParent par = new ColumnParent(cf);
+         cp.column = ByteBufferUtil.bytes(colName);
+ 
+         // read
+         ColumnOrSuperColumn got = client.get(key_user_id, cp, 
ConsistencyLevel.ONE);
+         return parseType(validator).getString(got.getColumn().value);
+     }
+ 
 -    private void createColumnFamily(String ks, String cf, String statement) 
throws CharacterCodingException, ClassNotFoundException, TException, 
TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException
++    private void createColumnFamily(String ks, String cf, String statement) 
throws CharacterCodingException, TException, TimedOutException, 
NotFoundException, InvalidRequestException, NoSuchFieldException, 
UnavailableException, IllegalAccessException, InstantiationException
+     {
+         CliMain.connect("127.0.0.1", 9170);
+         try
+         {
+             CliMain.processStatement("use " + ks + ";");
+             CliMain.processStatement("drop column family " + cf + ";");
+         }
+         catch (Exception e)
+         {
+         }
+         CliMain.processStatement(statement);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e959d8/test/pig/org/apache/pig/test/MiniCluster.java
----------------------------------------------------------------------
diff --cc test/pig/org/apache/pig/test/MiniCluster.java
index 0000000,3216392..e8f1f6e
mode 000000,100644..100644
--- a/test/pig/org/apache/pig/test/MiniCluster.java
+++ b/test/pig/org/apache/pig/test/MiniCluster.java
@@@ -1,0 -1,78 +1,82 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.pig.test;
+ 
+ import java.io.File;
+ import java.io.FileOutputStream;
+ import java.io.IOException;
++import java.io.OutputStream;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hdfs.MiniDFSCluster;
+ import org.apache.hadoop.mapred.MiniMRCluster;
+ 
+ public class MiniCluster extends MiniGenericCluster {
+     private MiniMRCluster m_mr = null;
+     public MiniCluster() {
+         super();
+     }
+ 
+     @Override
+     protected void setupMiniDfsAndMrClusters() {
+         try {
+             System.setProperty("hadoop.log.dir", "build/test/logs");
+             final int dataNodes = 4;     // There will be 4 data nodes
+             final int taskTrackers = 4;  // There will be 4 task tracker nodes
+ 
+             // Create the configuration hadoop-site.xml file
+             File conf_dir = new File("build/classes/");
+             conf_dir.mkdirs();
+             File conf_file = new File(conf_dir, "hadoop-site.xml");
+ 
+             conf_file.delete();
+ 
+             // Builds and starts the mini dfs and mapreduce clusters
+             Configuration config = new Configuration();
+             m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+             m_fileSys = m_dfs.getFileSystem();
+             m_mr = new MiniMRCluster(taskTrackers, 
m_fileSys.getUri().toString(), 1);
+ 
+             // Write the necessary config info to hadoop-site.xml
+             m_conf = m_mr.createJobConf();
+             m_conf.setInt("mapred.submit.replication", 2);
+             m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+             m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+             m_conf.set("mapred.map.max.attempts", "2");
+             m_conf.set("mapred.reduce.max.attempts", "2");
+             m_conf.set("pig.jobcontrol.sleep", "100");
 -            m_conf.writeXml(new FileOutputStream(conf_file));
++            try (OutputStream os = new FileOutputStream(conf_file))
++            {
++                m_conf.writeXml(os);
++            }
+ 
+             // Set the system properties needed by Pig
+             System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+             System.setProperty("namenode", m_conf.get("fs.default.name"));
+             System.setProperty("junit.hadoop.conf", conf_dir.getPath());
+         } catch (IOException e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     @Override
+     protected void shutdownMiniMrClusters() {
+         if (m_mr != null) { m_mr.shutdown(); }
+             m_mr = null;
+     }
+ }

Reply via email to