Hey guys, so the problem I'm trying to tackle is the following:

- I need a data source that emits messages at a certain frequency.
- There are N neural nets that need to process each message individually.
- The outputs from all neural nets are aggregated, and a message should be declared fully processed only once all N outputs for it have been collected.
- At the end, I should measure the time it took for a message to be fully processed (the time between when it was emitted and when all N neural net outputs for that message have been collected).
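The completion rule in the last two points can be sketched without Spark at all. Below is a minimal, hypothetical CompletionBarrier class. It is not part of Spark, and the name is only for illustration. It counts outputs per message number and reports the end-to-end latency once the N-th output arrives. It assumes each output carries its message number and its emission timestamp in milliseconds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical, Spark-free sketch of the "fully processed" rule:
 * a message is complete once outputs from all N neural nets have arrived.
 */
class CompletionBarrier {
    private final int numberOfNets;
    // msgNo -> number of neural-net outputs received so far for that message
    private final Map<Long, Integer> received = new HashMap<>();

    CompletionBarrier(int numberOfNets) {
        this.numberOfNets = numberOfNets;
    }

    /**
     * Records one neural-net output for msgNo. Returns the latency
     * (nowMillis - emittedAtMillis) if this output completes the message,
     * or an empty OptionalLong while outputs are still missing.
     */
    OptionalLong record(long msgNo, long emittedAtMillis, long nowMillis) {
        int count = received.merge(msgNo, 1, Integer::sum);
        if (count < numberOfNets) {
            return OptionalLong.empty();
        }
        received.remove(msgNo); // message is done, so drop its state
        return OptionalLong.of(nowMillis - emittedAtMillis);
    }

    /** Number of messages that have some, but not all, outputs. */
    int pendingMessages() {
        return received.size();
    }
}
```

Counting outputs instead of storing them keeps the per-message state down to a single integer. Removing the entry on completion also stops the map from growing without bound. If an output can be lost, stale entries would still need some form of expiry.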
What I'm mostly interested in is whether I approached the problem correctly in the first place and, if so, some best-practice pointers on my approach. My current implementation is the following.

For a data source I created the class

    public class JavaRandomReceiver extends Receiver<Map<String, Object>>

as I decided a key-value store would be best suited to holding emitted data. The onStart() method initializes a custom random sequence generator and starts a thread that continuously generates new neural net inputs and stores them as follows:

    while (!isStopped()) {
        SensorData sdata = generator.createSensorData();
        Map<String, Object> result = new HashMap<String, Object>();
        result.put("msgNo", sdata.getMsgNo());
        result.put("sensorTime", sdata.getSampleTime());
        result.put("list", sdata.getPayload());
        result.put("timeOfProc", sdata.getCreationTime());
        store(result);
        // sleeps for a given amount of time set at generator creation
        generator.waitForNextTuple();
    }

The msgNo here is incremented for each newly created message and is used to keep track of which neural net outputs belong to the same message.

The neural net functionality is added by creating a custom mapper

    public class NeuralNetMapper implements Function<Map<String, Object>, Map<String, Object>>

whose call function basically just takes the input map, feeds its "list" object into the neural net object as input, replaces the map's initial list with the neural net output and returns the modified map.

The aggregator is implemented as a single class of the following form:

    public class JavaSyncBarrier implements Function<JavaRDD<Map<String, Object>>, Void>

This class maintains a Google Guava cache of the neural net outputs it has received, in the form <Long, List<Map<String, Object>>>, where the Long key is the msgNo and the list contains all maps carrying that message number.
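The call logic of the NeuralNetMapper described above can be sketched in isolation. NeuralNet and NeuralNetStep are hypothetical stand-ins, not Spark or library types. The payload is assumed to be a List<Double>, since the actual type returned by sdata.getPayload() isn't shown. The sketch returns a copy rather than modifying the input map. All N mappers read the same rndLists stream, so mutating the input in place could let one net see another net's output if Spark ever hands two mappers the same object.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the neural net object; not a real library API. */
interface NeuralNet {
    List<Double> evaluate(List<Double> input);
}

/**
 * Spark-free sketch of NeuralNetMapper.call(): run the net on "list" and
 * return a NEW map in which "list" holds the net output. msgNo, sensorTime
 * and timeOfProc are carried over unchanged so the barrier can still match
 * outputs to their message and compute latency.
 */
final class NeuralNetStep {
    private final NeuralNet net;

    NeuralNetStep(NeuralNet net) {
        this.net = net;
    }

    @SuppressWarnings("unchecked") // assumes "list" holds a List<Double>
    Map<String, Object> apply(Map<String, Object> input) {
        List<Double> payload = (List<Double>) input.get("list");
        Map<String, Object> output = new HashMap<>(input); // copy; input stays untouched
        output.put("list", net.evaluate(payload));
        return output;
    }
}
```

In the Spark job, the same body would sit inside NeuralNetMapper.call(), with one NeuralNetStep-like instance per configured net.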
When a new map is received, it is added to the cache, and the length of the list for its msgNo is compared to the total number of neural nets. If the two numbers match, that message number is declared fully processed. The difference between timeOfProc and the current system time is then displayed as the total processing time. All maps with the same msgNo share the same timeOfProc.

The components are wired together as follows:

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("SimpleSparkStreamingTest");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));
        jssc.checkpoint("/tmp/spark-tempdir");

        // Generator config goes here
        // Set to emit new message every 1 second
        // ---
        // Neural net config goes here
        // ---

        JavaReceiverInputDStream<Map<String, Object>> rndLists =
            jssc.receiverStream(new JavaRandomReceiver(generatorConfig));

        // one map() per neural net, all reading the same input stream
        List<JavaDStream<Map<String, Object>>> neuralNetOutputStreams =
            new ArrayList<JavaDStream<Map<String, Object>>>();
        for (int i = 0; i < numberOfNets; i++) {
            neuralNetOutputStreams.add(rndLists.map(new NeuralNetMapper(neuralNetConfig)));
        }

        JavaDStream<Map<String, Object>> joined = joinStreams(neuralNetOutputStreams);
        // foreachRDD replaces the deprecated foreach
        joined.foreachRDD(new JavaSyncBarrier(numberOfNets));

        jssc.start();
        jssc.awaitTermination();
    }

where joinStreams unifies a list of streams:

    public static <T> JavaDStream<T> joinStreams(List<JavaDStream<T>> streams) {
        JavaDStream<T> result = streams.get(0);
        for (int i = 1; i < streams.size(); i++) {
            result = result.union(streams.get(i));
        }
        return result;
    }

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-tp12893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.