Hi,  
yes, the work can be split between different mappers, but each one will process 
one row at a time. In fact, the method

>  public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, 
> Context context)

processes a single row: the row key is passed as the ByteBuffer key, and that 
row's columns as the SortedMap<ByteBuffer, IColumn> columns.


That doesn't mean you will make millions of requests to Cassandra to retrieve 
one row at a time, though. Requests are batched, and the parameter
cassandra.range.batch.size
determines "The number of rows to request with each get range slices request" 
(as per the javadoc).
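
To give a feel for the effect of batching, here is a rough back-of-the-envelope 
sketch in plain Java (no Cassandra or Hadoop classes). The class name, the row 
count and the batch sizes are made up for illustration, and the estimate 
ignores split boundaries and any overlap between consecutive pages:

```java
public class RangeBatchEstimate {

    // Rough number of range requests needed to page through totalRows rows
    // when each request returns at most batchSize rows (ceiling division).
    public static long requestsNeeded(long totalRows, int batchSize) {
        if (totalRows < 0 || batchSize <= 0) {
            throw new IllegalArgumentException(
                    "totalRows must be >= 0 and batchSize must be > 0");
        }
        return (totalRows + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long totalRows = 10_000_000L;          // hypothetical column family size
        int[] batchSizes = {1, 1_000, 4_096};  // hypothetical batch sizes

        for (int batchSize : batchSizes) {
            System.out.println("batch size " + batchSize + " -> about "
                    + requestsNeeded(totalRows, batchSize) + " requests");
        }
    }
}
```

The map method is still called once per row, but the number of round trips to 
Cassandra shrinks roughly in proportion to the batch size.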

Performance-wise, that shouldn't be a problem: the operation you are doing is 
very simple, and Cassandra will be fast at retrieving such short rows.  
In any case, your business logic parallelizes well, since each row is checked 
independently, so you can split the job between many concurrent mappers.
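
As an illustration of why a per-row check like yours parallelizes well, here 
is a small plain-Java simulation. It is not Hadoop code: the worker threads 
only stand in for mappers, and the class, the method names and the 
{session id, username} row layout are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFilterSketch {

    // Stand-in for one mapper: scans its own slice, one row at a time.
    // Each row is assumed to be {sessionId, username}.
    public static List<String> filterSlice(List<String[]> rows, String wanted) {
        List<String> matches = new ArrayList<>();
        for (String[] row : rows) {
            if (wanted.equals(row[1])) {
                matches.add(row[0]);
            }
        }
        return matches;
    }

    // Cuts the rows into contiguous slices, filters each slice on its own
    // thread, and concatenates the per-slice results in slice order.
    public static List<String> filterInParallel(List<String[]> rows,
                                                String wanted, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            int sliceSize = (rows.size() + workers - 1) / workers;
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int start = 0; start < rows.size(); start += sliceSize) {
                int end = Math.min(start + sliceSize, rows.size());
                List<String[]> slice = rows.subList(start, end);
                futures.add(pool.submit(() -> filterSlice(slice, wanted)));
            }
            List<String> all = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                all.addAll(future.get());
            }
            return all;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("worker failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"s1", "alice"});
        rows.add(new String[] {"s2", "bob"});
        rows.add(new String[] {"s3", "alice"});
        System.out.println(filterInParallel(rows, "alice", 2));
    }
}
```

No worker needs anything from another worker's slice, which is the property 
that lets the real job spread across many mappers.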

--  
Filippo



On Thursday, 31 May 2012 at 09:59, murat migdisoglu wrote:

>  
> Hi,  
>  
> I'm working on some use cases to understand how cassandra-hadoop integration 
> works.  
>  
> I have a very basic scenario: I have a column family that keeps the session 
> id and some bson data that contains the username in two separate columns. I 
> want to go through all rows and dump the row to a file when the username 
> matches a certain criterion. And I don't need any Reducer or Combiner for 
> now.  
>  
> After writing the following very simple hadoop job, I see from the logs 
> that my mapper function is called once for each row. Is that normal? If that 
> is the case, doing such a search operation on a big dataset would take hours 
> if not days... Besides that, I see many small output files being created on 
> HDFS. 
>  
>  
> I guess I need a better understanding of how splitting the job into tasks 
> works exactly.  
>  
>  
>     @Override
>     public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
>     throws IOException, InterruptedException
>     {
>         String rowkey = ByteBufferUtil.string(key);
>         String ip = context.getConfiguration().get(IP);
>         IColumn column = columns.get(sourceColumn);
>         if (column == null)
>             return;
>         ByteBuffer byteBuffer = column.value();
>         ByteBuffer bb2 = byteBuffer.duplicate();
>  
>         DataConvertor convertor = fromBson(byteBuffer, DataConvertor.class);
>         String username = convertor.getUsername();
>         BytesWritable value = new BytesWritable();
>         if (username != null && username.equals(cip)) {
>             byte[] arr = convertToByteArray(bb2);
>             value.set(new BytesWritable(arr));
>             Text tkey = new Text(rowkey);
>             context.write(tkey, value);
>         } else {
>             log.info("ip not match [" + ip + "]");
>         }
>     }
>  
> Thanks in advance
> Kind Regards
>  
>  
> --  
> "Find a job you enjoy, and you'll never work a day in your life."
> Confucius  
>  
