Hey guys, so the problem I'm trying to tackle is the following:

- I need a data source that emits messages at a certain frequency
- There are N neural nets that need to process each message individually
- The outputs from all neural nets are aggregated, and only when all N
outputs for a message have been collected should that message be declared
fully processed
- At the end I should measure the time it took for a message to be fully
processed (the time between when it was emitted and when all N neural net
outputs for that message have been collected)


What I'm mostly interested in is whether I approached the problem correctly
in the first place and, if so, some best-practice pointers on my approach.

And my current implementation is the following:


For the data source I created the class

    public class JavaRandomReceiver extends Receiver<Map<String, Object>>

since I decided a key-value store would be best suited to holding the emitted data.


The onStart() method initializes a custom random sequence generator and
starts a thread that continuously generates new neural net inputs and
stores them as follows:

    SensorData sdata = generator.createSensorData();

    Map<String, Object> result = new HashMap<String, Object>();
                                
    result.put("msgNo", sdata.getMsgNo());
    result.put("sensorTime", sdata.getSampleTime());
    result.put("list", sdata.getPayload());
    result.put("timeOfProc", sdata.getCreationTime());

    store(result);

    // sleeps for a given amount of time set at generator creation
    generator.waitForNextTuple();

The msgNo here is incremented for each newly created message and is used
downstream to keep track of which neural net outputs belong to which message.
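
For reference, here's a rough sketch of how the whole receiver might be
structured; the generator class name and config holder are placeholders
standing in for my actual implementation, and the loop body is the snippet
shown above:

    import java.util.Map;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    public class JavaRandomReceiver extends Receiver<Map<String, Object>> {

        // placeholder holder for the generator settings
        private final GeneratorConfig generatorConfig;

        public JavaRandomReceiver(GeneratorConfig generatorConfig) {
            super(StorageLevel.MEMORY_ONLY());
            this.generatorConfig = generatorConfig;
        }

        @Override
        public void onStart() {
            // onStart() must return quickly, so the generation loop
            // runs on its own thread
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // "SensorDataGenerator" is a placeholder name for the
                    // custom random sequence generator mentioned above
                    SensorDataGenerator generator =
                            new SensorDataGenerator(generatorConfig);
                    while (!isStopped()) {
                        // ... build the HashMap from a SensorData instance
                        // and call store(result), exactly as in the snippet
                        // above ...
                        generator.waitForNextTuple();
                    }
                }
            }).start();
        }

        @Override
        public void onStop() {
            // nothing to clean up; isStopped() lets the worker thread exit
        }
    }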


The neural net functionality is added by creating a custom mapper
    public class NeuralNetMapper implements Function<Map<String, Object>, Map<String, Object>>

whose call() method basically just takes the input map, plugs its "list"
entry in as the input to the neural net object, replaces the map's initial
list with the neural net output, and returns the modified map.
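
In sketch form (the NeuralNet class, its process() method and the config
class are placeholders for the actual network implementation, and the
payload type is assumed to be a numeric list):

    import java.util.List;
    import java.util.Map;

    import org.apache.spark.api.java.function.Function;

    public class NeuralNetMapper
            implements Function<Map<String, Object>, Map<String, Object>> {

        // "NeuralNet" / "NeuralNetConfig" are placeholder names for the
        // actual network implementation and its configuration
        private final NeuralNet net;

        public NeuralNetMapper(NeuralNetConfig config) {
            this.net = new NeuralNet(config);
        }

        @Override
        public Map<String, Object> call(Map<String, Object> input) throws Exception {
            // the payload type is whatever the generator produces;
            // a numeric list is assumed here
            List<Double> payload = (List<Double>) input.get("list");
            input.put("list", net.process(payload));
            return input;
        }
    }

One thing worth keeping in mind is that the mapper gets serialized and
shipped to the executors, so the net object it holds needs to be
Serializable (or created lazily on first use).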




The aggregator is implemented as a single class that has the following form

    public class JavaSyncBarrier implements Function<JavaRDD<Map<String, Object>>, Void>



This class maintains a Google Guava cache of the neural net outputs it has
received, in the form of <Long, List<Map<String, Object>>>, where the Long
key is the msgNo and the list contains all maps carrying that message number.

When a new map is received, it is added to the cache, its list's length is
compared to the total number of neural nets and, if these numbers match,
that message number is said to be fully processed and the difference between
timeOfProc (all maps with the same msgNo have the same timeOfProc) and the
current system time is displayed as the total time necessary for processing.
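
Roughly, it looks like this (the cache sizing and field names are
illustrative, and the msgNo/timeOfProc casts assume the generator stores
them as numbers):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    public class JavaSyncBarrier implements Function<JavaRDD<Map<String, Object>>, Void> {

        private final int numberOfNets;

        // msgNo -> all neural net outputs collected so far for that message
        private final Cache<Long, List<Map<String, Object>>> cache =
                CacheBuilder.newBuilder().maximumSize(10000).build();

        public JavaSyncBarrier(int numberOfNets) {
            this.numberOfNets = numberOfNets;
        }

        @Override
        public Void call(JavaRDD<Map<String, Object>> rdd) throws Exception {
            for (Map<String, Object> output : rdd.collect()) {
                long msgNo = ((Number) output.get("msgNo")).longValue();

                List<Map<String, Object>> collected = cache.getIfPresent(msgNo);
                if (collected == null) {
                    collected = new ArrayList<Map<String, Object>>();
                    cache.put(msgNo, collected);
                }
                collected.add(output);

                // once every net has reported, the message is fully processed
                if (collected.size() == numberOfNets) {
                    long creationTime = ((Number) output.get("timeOfProc")).longValue();
                    System.out.println("msg " + msgNo + " fully processed in "
                            + (System.currentTimeMillis() - creationTime) + " ms");
                    cache.invalidate(msgNo);
                }
            }
            return null;
        }
    }

Note that the function passed to foreach runs on the driver, so the cache
lives in driver memory and collect() pulls each batch back to the driver.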





Now the way all these components are linked together is the following:

public static void main(String[] args) {

        SparkConf conf = new SparkConf();
        conf.setAppName("SimpleSparkStreamingTest");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

        jssc.checkpoint("/tmp/spark-tempdir");

        // Generator config goes here
        // Set to emit a new message every 1 second
        // ---

        // Neural net config goes here
        // ---

        JavaReceiverInputDStream<Map<String, Object>> rndLists = jssc
                .receiverStream(new JavaRandomReceiver(generatorConfig));

        List<JavaDStream<Map<String, Object>>> neuralNetOutputStreams =
                new ArrayList<JavaDStream<Map<String, Object>>>();

        for (int i = 0; i < numberOfNets; i++) {
                neuralNetOutputStreams.add(
                        rndLists.map(new NeuralNetMapper(neuralNetConfig))
                );
        }

        JavaDStream<Map<String, Object>> joined = joinStreams(neuralNetOutputStreams);

        joined.foreach(new JavaSyncBarrier(numberOfNets));

        jssc.start();
        jssc.awaitTermination();
}

where joinStreams unifies a list of streams:
        public static <T> JavaDStream<T> joinStreams(List<JavaDStream<T>> streams) {

                JavaDStream<T> result = streams.get(0);
                for (int i = 1; i < streams.size(); i++) {
                        result = result.union(streams.get(i));
                }

                return result;
        }


