Thanks! On Thu, Mar 5, 2015 at 11:10 AM, Aby Kuruvilla <aby.kuruvi...@envisagesystems.com> wrote: > Thanks Yuki, have created a JIRA ticket > > https://issues.apache.org/jira/browse/CASSANDRA-8924 > > On Thu, Mar 5, 2015 at 10:34 AM, Yuki Morishita <mor.y...@gmail.com> wrote: >> >> Thanks. >> It looks like a bug. Can you create a ticket on JIRA? >> >> https://issues.apache.org/jira/browse/CASSANDRA >> >> On Thu, Mar 5, 2015 at 7:56 AM, Aby Kuruvilla >> <aby.kuruvi...@envisagesystems.com> wrote: >> > Hi Yuki >> > >> > Thanks for the reply! >> > >> > Here is the log from Cassandra server for the stream failure >> > >> > INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 >> > StreamResultFuture.java:109 - [Stream >> > #98ba8730-c279-11e4-b8e9-55374d280508 >> > ID#0] Creating new streaming plan for Bulk Load >> > INFO [STREAM-INIT-/192.168.56.1:58578] 2015-03-04 09:20:23,816 >> > StreamResultFuture.java:116 - [Stream >> > #98ba8730-c279-11e4-b8e9-55374d280508, >> > ID#0] Received streaming plan for Bulk Load >> > INFO [STREAM-INIT-/192.168.56.1:58579] 2015-03-04 09:20:23,819 >> > StreamResultFuture.java:116 - [Stream >> > #98ba8730-c279-11e4-b8e9-55374d280508, >> > ID#0] Received streaming plan for Bulk Load >> > INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,822 >> > StreamResultFuture.java:166 - [Stream >> > #98ba8730-c279-11e4-b8e9-55374d280508 >> > ID#0] Prepare completed. 
Receiving 1 files(617874 bytes), sending 0 >> > files(0 >> > bytes) >> > WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,823 >> > StreamSession.java:597 >> > - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Retrying for following >> > error >> > java.io.IOException: CF d6d35793-729c-3cab-bee0-84e971e48675 was dropped >> > during streaming >> > at >> > >> > org.apache.cassandra.streaming.compress.CompressedStreamReader.read(CompressedStreamReader.java:71) >> > ~[main/:na] >> > at >> > >> > org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:48) >> > [main/:na] >> > at >> > >> > org.apache.cassandra.streaming.messages.IncomingFileMessage$1.deserialize(IncomingFileMessage.java:38) >> > [main/:na] >> > at >> > >> > org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:55) >> > [main/:na] >> > at >> > >> > org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) >> > [main/:na] >> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65] >> > ERROR [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,828 >> > StreamSession.java:477 >> > - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error >> > occurred >> > java.lang.IllegalArgumentException: Unknown type 0 >> > at >> > >> > org.apache.cassandra.streaming.messages.StreamMessage$Type.get(StreamMessage.java:89) >> > ~[main/:na] >> > at >> > >> > org.apache.cassandra.streaming.messages.StreamMessage.deserialize(StreamMessage.java:54) >> > ~[main/:na] >> > at >> > >> > org.apache.cassandra.streaming.ConnectionHandler$IncomingMessageHandler.run(ConnectionHandler.java:245) >> > ~[main/:na] >> > at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65] >> > INFO [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 >> > StreamResultFuture.java:180 - [Stream >> > #98ba8730-c279-11e4-b8e9-55374d280508] >> > Session with /127.0.0.1 is complete >> > WARN [STREAM-IN-/127.0.0.1] 2015-03-04 09:20:23,829 >> > 
StreamResultFuture.java:207 - [Stream >> > #98ba8730-c279-11e4-b8e9-55374d280508] >> > Stream failed >> > >> > >> > >> > On Wed, Mar 4, 2015 at 1:18 PM, Yuki Morishita <mor.y...@gmail.com> >> > wrote: >> >> >> >> Do you have a corresponding error on the other side of the stream >> >> (/192.168.56.11)? >> >> >> >> >> >> On Wed, Mar 4, 2015 at 9:11 AM, Aby Kuruvilla >> >> <aby.kuruvi...@envisagesystems.com> wrote: >> >> > I am trying to use the CqlBulkOutputFormat in a Hadoop job to bulk >> >> > load >> >> > data >> >> > into Cassandra. I was not able to find any documentation of this new >> >> > output >> >> > format, but from looking through the code this uses CQLSSTableWriter >> >> > to >> >> > write SSTable files to disk, which are then streamed to Cassandra >> >> > using >> >> > SSTableLoader. On running the Hadoop job, I can see that the SSTable >> >> > files >> >> > do get generated, but the job fails to stream the data out. I get the same >> >> > exception >> >> > when I try with a Cassandra node on "localhost" as well as a remote >> >> > Cassandra >> >> > cluster. Also I get this exception on C* versions 2.1.1, 2.1.2 and >> >> > 2.1.3. >> >> > >> >> > Relevant portion of logs and stack trace >> >> > >> >> > 09:20:23.207 [Thread-6] WARN org.apache.cassandra.utils.CLibrary - >> >> > JNA >> >> > link >> >> > failure, one or more native method will be unavailable. 
>> >> > 09:20:23.208 [Thread-6] DEBUG org.apache.cassandra.utils.CLibrary - >> >> > JNA >> >> > link >> >> > failure details: Error looking up function 'posix_fadvise': >> >> > dlsym(0x7fff6ab8a5e0, posix_fadvise): symbol not found >> >> > 09:20:23.504 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - >> >> > Renaming >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Filter.db >> >> > to >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Filter.db >> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - >> >> > Renaming >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Digest.sha1 >> >> > to >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Digest.sha1 >> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - >> >> > Renaming >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Statistics.db >> >> > to >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Statistics.db >> >> > 09:20:23.505 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - >> >> > Renaming >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Index.db >> >> > to >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Index.db >> >> > 09:20:23.505 [Thread-6] DEBUG 
o.apache.cassandra.io.util.FileUtils - >> >> > Renaming >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-CompressionInfo.db >> >> > to >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-CompressionInfo.db >> >> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - >> >> > Renaming >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-TOC.txt >> >> > to >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-TOC.txt >> >> > 09:20:23.506 [Thread-6] DEBUG o.apache.cassandra.io.util.FileUtils - >> >> > Renaming >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-tmp-ka-1-Data.db >> >> > to >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db >> >> > 09:20:23.727 [Thread-2] DEBUG o.a.c.i.s.m.MetadataSerializer - Load >> >> > metadata >> >> > for >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1 >> >> > 09:20:23.729 [Thread-2] INFO o.a.c.io.sstable.SSTableReader - >> >> > Opening >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1 >> >> > (617874 bytes) >> >> > 09:20:23.780 [Thread-2] INFO o.a.c.streaming.StreamResultFuture - >> >> > [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Executing streaming plan for >> >> > Bulk >> >> > Load >> >> > 09:20:23.781 [StreamConnectionEstablisher:1] INFO 
>> >> > o.a.c.streaming.StreamSession - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Starting streaming to >> >> > /192.168.56.11 >> >> > 09:20:23.781 [StreamConnectionEstablisher:1] DEBUG >> >> > o.a.c.streaming.ConnectionHandler - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for >> >> > incoming >> >> > stream >> >> > 09:20:23.792 [StreamConnectionEstablisher:1] DEBUG >> >> > o.a.c.streaming.ConnectionHandler - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Sending stream init for >> >> > outgoing >> >> > stream >> >> > 09:20:23.794 [STREAM-OUT-/192.168.56.11] DEBUG >> >> > o.a.c.streaming.ConnectionHandler - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Sending Prepare (0 requests, >> >> > 1 >> >> > files} >> >> > 09:20:23.795 [StreamConnectionEstablisher:1] INFO >> >> > o.a.c.streaming.StreamResultFuture - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508 ID#0] Prepare completed. >> >> > Receiving >> >> > 0 >> >> > files(0 bytes), sending 1 files(617874 bytes) >> >> > 09:20:23.799 [StreamConnectionEstablisher:1] INFO >> >> > o.a.c.streaming.StreamCoordinator - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508, ID#0] Beginning stream session >> >> > with >> >> > /192.168.56.11 >> >> > 09:20:23.799 [STREAM-OUT-/192.168.56.11] DEBUG >> >> > o.a.c.streaming.ConnectionHandler - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Sending File (Header (cfId: >> >> > d6d35793-729c-3cab-bee0-84e971e48675, #0, version: ka, estimated >> >> > keys: >> >> > 3072, >> >> > transfer size: 617874, compressed?: true, repairedAt: 0), file: >> >> > >> >> > >> >> > /var/folders/bb/c4416mx95xsbb11jx5g5zq15ddhhh4/T/dev/participant-262ce044-0a2d-48f4-9baa-ad4d626e743a/dev-participant-ka-1-Data.db) >> >> > 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG >> >> > o.a.c.streaming.ConnectionHandler - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Received Retry >> >> > (d6d35793-729c-3cab-bee0-84e971e48675, #0) >> >> 
> 09:20:23.808 [STREAM-IN-/192.168.56.11] DEBUG >> >> > o.a.c.streaming.ConnectionHandler - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Received Session Failed >> >> > 09:20:23.809 [STREAM-IN-/192.168.56.11] DEBUG >> >> > o.a.c.streaming.ConnectionHandler - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Closing stream connection >> >> > handler >> >> > on >> >> > /192.168.56.11 >> >> > 09:20:23.811 [STREAM-IN-/192.168.56.11] INFO >> >> > o.a.c.streaming.StreamResultFuture - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Session with /192.168.56.11 is >> >> > complete >> >> > 09:20:23.812 [STREAM-IN-/192.168.56.11] WARN >> >> > o.a.c.streaming.StreamResultFuture - [Stream >> >> > #98ba8730-c279-11e4-b8e9-55374d280508] Stream failed >> >> > 09:20:23.815 [STREAM-OUT-/192.168.56.11] ERROR >> >> > o.a.c.streaming.StreamSession >> >> > - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error >> >> > occurred >> >> > java.io.IOException: Broken pipe >> >> > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) >> >> > ~[na:1.7.0_51] >> >> > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) >> >> > ~[na:1.7.0_51] >> >> > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) >> >> > ~[na:1.7.0_51] >> >> > at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.7.0_51] >> >> > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) >> >> > ~[na:1.7.0_51] >> >> > at >> >> > >> >> > >> >> > sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:473) >> >> > ~[na:1.7.0_51] >> >> > at >> >> > sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:569) >> >> > ~[na:1.7.0_51] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.streaming.compress.CompressedStreamWriter.write(CompressedStreamWriter.java:74) >> >> > ~[cassandra-all-2.1.2.jar:2.1.2] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:56) >> >> > 
~[cassandra-all-2.1.2.jar:2.1.2] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.streaming.messages.OutgoingFileMessage$1.serialize(OutgoingFileMessage.java:40) >> >> > ~[cassandra-all-2.1.2.jar:2.1.2] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:45) >> >> > ~[cassandra-all-2.1.2.jar:2.1.2] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) >> >> > [cassandra-all-2.1.2.jar:2.1.2] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:318) >> >> > [cassandra-all-2.1.2.jar:2.1.2] >> >> > at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51] >> >> > 09:20:23.816 [STREAM-OUT-/192.168.56.11] ERROR >> >> > o.a.c.streaming.StreamSession >> >> > - [Stream #98ba8730-c279-11e4-b8e9-55374d280508] Streaming error >> >> > occurred >> >> > java.io.IOException: Broken pipe >> >> > at sun.nio.ch.FileDispatcherImpl.write0(Native Method) >> >> > ~[na:1.7.0_51] >> >> > at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) >> >> > ~[na:1.7.0_51] >> >> > at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) >> >> > ~[na:1.7.0_51] >> >> > at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.7.0_51] >> >> > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:487) >> >> > ~[na:1.7.0_51] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.io.util.DataOutputStreamAndChannel.write(DataOutputStreamAndChannel.java:48) >> >> > ~[cassandra-all-2.1.2.jar:2.1.2] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.streaming.messages.StreamMessage.serialize(StreamMessage.java:44) >> >> > ~[cassandra-all-2.1.2.jar:2.1.2] >> >> > at >> >> > >> >> > >> >> > org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.sendMessage(ConnectionHandler.java:346) >> >> > [cassandra-all-2.1.2.jar:2.1.2] >> >> > at >> >> 
> >> >> > >> >> > org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:326) >> >> > [cassandra-all-2.1.2.jar:2.1.2] >> >> > at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51] >> >> > >> >> > ---------------------------------------------------- >> >> > >> >> > Here is what I have tried >> >> > >> >> > >> >> > Hadoop Driver >> >> > >> >> > public class CassandraBulkImporter extends Configured implements >> >> > Tool{ >> >> > >> >> > ..... >> >> > >> >> > public static void main(String[] args) throws Exception >> >> > { >> >> > int exitCode = ToolRunner.run(new >> >> > CassandraBulkImporter(), args); >> >> > System.exit(exitCode); >> >> > } >> >> > >> >> > @Override >> >> > public int run(String[] arg0) throws Exception { >> >> > ....... >> >> > Job job = new Job(conf); >> >> > ...... >> >> > >> >> > job.setOutputFormatClass(CqlBulkOutputFormat.class); >> >> > >> >> > >> >> > ConfigHelper.setOutputInitialAddress(job.getConfiguration(), >> >> > "192.168.56.11"); >> >> > >> >> > ConfigHelper.setOutputPartitioner(job.getConfiguration(), >> >> > "Murmur3Partitioner"); >> >> > >> >> > ConfigHelper.setOutputRpcPort(job.getConfiguration(), >> >> > "9160"); >> >> > >> >> > ConfigHelper.setOutputKeyspace(job.getConfiguration(), >> >> > CASSANDRA_KEYSPACE_NAME); >> >> > ConfigHelper.setOutputColumnFamily( >> >> > job.getConfiguration(), >> >> > CASSANDRA_KEYSPACE_NAME, >> >> > CASSANDRA_TABLE_NAME >> >> > ); >> >> > //Set the properties for CqlBulkOutputFormat >> >> > MultipleOutputs.addNamedOutput(job, >> >> > CASSANDRA_TABLE_NAME, CqlBulkOutputFormat.class, Object.class, >> >> > List.class); >> >> > >> >> > CqlBulkOutputFormat.setColumnFamilySchema(job.getConfiguration(), >> >> > CASSANDRA_TABLE_NAME, "CREATE TABLE dev.participant(........)"); >> >> > >> >> > >> >> > >> >> > CqlBulkOutputFormat.setColumnFamilyInsertStatement(job.getConfiguration(), >> >> > CASSANDRA_TABLE_NAME, "INSERT into dev.participant(........) 
values >> >> > (?,?,?,?,?) "); >> >> > >> >> > ..... >> >> > } >> >> > >> >> > } >> >> > >> >> > Reducer Code >> >> > >> >> > public class ReducerToCassandra extends Reducer<Text, Text, Object, >> >> > List<ByteBuffer>> { >> >> > >> >> > private MultipleOutputs multipleOutputs; >> >> > >> >> > @SuppressWarnings("unchecked") >> >> > protected void setup(Context context) throws IOException, >> >> > InterruptedException { >> >> > multipleOutputs = new MultipleOutputs(context); >> >> > } >> >> > >> >> > @Override >> >> > public void reduce(Text id, Iterable<Text> pInfo, Context context) >> >> > throws >> >> > IOException, InterruptedException { >> >> > .... >> >> > List<ByteBuffer> bVariables = new ArrayList<ByteBuffer>(); >> >> > >> >> > ..... >> >> > multipleOutputs.write(CASSANDRA_TABLE1, null, bVariables); >> >> > >> >> > } >> >> > >> >> > >> >> > >> >> >> >> >> >> >> >> -- >> >> Yuki Morishita >> >> t:yukim (http://twitter.com/yukim) >> > >> > >> >> >> >> -- >> Yuki Morishita >> t:yukim (http://twitter.com/yukim) > >
-- Yuki Morishita t:yukim (http://twitter.com/yukim)
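A note on the server-side trace quoted above: the last error there is "IllegalArgumentException: Unknown type 0", raised from StreamMessage$Type.get immediately after the "was dropped during streaming" retry. One possible reading, offered only as an assumption and not confirmed anywhere in this thread, is that the receiver tried to interpret a byte that was not a message-type byte. The sketch below is plain Java with no Cassandra dependency and is not Cassandra's code. The class TypeLookupSketch, the enum constants, and their numeric codes are all hypothetical; it only illustrates how a one-byte type lookup produces that exact message when it is handed an unexpected value.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class TypeLookupSketch {
    // Hypothetical message types; names and codes are illustrative only.
    enum Type {
        PREPARE(1), FILE(2), COMPLETE(3);

        final byte code;

        Type(int code) {
            this.code = (byte) code;
        }

        // Map a wire byte to a Type, rejecting anything unrecognized.
        static Type get(byte code) {
            for (Type t : values()) {
                if (t.code == code) {
                    return t;
                }
            }
            throw new IllegalArgumentException("Unknown type " + code);
        }
    }

    // Read the first byte of the buffer as a type code and describe the result.
    static String describe(byte[] wire) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            return Type.get(in.readByte()).name();
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The buffer starts on a message boundary: the type byte is recognized.
        System.out.println(describe(new byte[] {2}));
        // The buffer starts with a stray zero byte: the lookup fails.
        System.out.println(describe(new byte[] {0, 2}));
    }
}
```

If this reading applies, the "Unknown type 0" error would be a follow-on symptom of the earlier "CF ... was dropped during streaming" failure rather than a separate problem, which is something the JIRA ticket would need to confirm.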