Hi Yuki,

Thanks for the reply!

Here is the log from the Cassandra server for the stream failure:

INFO  [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:109 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Creating new streaming plan for Bulk Load
INFO  [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
INFO  [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
INFO  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822 StreamResultFuture.java:166 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0 files(0 bytes)
WARN  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 StreamSession.java:597 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following error
java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming
        at org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71) ~[main/:na]
        at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48) [main/:na]
        at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38) [main/:na]
        at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55) [main/:na]
        at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) [main/:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 StreamSession.java:477 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
java.lang.IllegalArgumentException: Unknown type 0
        at org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89) ~[main/:na]
        at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54) ~[main/:na]
        at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) ~[main/:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
INFO  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:180 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /127.0.0.1 is complete
WARN  [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:207 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed



On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mor.y...@gmail.com> wrote:

> Do you have a corresponding error on the other side of the stream
> (/192.168.56.11)?
>
>
> On Wed, Mar 4, 2015 at 9:11 AM, Aby Kuruvilla
> <aby.kuruvi...@envisagesystems.com> wrote:
> > I am trying to use CqlBulkOutputFormat in a Hadoop job to bulk load data
> > into Cassandra. I was not able to find any documentation for this new
> > output format, but from looking through the code, it uses CQLSSTableWriter
> > to write SSTable files to disk, which are then streamed to Cassandra using
> > SSTableLoader. When I run the Hadoop job, I can see that the SSTable files
> > do get generated, but the job fails to stream the data out. I get the same
> > exception with a Cassandra node on "localhost" as well as with a remote
> > Cassandra cluster. I also get this exception on C* versions 2.1.1, 2.1.2,
> > and 2.1.3.
> >
> > Relevant portion of the logs and stack trace:
> >
> > 09:20:23.207 [Thread-6] WARN  org.apache.cassandra.utils.CLibrary - JNA link failure, one or more native method will be unavailable.
> > 09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary - JNA link failure details: Error looking up function 'posix_fadvise': dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found
> > 09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db
> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1 to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1
> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db
> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db
> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db
> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt
> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db
> > 09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load metadata for /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
> > 09:20:23.729 [Thread-2] INFO  o.a.c.io.sstable.SSTableReader - Opening /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1 (617874 bytes)
> > 09:20:23.780 [Thread-2] INFO  o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for Bulk Load
> > 09:20:23.781 [StreamConnectionEstablisher:1] INFO  o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to /192.168.56.11
> > 09:20:23.781 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for incoming stream
> > 09:20:23.792 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for outgoing stream
> > 09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests,  1 files}
> > 09:20:23.795 [StreamConnectionEstablisher:1] INFO  o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 0 files(0 bytes), sending 1 files(617874 bytes)
> > 09:20:23.799 [StreamConnectionEstablisher:1] INFO  o.a.c.streaming.StreamCoordinator - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session with /192.168.56.11
> > 09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId: d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated keys: 3072, transfer size: 617874, compressed?: true, repairedAt: 0), file: /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db)
> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Retry (d6d35793-729c-3cab-bee0-84e971e48675, #0)
> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed
> > 09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection handler on /192.168.56.11
> > 09:20:23.811 [STREAM-IN-/192.168.56.11] INFO  o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11 is complete
> > 09:20:23.812 [STREAM-IN-/192.168.56.11] WARN  o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
> > 09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> > java.io.IOException: Broken pipe
> >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
> >     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
> >     at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51]
> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
> >     at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473) ~[na:1.7.0_51]
> >     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569) ~[na:1.7.0_51]
> >     at org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74) ~[cassandra-all-2.1.2.jar:2.1.2]
> >     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56) ~[cassandra-all-2.1.2.jar:2.1.2]
> >     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40) ~[cassandra-all-2.1.2.jar:2.1.2]
> >     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45) ~[cassandra-all-2.1.2.jar:2.1.2]
> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318) [cassandra-all-2.1.2.jar:2.1.2]
> >     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
> > 09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> > java.io.IOException: Broken pipe
> >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
> >     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
> >     at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51]
> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
> >     at org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48) ~[cassandra-all-2.1.2.jar:2.1.2]
> >     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44) ~[cassandra-all-2.1.2.jar:2.1.2]
> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326) [cassandra-all-2.1.2.jar:2.1.2]
> >     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
> >
> > ----------------------------------------------------
> >
> > Here is what I have tried:
> >
> >
> > Hadoop Driver
> >
> > public class CassandraBulkImporter extends Configured implements Tool {
> >
> >     .....
> >
> >     public static void main(String[] args) throws Exception {
> >         int exitCode = ToolRunner.run(new CassandraBulkImporter(), args);
> >         System.exit(exitCode);
> >     }
> >
> >     @Override
> >     public int run(String[] arg0) throws Exception {
> >         .......
> >         Job job = new Job(conf);
> >         ......
> >         job.setOutputFormatClass(CqlBulkOutputFormat.class);
> >
> >         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.56.11");
> >         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >         ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
> >         ConfigHelper.setOutputKeyspace(job.getConfiguration(), CASSANDRA_KEYSPACE_NAME);
> >         ConfigHelper.setOutputColumnFamily(
> >             job.getConfiguration(),
> >             CASSANDRA_KEYSPACE_NAME,
> >             CASSANDRA_TABLE_NAME
> >         );
> >
> >         //Set the properties for CqlBulkOutputFormat
> >         MultipleOutputs.addNamedOutput(job, CASSANDRA_TABLE_NAME, CqlBulkOutputFormat.class, Object.class, List.class);
> >         CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(), CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)");
> >         CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(), CASSANDRA_TABLE_NAME, "INSERT into dev.participant(........) values (?,?,?,?,?) ");
> >
> >         .....
> >     }
> >
> > }
> >
> > Reducer Code
> >
> > public class ReducerToCassandra extends Reducer<Text, Text, Object, List<ByteBuffer>> {
> >
> >     private MultipleOutputs multipleOutputs;
> >
> >     @SuppressWarnings("unchecked")
> >     protected void setup(Context context) throws IOException, InterruptedException {
> >         multipleOutputs = new MultipleOutputs(context);
> >     }
> >
> >     @Override
> >     public void reduce(Text id, Iterable<Text> pInfo, Context context) throws IOException, InterruptedException {
> >         ....
> >         List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>();
> >
> >         .....
> >         multipleOutputs.write(CASSANDRA_TABLE1, null, bVariables);
> >     }
> >
> > }
> >
> >
> >
>
>
>
> --
> Yuki Morishita
>  t:yukim (http://twitter.com/yukim)
>
