I am trying to use the CqlBulkOutputFormat in a Hadoop job to bulk load
data into Cassandra. I was not able to find any documentation for this new
output format, but from looking through the code it uses
CQLSSTableWriter to write SSTable files to disk, which are then streamed
to Cassandra using SSTableLoader. When I run the Hadoop job, I can see that
the SSTable files do get generated, but the job fails to stream the data out.
I get the same exception whether I point it at a Cassandra node on
"localhost" or at a remote Cassandra cluster, and I see it on C* versions
2.1.1, 2.1.2 and 2.1.3.
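
For context, here is my understanding of what the output format does
internally, written as a minimal standalone sketch. The two-column
dev.participant schema, the output directory and the sample row are just
placeholders, not my real table:

import java.io.File;

import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;

public class SSTableWriterSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder schema and insert statement; the real table has more columns.
        String schema = "CREATE TABLE dev.participant (id text PRIMARY KEY, name text)";
        String insert = "INSERT INTO dev.participant (id, name) VALUES (?, ?)";

        // CQLSSTableWriter writes SSTable files into the given directory,
        // which, as far as I can tell, is what CqlBulkOutputFormat does per task.
        CQLSSTableWriter writer = CQLSSTableWriter.builder()
                .inDirectory(new File("/tmp/dev/participant"))
                .forTable(schema)
                .using(insert)
                .withPartitioner(new Murmur3Partitioner())
                .build();

        writer.addRow("participant-1", "Alice");
        writer.close();

        // The generated SSTables would then be streamed to the cluster,
        // e.g. with the command-line tool:
        //   sstableloader -d 192.168.56.11 /tmp/dev/participant
        // CqlBulkOutputFormat does the equivalent with SSTableLoader in-process,
        // and that streaming step is where my job fails.
    }
}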

*Relevant portion of logs and stack trace*

09:20:23.207 [Thread-6] WARN  org.apache.cassandra.utils.CLibrary - JNA
link failure, one or more native method will be unavailable.
09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary - JNA
link failure details: Error looking up function 'posix_fadvise':
dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found
09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils -
Renaming
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db
to
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db
09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils -
Renaming
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1
to
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1
09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils -
Renaming
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db
to
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db
09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils -
Renaming
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db
to
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db
09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils -
Renaming
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db
to
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db
09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils -
Renaming
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt
to
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt
09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils -
Renaming
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db
to
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db
09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load
metadata for
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
09:20:23.729 [Thread-2] INFO  o.a.c.io.sstable.SSTableReader - Opening
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
(617874 bytes)
09:20:23.780 [Thread-2] INFO  o.a.c.streaming.StreamResultFuture - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for Bulk
Load
09:20:23.781 [StreamConnectionEstablisher:1] INFO
o.a.c.streaming.StreamSession - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to /192.168.56.11
09:20:23.781 [StreamConnectionEstablisher:1] DEBUG
o.a.c.streaming.ConnectionHandler - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for incoming
stream
09:20:23.792 [StreamConnectionEstablisher:1] DEBUG
o.a.c.streaming.ConnectionHandler - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for outgoing
stream
09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG
o.a.c.streaming.ConnectionHandler - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests,  1
files}
09:20:23.795 [StreamConnectionEstablisher:1] INFO
o.a.c.streaming.StreamResultFuture - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 0
files(0 bytes), sending 1 files(617874 bytes)
09:20:23.799 [StreamConnectionEstablisher:1] INFO
o.a.c.streaming.StreamCoordinator - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session with /
192.168.56.11
09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG
o.a.c.streaming.ConnectionHandler - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId:
d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated keys:
3072, transfer size: 617874, compressed?: true, repairedAt: 0), file:
/var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db)
09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG
o.a.c.streaming.ConnectionHandler - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Received Retry
(d6d35793-729c-3cab-bee0-84e971e48675, #0)
09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG
o.a.c.streaming.ConnectionHandler - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed
09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG
o.a.c.streaming.ConnectionHandler - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection handler on
/192.168.56.11
09:20:23.811 [STREAM-IN-/192.168.56.11] INFO
o.a.c.streaming.StreamResultFuture - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11 is
complete
09:20:23.812 [STREAM-IN-/192.168.56.11] WARN
o.a.c.streaming.StreamResultFuture - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR
o.a.c.streaming.StreamSession - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
~[na:1.7.0_51]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
~[na:1.7.0_51]
    at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
~[na:1.7.0_51]
    at
sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473)
~[na:1.7.0_51]
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569)
~[na:1.7.0_51]
    at
org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74)
~[cassandra-all-2.1.2.jar:2.1.2]
    at
org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56)
~[cassandra-all-2.1.2.jar:2.1.2]
    at
org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40)
~[cassandra-all-2.1.2.jar:2.1.2]
    at
org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45)
~[cassandra-all-2.1.2.jar:2.1.2]
    at
org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346)
[cassandra-all-2.1.2.jar:2.1.2]
    at
org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318)
[cassandra-all-2.1.2.jar:2.1.2]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR
o.a.c.streaming.StreamSession - [Stream
#98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
~[na:1.7.0_51]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
~[na:1.7.0_51]
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487)
~[na:1.7.0_51]
    at
org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48)
~[cassandra-all-2.1.2.jar:2.1.2]
    at
org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44)
~[cassandra-all-2.1.2.jar:2.1.2]
    at
org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346)
[cassandra-all-2.1.2.jar:2.1.2]
    at
org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326)
[cassandra-all-2.1.2.jar:2.1.2]
    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

----------------------------------------------------

Here is what I have tried:


*Hadoop Driver*

public class CassandraBulkImporter extends Configured implements Tool {

    .....

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CassandraBulkImporter(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        .......
        Job job = new Job(conf);
        ......
        job.setOutputFormatClass(CqlBulkOutputFormat.class);

        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.56.11");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setOutputKeyspace(job.getConfiguration(), CASSANDRA_KEYSPACE_NAME);
        ConfigHelper.setOutputColumnFamily(
                job.getConfiguration(),
                CASSANDRA_KEYSPACE_NAME,
                CASSANDRA_TABLE_NAME);

        // Set the properties for CqlBulkOutputFormat
        MultipleOutputs.addNamedOutput(job, CASSANDRA_TABLE_NAME,
                CqlBulkOutputFormat.class, Object.class, List.class);
        CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(),
                CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)");
        CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(),
                CASSANDRA_TABLE_NAME,
                "INSERT into dev.participant(........) values (?,?,?,?,?) ");

        .....
    }
}

*Reducer Code*

public class ReducerToCassandra
        extends Reducer<Text, Text, Object, List<ByteBuffer>> {

    private MultipleOutputs multipleOutputs;

    @Override
    @SuppressWarnings("unchecked")
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs(context);
    }

    @Override
    public void reduce(Text id, Iterable<Text> pInfo, Context context)
            throws IOException, InterruptedException {
        ....
        List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>();
        .....
        // Write the bound values to the named output backed by CqlBulkOutputFormat
        multipleOutputs.write(CASSANDRA_TABLE1, null, bVariables);
    }
}
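
The elided part of reduce() serializes each column value for the insert
statement into a ByteBuffer before the multipleOutputs.write() call. It is
roughly along the lines of this hypothetical helper for a (text, text, int)
table; the columns shown are placeholders, not my real schema:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.utils.ByteBufferUtil;

public class BoundValuesSketch {
    // Hypothetical: build the List<ByteBuffer> of bound values that
    // CqlBulkOutputFormat expects as the output value, for an insert
    // with (text, text, int) columns.
    static List<ByteBuffer> toBoundValues(String id, String name, int score) {
        List<ByteBuffer> values = new ArrayList<ByteBuffer>();
        values.add(ByteBufferUtil.bytes(id));     // text column
        values.add(ByteBufferUtil.bytes(name));   // text column
        values.add(ByteBufferUtil.bytes(score));  // int column
        return values;
    }
}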
