Thanks Yuki, I have created a JIRA ticket: https://issues.apache.org/jira/browse/CASSANDRA-8924
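The same column family ID, d6d35793-729c-3cab-bee0-84e971e48675, appears in the client's "Sending File" header and in the server's "was dropped during streaming" error quoted below. One quick sanity check is to look at that ID's RFC 4122 version field and compare it with the ID the server holds for dev.participant. The sketch below uses only java.util.UUID. The class name CfIdCheck is hypothetical. So is the idea that a client might derive the ID from the keyspace and table name bytes: that is an assumption for illustration, not something the logs or the Cassandra source confirm.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Hypothetical diagnostic helper (not part of Cassandra): inspects the IDs
 * that appear in the streaming logs using only java.util.UUID.
 */
public class CfIdCheck {

    /** Describes how a UUID was generated, based on its RFC 4122 version field. */
    public static String describe(UUID id) {
        switch (id.version()) {
            case 1:
                return "time-based (version 1)";
            case 3:
                return "name-based, MD5 (version 3)";
            case 4:
                return "random (version 4)";
            default:
                return "version " + id.version();
        }
    }

    /**
     * ASSUMPTION: a name-based (MD5) UUID over the keyspace bytes followed by
     * the table bytes. This only illustrates one way an ID could be derived
     * locally; it is not claimed to be what CQLSSTableWriter actually does.
     */
    public static UUID candidateNameBasedId(String keyspace, String table) {
        byte[] ks = keyspace.getBytes(StandardCharsets.UTF_8);
        byte[] cf = table.getBytes(StandardCharsets.UTF_8);
        byte[] both = new byte[ks.length + cf.length];
        System.arraycopy(ks, 0, both, 0, ks.length);
        System.arraycopy(cf, 0, both, ks.length, cf.length);
        return UUID.nameUUIDFromBytes(both);
    }

    public static void main(String[] args) {
        // cfId copied from the "Sending File (Header (cfId: ...))" client log line
        UUID fromStreamHeader = UUID.fromString("d6d35793-729c-3cab-bee0-84e971e48675");
        // prints: stream header cfId: name-based, MD5 (version 3)
        System.out.println("stream header cfId: " + describe(fromStreamHeader));

        // Stream plan ID from the same logs, shown for contrast
        UUID planId = UUID.fromString("98ba8730-c279-11e4-b8e9-55374d280508");
        // prints: stream plan id: time-based (version 1)
        System.out.println("stream plan id: " + describe(planId));

        // Compare against a locally derived candidate (keyspace/table from the thread)
        UUID candidate = candidateNameBasedId("dev", "participant");
        System.out.println("candidate " + candidate + " matches header: "
                + candidate.equals(fromStreamHeader));
    }
}
```

The logged cfId carries version 3, which marks it as a name-based UUID rather than a time-based one. If the table's ID on the server differs from the one in the stream header, that mismatch would be consistent with the "was dropped during streaming" error. Treat this as a hypothesis to verify, not a diagnosis.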
On Thu, Mar 5, 2015 at 10:34 AM, Yuki Morishita <mor.y...@gmail.com> wrote:
> Thanks.
> It looks like a bug. Can you create a ticket on JIRA?
>
> https://issues.apache.org/jira/browse/CASSANDRA
>
> On Thu, Mar 5, 2015 at 7:56 AM, Aby Kuruvilla
> <aby.kuruvi...@envisagesystems.com> wrote:
> > Hi Yuki,
> >
> > Thanks for the reply!
> >
> > Here is the log from the Cassandra server for the stream failure:
> >
> > INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:109 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Creating new streaming plan for Bulk Load
> > INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
> > INFO [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819 StreamResultFuture.java:116 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Received streaming plan for Bulk Load
> > INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822 StreamResultFuture.java:166 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 1 files(617874 bytes), sending 0 files(0 bytes)
> > WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 StreamSession.java:597 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following error
> > java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped during streaming
> >     at org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71) ~[main/:na]
> >     at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48) [main/:na]
> >     at org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38) [main/:na]
> >     at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55) [main/:na]
> >     at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) [main/:na]
> >     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> > ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 StreamSession.java:477 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> > java.lang.IllegalArgumentException: Unknown type 0
> >     at org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89) ~[main/:na]
> >     at org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54) ~[main/:na]
> >     at org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) ~[main/:na]
> >     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
> > INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:180 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /127.0.0.1 is complete
> > WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 StreamResultFuture.java:207 - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
> >
> > On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mor.y...@gmail.com> wrote:
> >>
> >> Do you have a corresponding error on the other side of the stream
> >> (/192.168.56.11)?
> >>
> >> On Wed, Mar 4, 2015 at 9:11 AM, Aby Kuruvilla
> >> <aby.kuruvi...@envisagesystems.com> wrote:
> >> > I am trying to use the CqlBulkOutputFormat in a Hadoop job to bulk load
> >> > data into Cassandra. I was not able to find any documentation for this new
> >> > output format, but from looking through the code, it uses CQLSSTableWriter
> >> > to write SSTable files to disk, which are then streamed to Cassandra using
> >> > SSTableLoader. When I run the Hadoop job, I can see that the SSTable files
> >> > do get generated, but streaming the data out fails. I get the same
> >> > exception when I try with a Cassandra node on "localhost" as well as with a
> >> > remote Cassandra cluster. I also get this exception on C* versions 2.1.1,
> >> > 2.1.2 and 2.1.3.
> >> >
> >> > Relevant portion of the logs and stack trace:
> >> >
> >> > 09:20:23.207 [Thread-6] WARN org.apache.cassandra.utils.CLibrary - JNA link failure, one or more native method will be unavailable.
> >> > 09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary - JNA link failure details: Error looking up function 'posix_fadvise': dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found
> >> > 09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db
> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1 to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1
> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db
> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db
> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db
> >> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt
> >> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - Renaming /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db to /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db
> >> > 09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load metadata for /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1
> >> > 09:20:23.729 [Thread-2] INFO o.a.c.io.sstable.SSTableReader - Opening /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1 (617874 bytes)
> >> > 09:20:23.780 [Thread-2] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for Bulk Load
> >> > 09:20:23.781 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to /192.168.56.11
> >> > 09:20:23.781 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for incoming stream
> >> > 09:20:23.792 [StreamConnectionEstablisher:1] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for outgoing stream
> >> > 09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests, 1 files}
> >> > 09:20:23.795 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. Receiving 0 files(0 bytes), sending 1 files(617874 bytes)
> >> > 09:20:23.799 [StreamConnectionEstablisher:1] INFO o.a.c.streaming.StreamCoordinator - [Stream #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session with /192.168.56.11
> >> > 09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId: d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated keys: 3072, transfer size: 617874, compressed?: true, repairedAt: 0), file: /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db)
> >> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Retry (d6d35793-729c-3cab-bee0-84e971e48675, #0)
> >> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed
> >> > 09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG o.a.c.streaming.ConnectionHandler - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection handler on /192.168.56.11
> >> > 09:20:23.811 [STREAM-IN-/192.168.56.11] INFO o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11 is complete
> >> > 09:20:23.812 [STREAM-IN-/192.168.56.11] WARN o.a.c.streaming.StreamResultFuture - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed
> >> > 09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> >> > java.io.IOException: Broken pipe
> >> >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569) ~[na:1.7.0_51]
> >> >     at org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74) ~[cassandra-all-2.1.2.jar:2.1.2]
> >> >     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56) ~[cassandra-all-2.1.2.jar:2.1.2]
> >> >     at org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40) ~[cassandra-all-2.1.2.jar:2.1.2]
> >> >     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45) ~[cassandra-all-2.1.2.jar:2.1.2]
> >> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
> >> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318) [cassandra-all-2.1.2.jar:2.1.2]
> >> >     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
> >> > 09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR o.a.c.streaming.StreamSession - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error occurred
> >> > java.io.IOException: Broken pipe
> >> >     at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51]
> >> >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) ~[na:1.7.0_51]
> >> >     at org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48) ~[cassandra-all-2.1.2.jar:2.1.2]
> >> >     at org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44) ~[cassandra-all-2.1.2.jar:2.1.2]
> >> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) [cassandra-all-2.1.2.jar:2.1.2]
> >> >     at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326) [cassandra-all-2.1.2.jar:2.1.2]
> >> >     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
> >> >
> >> > ----------------------------------------------------
> >> >
> >> > Here is what I have tried:
> >> >
> >> > Hadoop Driver
> >> >
> >> > public class CassandraBulkImporter extends Configured implements Tool {
> >> >
> >> >     .....
> >> >
> >> >     public static void main(String[] args) throws Exception {
> >> >         int exitCode = ToolRunner.run(new CassandraBulkImporter(), args);
> >> >         System.exit(exitCode);
> >> >     }
> >> >
> >> >     @Override
> >> >     public int run(String[] arg0) throws Exception {
> >> >         .......
> >> >         Job job = new Job(conf);
> >> >         ......
> >> >
> >> >         job.setOutputFormatClass(CqlBulkOutputFormat.class);
> >> >
> >> >         ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "192.168.56.11");
> >> >         ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
> >> >         ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
> >> >         ConfigHelper.setOutputKeyspace(job.getConfiguration(), CASSANDRA_KEYSPACE_NAME);
> >> >         ConfigHelper.setOutputColumnFamily(
> >> >                 job.getConfiguration(),
> >> >                 CASSANDRA_KEYSPACE_NAME,
> >> >                 CASSANDRA_TABLE_NAME
> >> >         );
> >> >
> >> >         // Set the properties for CqlBulkOutputFormat
> >> >         MultipleOutputs.addNamedOutput(job, CASSANDRA_TABLE_NAME,
> >> >                 CqlBulkOutputFormat.class, Object.class, List.class);
> >> >
> >> >         CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(),
> >> >                 CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)");
> >> >
> >> >         CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(),
> >> >                 CASSANDRA_TABLE_NAME, "INSERT into dev.participant(........) values (?,?,?,?,?) ");
> >> >
> >> >         .....
> >> >     }
> >> >
> >> > }
> >> >
> >> > Reducer Code
> >> >
> >> > public class ReducerToCassandra extends Reducer<Text, Text, Object, List<ByteBuffer>> {
> >> >
> >> >     private MultipleOutputs multipleOutputs;
> >> >
> >> >     @SuppressWarnings("unchecked")
> >> >     protected void setup(Context context) throws IOException, InterruptedException {
> >> >         multipleOutputs = new MultipleOutputs(context);
> >> >     }
> >> >
> >> >     @Override
> >> >     public void reduce(Text id, Iterable<Text> pInfo, Context context)
> >> >             throws IOException, InterruptedException {
> >> >         ....
> >> >         List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>();
> >> >
> >> >         .....
> >> >         multipleOutputs.write(CASSANDRA_TABLE1, null, bVariables);
> >> >     }
> >> > }
> >>
> >> --
> >> Yuki Morishita
> >> t:yukim (http://twitter.com/yukim)