Hi all, I'm using Spark Streaming with Flume. My Flume agent is configured like this:

a1.channels = c1
a1.sinks = k1
a1.sources = r1

a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 33333

a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

So events come in through the avro source on port 44444, go through the memory channel, and the avro sink pushes them to port 33333, where the Spark receiver listens.
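(As a side note: instead of the flume-ng avro-client command I use below, a test event can also be pushed to the avro source programmatically. This is just a minimal sketch of mine, assuming flume-ng-sdk is on the classpath; the host and port simply mirror the source above:)

import java.nio.charset.StandardCharsets

import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder

// Sends a single test event to the agent's avro source on 44444.
object SendTestEvent {
  def main(args: Array[String]): Unit = {
    val client = RpcClientFactory.getDefaultInstance("localhost", 44444)
    try {
      client.append(EventBuilder.withBody("hello flume", StandardCharsets.UTF_8))
    } finally {
      client.close()
    }
  }
}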
My Spark code is like this:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeEventCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }
    val host = args(0)
    val port = args(1).toInt
    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream: the receiver binds host:port and Flume's avro sink pushes to it
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}

When I run this in local mode it works fine. The steps are as follows. First, start the Spark Streaming job with spark-submit:

spark-submit --master local[8] --class com.nd.test.FlumeEventCount simple-projectnew-1.2.1-jar-with-dependencies.jar localhost 33333

Then start Flume:

flume-ng agent -c ../conf -f ./sparkflum.conf -n a1 -Dflume.root.logger=INFO,console

Finally, send data:

flume-ng avro-client --conf ./conf -H localhost -p 44444 -F ./example.conf -Dflume.root.logger=DEBUG,console

That all works. But when I run the Spark Streaming job in yarn-client or yarn-cluster mode, errors happen. The error message is:

15/07/16 14:36:36 ERROR receiver.ReceiverSupervisorImpl: Stopped executor with error: org.jboss.netty.channel.ChannelException: Failed to bind to: sparktest/192.168.1.17:33333
15/07/16 14:36:36 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 74)
org.jboss.netty.channel.ChannelException: Failed to bind to: sparktest/192.168.1.17:33333
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
    at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
    at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:288)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:280)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.net.BindException: Cannot assign requested address
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:344)
    at sun.nio.ch.Net.bind(Net.java:336)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:199)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
    at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
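My guess, though I'm not sure, is that on YARN the receiver task can be launched on any NodeManager host, and a host that does not own the address 192.168.1.17 cannot bind it, which would match the "Cannot assign requested address". Just to illustrate what I mean (this is a standalone check, not part of my job), the same JVM-level failure can be reproduced by binding a server socket to an address the machine doesn't own:

import java.net.{InetAddress, ServerSocket}

// Try to bind host:port the way the Avro/Netty server does.
// On a machine that does not own the given address, this throws
// java.net.BindException: Cannot assign requested address.
object BindCheck {
  def main(args: Array[String]): Unit = {
    val host = args(0)       // e.g. 192.168.1.17
    val port = args(1).toInt // e.g. 33333
    val server = new ServerSocket(port, 50, InetAddress.getByName(host))
    println("bound to " + server.getLocalSocketAddress)
    server.close()
  }
}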
Flume also reports errors. The error message is:

[ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: sparktest.com, port: 33333 }: RPC connection error
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:117)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:93)
    at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:507)
    at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
    at org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:182)
    at org.apache.flume.sink.AvroSink.verifyConnection(AvroSink.java:222)
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:282)
    ... 3 more
Caused by: java.io.IOException: Error connecting to /192.168.1.17:33333
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:106)
    ... 9 more

Does anybody know what's happening, and what can I do? Thanks!
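One workaround I'm considering (I have not tested it yet) is the pull-based approach, where Flume writes into a SparkSink and the executor connects out to Flume instead of binding a port inside the YARN container. Here is a minimal sketch of what I believe the change would look like, based on the spark-streaming-flume docs; the host and port are just my current values. The sink config would become:

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 33333
a1.sinks.k1.channel = c1

and the Spark side would use createPollingStream instead of createStream:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingEventCount {
  def main(args: Array[String]): Unit = {
    val host = args(0)        // host of the Flume agent, where the SparkSink listens
    val port = args(1).toInt
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, Milliseconds(2000))
    // The receiver pulls events from the SparkSink, so nothing has to
    // bind a fixed address inside the YARN container.
    val stream = FlumeUtils.createPollingStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Still, I'd like to understand why the push version fails on YARN, so any explanation is appreciated.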