Hi,
I am new to Spark. I have written custom rabbit mq receiver which calls the
store method of Receiver interface. I can see that the block is being
stored. I am trying to process each rdd in the dstream using the foreach
function, but am unable to figure out why this block is not getting invoked
with a populated rdd even after the data is stored (by the receiver).
/This code initialises the stream:/
public void testListeningToStateChannel() throws IOException {
// Create the context with a 1 second batch size
SparkConf sparkConf = new
SparkConf().setAppName("JavaCustomReceiver");
sparkConf.setMaster("local");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new
Duration(1000));
JavaDStream<String> customReceiverStream = ssc.receiverStream(new
StateBusListener("abcde"));
customReceiverStream.foreachRDD(doSomething);
customReceiverStream.print();
ssc.start();
System.out.println("started :");
ssc.awaitTermination();
}
public static Function2<JavaRDD<String>, Time, Void> doSomething =
new Function2<JavaRDD<String>, Time, Void>() {
@Override
public Void call(JavaRDD<String> v1, Time v2) throws Exception {
System.out.println(v1.toString());
if (!v1.collect().isEmpty()) {
long count = v1.count();
System.out.println("found :" + count);
}
return null;
}
};
/This is the receiver :/
public class StateBusListener extends Receiver<String> {
private String QUEUE_NAME;
private Channel stateBusChannel;
private Connection connection = null;
public StateBusListener(String queue_name) {
super(StorageLevel.MEMORY_AND_DISK_2());
QUEUE_NAME = queue_name;
}
@Override
public void onStart() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
connection = factory.newConnection();
stateBusChannel = connection.createChannel();
stateBusChannel.queueDeclare("abcde", false, false, false,
null);
System.out.println("Started!");
new Thread() {
@Override
public void run() {
try {
receive();
} catch (IOException e) {
restart("Could not connect", e);
} catch (InterruptedException e) {
restart("Error receiving data", e);
}
}
}.start();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onStop() {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void receive() throws IOException, InterruptedException {
QueueingConsumer consumer = new QueueingConsumer(stateBusChannel);
stateBusChannel.basicConsume(QUEUE_NAME, true, consumer);
List<String> messages= Lists.newArrayList();
while (!isStopped()) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
messages.add(message);
if(messages.size()>5){
store(messages.iterator());
messages.clear();
}
}
restart("Trying to connect again");
}
}
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-not-getting-generated-tp15439.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]