I suggested turning up the logging to see if the server processed a 
batch_mutate call. That logging is done in the CassandraServer class 
(https://github.com/apache/cassandra/blob/cassandra-0.8.4/src/java/org/apache/cassandra/thrift/CassandraServer.java#L531),
not in the CFOF (ColumnFamilyOutputFormat).
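
Once DEBUG logging is on, a quick way to check is to count the log lines that mention batch_mutate. Below is a minimal sketch in plain Java. It assumes the log is at /var/log/cassandra/system.log and that the DEBUG line contains the text "batch_mutate"; both are assumptions, so adjust them for your install. The class and method names are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

public class BatchMutateLogCheck {

    // Counts the log lines that contain the given marker text.
    static long countLinesContaining(List<String> lines, String marker) {
        return lines.stream().filter(line -> line.contains(marker)).count();
    }

    public static void main(String[] args) throws IOException {
        // Assumed default location; pass the real log path as the first argument.
        Path log = Paths.get(args.length > 0 ? args[0] : "/var/log/cassandra/system.log");
        List<String> lines = Files.readAllLines(log);
        // "batch_mutate" is the assumed text of the DEBUG message.
        long hits = countLinesContaining(lines, "batch_mutate");
        System.out.println(log + ": " + hits + " line(s) mention batch_mutate");
    }
}
```

Run it on each node after the job finishes. A count of zero on every node would suggest the call never reached the server.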

The first step is to determine whether a call is being made to the server. If 
not, work out why not. If so, work out where the data is going. 
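
On the "where is the data going" question, one thing worth checking is which value ends up as the row key. With ColumnFamilyOutputFormat, the key passed to context.write is used as the row key, and the Column name inside each Mutation is the column name. The reducer quoted further down uses the configured column name as the output key and the EAN as the column name, so it is possible that every update lands in a single wide row rather than in one row per EAN. The sketch below is plain Java with a nested map standing in for a column family. It uses no Cassandra classes; the write helper, the "sales_rank" column name and the sample EANs are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class RowKeyPlacement {

    // Hypothetical stand-in for one column write: cf maps row key -> (column name -> value).
    static void write(Map<String, Map<String, String>> cf,
                      String rowKey, String columnName, String value) {
        cf.computeIfAbsent(rowKey, k -> new TreeMap<>()).put(columnName, value);
    }

    // Layout when the configured column name is the row key and the EAN is the
    // column name, which is how the quoted reducer appears to write.
    static Map<String, Map<String, String>> columnNameAsRowKey(
            String columnName, Map<String, Integer> ranks) {
        Map<String, Map<String, String>> cf = new TreeMap<>();
        for (Map.Entry<String, Integer> e : ranks.entrySet()) {
            write(cf, columnName, e.getKey(), String.valueOf(e.getValue()));
        }
        return cf;
    }

    // Layout when the EAN is the row key and the configured name is the column name.
    static Map<String, Map<String, String>> eanAsRowKey(
            String columnName, Map<String, Integer> ranks) {
        Map<String, Map<String, String>> cf = new TreeMap<>();
        for (Map.Entry<String, Integer> e : ranks.entrySet()) {
            write(cf, e.getKey(), columnName, String.valueOf(e.getValue()));
        }
        return cf;
    }

    public static void main(String[] args) {
        // Made-up sample data: EAN -> sales rank.
        Map<String, Integer> ranks = new LinkedHashMap<>();
        ranks.put("ean-1", 10);
        ranks.put("ean-2", 20);
        ranks.put("ean-3", 30);

        System.out.println("rows, column name as row key: "
                + columnNameAsRowKey("sales_rank", ranks).size());
        System.out.println("rows, EAN as row key: "
                + eanAsRowKey("sales_rank", ranks).size());
    }
}
```

If the first layout is what the job produces, the existing rows keyed by EAN would be untouched, and the new data would show up as one row whose key is the column name.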

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 17/08/2011, at 1:36 AM, Jian Fang wrote:

> If you look at the source code, you will find there is no log message in 
> the ColumnFamilyOutputFormat class or the related classes.
> How can I trace the problem then? Has no one actually got this working?
> 
> On Thu, Aug 11, 2011 at 6:10 PM, aaron morton <aa...@thelastpickle.com> wrote:
> Turn the logging up in Cassandra or your MR job and make sure the 
> batch_mutate call is sent. Sounds like it's not. 
> 
> Cheers
> 
> -----------------
> Aaron Morton
> Freelance Cassandra Developer
> @aaronmorton
> http://www.thelastpickle.com
> 
> On 12 Aug 2011, at 07:22, Jian Fang wrote:
> 
>> 53 seconds included the map phase to read and process the input file. The 
>> records were updated at the end of the reduce phase.
>> 
>> I checked the sales ranks in the update file against the sales ranks in 
>> Cassandra. They are different, so the records
>> were not actually updated.
>> 
>> I remember running the word count example for Cassandra 0.8.0 some time ago and 
>> seeing similar behavior, i.e., the results were not written to the 
>> Cassandra column family. But if I changed Hadoop to write to the file 
>> system, I could see the results in the output file. I didn't try the word 
>> count example for Cassandra 0.8.2 though.
>> 
>> Is there any way to solve this problem?
>> 
>> Thanks,
>> 
>> John 
>> On Thu, Aug 11, 2011 at 12:17 AM, aaron morton <aa...@thelastpickle.com> 
>> wrote:
>> I'm a simple guy. My first step would be to see if the expected data is in the 
>> database and, if not, what's missing. 
>> 
>> 2.5M updates / 3 nodes = 833,333 per node 
>> 833,333 / 53 seconds = 15,723 per second 
>> 1 / 15,723 = 0.00006 seconds (0.06 milliseconds) per mutation 
>>  
>> sounds reasonable to me. 
>> 
>> check the Write Latency in nodetool cfstats and the row count estimates. 
>> 
>> Cheers
>> 
>> -----------------
>> Aaron Morton
>> Freelance Cassandra Developer
>> @aaronmorton
>> http://www.thelastpickle.com
>> 
>> On 11 Aug 2011, at 14:50, Jian Fang wrote:
>> 
>>> There is data, and each Cassandra cluster node holds about 100G. From the 
>>> application point of view, if I run the job twice with the same input file, 
>>> i.e., the sales rank update file, then I should see a much smaller number 
>>> of products, whose rank change exceeds the threshold, in the output file 
>>> for the second run because the sales ranks have been updated to be the same 
>>> as the ranks in the input file during the first run. But actually, I saw 
>>> the output files stay the same for the two runs. One explanation is that 
>>> the ranks were not actually updated for the first run. Also, 53 seconds to 
>>> run the whole hadoop job with 2.5 million Cassandra updates on three nodes, 
>>> do you think that is possible? Each node is a regular Linux box with 8 
>>> CPUs. 
>>> 
>>> Thanks,
>>> 
>>> John  
>>> 
>>> On Wed, Aug 10, 2011 at 5:40 PM, aaron morton <aa...@thelastpickle.com> 
>>> wrote:
>>>>  Seems the data are not actually written to Cassandra.
>>> 
>>> Before jumping into the Hadoop side of things, are you saying there is no 
>>> data in Cassandra? Can you retrieve any using the CLI? Take a look at 
>>> cfstats on each node to see the estimated record count.
>>> 
>>> Cheers
>>>  
>>> -----------------
>>> Aaron Morton
>>> Freelance Cassandra Developer
>>> @aaronmorton
>>> http://www.thelastpickle.com
>>> 
>>> On 11 Aug 2011, at 08:20, Jian Fang wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file 
>>>> and then writes about 2.5 million records
>>>> to Cassandra. I used ColumnFamilyOutputFormat to write to Cassandra. My 
>>>> Cassandra cluster has three nodes with
>>>> one Hadoop task tracker on each node. The weird problem is that I only saw 
>>>> one map task and one reduce task, and the job only took
>>>> 53 seconds to finish. Seems the data are not actually written to Cassandra.
>>>> 
>>>> Here is status from Hadoop web admin:
>>>> 
>>>> User: hadoop
>>>> Job Name: SalesRankWriter
>>>> Job File: 
>>>> hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
>>>> Job Setup: Successful
>>>> Status: Succeeded
>>>> Started at: Wed Aug 10 15:24:43 EDT 2011
>>>> Finished at: Wed Aug 10 15:25:36 EDT 2011
>>>> Finished in: 52sec
>>>> Job Cleanup: Successful
>>>> Kind     % Complete   Num Tasks   Pending   Running   Complete   Killed   Failed/Killed Task Attempts
>>>> map      100.00%      1           0         0         1          0        0 / 0
>>>> reduce   100.00%      1           0         0         1          0        0 / 0
>>>> 
>>>> Group                  Counter                  Map           Reduce       Total
>>>> Job Counters           Launched reduce tasks    0             0            1
>>>> Job Counters           Launched map tasks       0             0            1
>>>> Job Counters           Data-local map tasks     0             0            1
>>>> FileSystemCounters     FILE_BYTES_READ          50,698,700    50,698,646   101,397,346
>>>> FileSystemCounters     HDFS_BYTES_READ          56,149,360    0            56,149,360
>>>> FileSystemCounters     FILE_BYTES_WRITTEN       101,397,378   50,698,646   152,096,024
>>>> Map-Reduce Framework   Reduce input groups      0             2,534,932    2,534,932
>>>> Map-Reduce Framework   Combine output records   0             0            0
>>>> Map-Reduce Framework   Map input records        2,534,932     0            2,534,932
>>>> Map-Reduce Framework   Reduce shuffle bytes     0             0            0
>>>> Map-Reduce Framework   Reduce output records    0             2,534,932    2,534,932
>>>> Map-Reduce Framework   Spilled Records          5,069,864     2,534,932    7,604,796
>>>> Map-Reduce Framework   Map output bytes         45,628,776    0            45,628,776
>>>> Map-Reduce Framework   Map output records       2,534,932     0            2,534,932
>>>> Map-Reduce Framework   Combine input records    0             0            0
>>>> Map-Reduce Framework   Reduce input records     0             2,534,932    2,534,932
>>>> 
>>>> and the log for the mapper
>>>> 
>>>>  2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: 
>>>> Initializing JVM Metrics with processName=MAP, sessionId=
>>>> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb 
>>>> = 100
>>>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer 
>>>> = 79691776/99614720
>>>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record 
>>>> buffer = 262144/327680
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 0; bufend = 4718592; bufvoid = 99614720
>>>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 0; kvend = 262144; length = 327680
>>>> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 0
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 4718592; bufend = 9437166; bufvoid = 99614720
>>>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 262144; kvend = 196607; length = 327680
>>>> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 1
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 9437166; bufend = 14155740; bufvoid = 99614720
>>>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 196607; kvend = 131070; length = 327680
>>>> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 2
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 14155740; bufend = 18874314; bufvoid = 99614720
>>>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 131070; kvend = 65533; length = 327680
>>>> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 3
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 18874314; bufend = 23592906; bufvoid = 99614720
>>>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 65533; kvend = 327677; length = 327680
>>>> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 4
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 23592906; bufend = 28311480; bufvoid = 99614720
>>>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 327677; kvend = 262140; length = 327680
>>>> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 5
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 28311480; bufend = 33030054; bufvoid = 99614720
>>>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 262140; kvend = 196603; length = 327680
>>>> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 6
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 33030054; bufend = 37748628; bufvoid = 99614720
>>>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 196603; kvend = 131066; length = 327680
>>>> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 7
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling 
>>>> map output: record full = true
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>>>> 37748628; bufend = 42467202; bufvoid = 99614720
>>>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>>>> 131066; kvend = 65529; length = 327680
>>>> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 8
>>>> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting 
>>>> flush of map output
>>>> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished 
>>>> spill 9
>>>> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 
>>>> sorted segments
>>>> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the 
>>>> last merge-pass, with 10 segments left of total size: 50698660 bytes
>>>> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: 
>>>> Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process 
>>>> of commiting
>>>> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 
>>>> 'attempt_201108051329_0060_m_000000_0' done.
>>>> 
>>>> and the log for the reducer
>>>> 
>>>> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: 
>>>> Initializing JVM Metrics with processName=SHUFFLE, sessionId=
>>>> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging 
>>>> on-disk files
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in 
>>>> memory files
>>>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging 
>>>> on-disk files
>>>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 
>>>> is already in progress
>>>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling 
>>>> Map Completion Events
>>>> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts 
>>>> and0 dup hosts)
>>>> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
>>>> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts 
>>>> and0 dup hosts)
>>>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: 
>>>> attempt_201108051329_0060_m_000000_0, compressed len: 50698646, 
>>>> decompressed len: 50698642
>>>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> Shuffling 50698642 bytes (50698646 raw bytes) into Local-FS from 
>>>> attempt_201108051329_0060_m_000000_0
>>>> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 
>>>> 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
>>>> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging 
>>>> on-disk files
>>>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> GetMapEventsThread exiting
>>>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> getMapsEventsThread joined.
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed 
>>>> ram manager
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> Interleaved on-disk merge complete: 1 files left.
>>>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: 
>>>> In-memory merge complete: 0 files left.
>>>> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 
>>>> 1 files, 50698646 bytes from disk
>>>> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 
>>>> 0 segments, 0 bytes from memory into reduce
>>>> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 
>>>> sorted segments
>>>> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the 
>>>> last merge-pass, with 1 segments left of total size: 50698642 bytes
>>>> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: 
>>>> Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process 
>>>> of commiting
>>>> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 
>>>> 'attempt_201108051329_0060_r_000000_0' done.
>>>> 
>>>> My code is similar to the word count example:
>>>> 
>>>>     public int run(String[] args) throws Exception {
>>>>         ...
>>>> 
>>>>         getConf().set(CONF_COLUMN_NAME, columnName);
>>>> 
>>>>         Job job4 = new Job(getConf(), "SalesRankWriter");
>>>>         job4.setJarByClass(SalesRankLoader.class);
>>>>         job4.setMapperClass(RankUpdateMapper.class);
>>>>         job4.setReducerClass(RankUpdateReducer.class);
>>>>         job4.setMapOutputKeyClass(Text.class);
>>>>         job4.setMapOutputValueClass(IntWritable.class);
>>>>         job4.setOutputKeyClass(ByteBuffer.class);
>>>>         job4.setOutputValueClass(List.class);
>>>>         job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>>>>         job4.setInputFormatClass(TextInputFormat.class);
>>>>         FileInputFormat.addInputPath(job4, new Path(prePath));
>>>> 
>>>>         ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), 
>>>> KEYSPACE, columnFamily);
>>>>         ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>>>>         ConfigHelper.setInitialAddress(job4.getConfiguration(), 
>>>> "dnjsrcha01");
>>>>         ConfigHelper.setPartitioner(job4.getConfiguration(), 
>>>> "org.apache.cassandra.dht.RandomPartitioner");
>>>> 
>>>> 
>>>>         job4.waitForCompletion(true);
>>>>         ...
>>>>     }
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> where the mapper and reducer are defined as:
>>>> 
>>>>     public static class RankUpdateMapper extends Mapper<LongWritable, 
>>>> Text, Text, IntWritable> {
>>>>         public void map(LongWritable key, Text value, Context context) 
>>>> throws IOException, InterruptedException {
>>>>             String line = value.toString();
>>>>             StringTokenizer tokenizer = new StringTokenizer(line);
>>>>             String ean = tokenizer.nextToken();
>>>>             int rank = Integer.parseInt(tokenizer.nextToken());
>>>> 
>>>>             context.write(new Text(ean), new IntWritable(rank));
>>>>         }
>>>>     }
>>>> 
>>>>     public static class RankUpdateReducer extends Reducer<Text, 
>>>> IntWritable, ByteBuffer, List<Mutation>>
>>>>     {
>>>>         private ByteBuffer outputKey;
>>>> 
>>>>         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context 
>>>> context) throws IOException, InterruptedException
>>>>         {
>>>>             outputKey = 
>>>> ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>>>>         }
>>>> 
>>>>         public void reduce(Text key, Iterable<IntWritable> values, Context 
>>>> context) throws IOException, InterruptedException
>>>>         {
>>>>             context.write(outputKey, 
>>>> Collections.singletonList(getMutation(key, 
>>>> values.iterator().next().get())));
>>>>         }
>>>> 
>>>>         private static Mutation getMutation(Text key, int value)
>>>>         {
>>>>             Column c = new Column();
>>>>             c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), 
>>>> key.getLength())));
>>>>             c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>>>>             c.setTimestamp(System.currentTimeMillis() * 1000);
>>>> 
>>>>             Mutation m = new Mutation();
>>>>             m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>>>>             m.column_or_supercolumn.setColumn(c);
>>>>             return m;
>>>>         }
>>>>     }
>>>> 
>>>> Is anything wrong here?
>>>> 
>>>> Thanks,
>>>> 
>>>> John
>>> 
>>> 
>> 
>> 
> 
> 
