Is there a way I can control the sequence in which spouts and bolts are initialized? I am using Hazelcast, a distributed data collection, and in the spout's open() method I generate some hash functions that the bolts will use. However, it looks like the bolts are initialized before the spout, i.e. before the hash functions are generated, so they end up with empty hash functions: the UniversalBasicWindow object, when initialized in the bolt's prepare(), reads the Hazelcast collection while it is still empty, since it has no values yet.
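One workaround I am considering is to generate the hash functions from the main method, before the topology is even submitted, so they already exist in Hazelcast when any bolt's prepare() runs. A minimal sketch follows; it assumes the submitting JVM joins the same Hazelcast cluster as the workers (true in local mode), and the LSHTopology class name, the parallelism hints, and the input path are placeholders, not my real code:

package tuc.LSH.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;
import tuc.LSH.storm.bolts.LSHBolt;
import tuc.LSH.storm.spouts.FileReaderSpout;

public class LSHTopology {
    public static void main(String[] args) {
        // fill the distributed queue before any spout/bolt is initialized,
        // so no prepare() ever sees an empty collection
        HashFunctionsGen.generateHashFunctionsForUBW();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("reader", new FileReaderSpout(), 1);
        builder.setBolt("lsh", new LSHBolt(), 4).shuffleGrouping("reader", "data");

        Config conf = new Config();
        conf.put("data", "input.csv"); // placeholder path read by the spout

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("lsh-topology", conf, builder.createTopology());
    }
}

I am not sure this is the right approach, though. My spout, bolt, and UniversalBasicWindow class code follows: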

package tuc.LSH.storm.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;

import java.io.*;
import java.util.Map;
import java.util.Scanner;

/**
 * Created by mixtou on 15/5/15.
 */
public class FileReaderSpout extends BaseRichSpout {
//public class FileReaderSpout implements IRichSpout {

    private SpoutOutputCollector collector;
    private Scanner scanner;
    private boolean completed;
    private TopologyContext context;
    private int spout_idx;
    private int spout_id;
    private Map config;
    private int noOfFailedWords;
    private int noOfAckedWords;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("data", new Fields("streamId", 
"timestamp", "value"));


    }

    @Override
    public void open(Map config, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.spout_idx = context.getThisTaskIndex();
        this.spout_id = context.getThisTaskId();
        this.collector = spoutOutputCollector;
        this.config = config;
        this.completed = false;
        this.noOfFailedWords = 0;
        this.noOfAckedWords = 0;
        HashFunctionsGen.generateHashFunctionsForUBW(); // Hazelcast hash function generation
        HashFunctionsGen.size();
        System.err.println("Generating hashFunctions");

        try {
            this.scanner = new Scanner(new File(config.get(file_to_read()).toString()));
            System.err.println("Scanner Reading File: " + config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void nextTuple() {

        if (!completed) {
            if (scanner.hasNextLine()) {
                String[] temp = scanner.nextLine().split(",");
//                System.err.println("============== " + temp[0] + " + " + temp[2] + " + " + temp[3]); // 0-id, 2-timestamp, 3-value
                collector.emit("data", new Values(temp[0], temp[2], temp[3]), temp[0]); // emit to the next bolt, anchored with temp[0] as the message id so ack()/fail() are called back
//                Utils.sleep(1);
            } else {
                System.err.println("End of File Closing Reader");
                scanner.close();
                completed = true;
            }
        }

    }

    private String file_to_read() {
//        this.spout_id = context.getThisTaskId();
        if (Consts.NO_OF_SPOUTS > 1) {
            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
            return "data" + file_no;
        } else {
            return "data";
        }
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
        noOfAckedWords++;
//        System.out.println("OK tuple acked from bolt: " + msgId + " no of 
acked word " + noOfAckedWords);
//        System.out.println("no of acked tuples: "+noOfAckedWords);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
        noOfFailedWords++;
        System.err.println("ERROR: " + context.getThisComponentId() + " " + 
msgId + " no of words failed " + noOfFailedWords);

    }

}

package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import tuc.LSH.conf.HazelCastConfig;
import tuc.LSH.core.timeseries.SlidingWindow;
import tuc.LSH.core.timeseries.UniversalBasicWindow;

import java.util.Map;
import java.util.Queue;

/**
 * Created by mixtou on 17/5/15.
 */
public class LSHBolt extends BaseRichBolt {
    private int task_id;
    private OutputCollector collector;
    private UniversalBasicWindow universalBasicWindow;
    private SlidingWindow slidingWindow;

    private String streamId;
    private String time;
    private Float value;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.task_id = topologyContext.getThisTaskIndex();
        this.collector = outputCollector;
        streamId = null;
        time = null;
        value = 0f;
        this.universalBasicWindow = new UniversalBasicWindow();
        this.slidingWindow = new SlidingWindow();
        System.err.println("New Bolt with id: " + task_id);

    }

    @Override
    public void execute(Tuple tuple) {

        if (tuple.getSourceStreamId().equals("sync")) {
//            System.out.println("Bolt task id " + task_id + " received from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
//            System.out.println("Normalizing: Basic Window of Bolt " + task_id);
            universalBasicWindow.normalize(); // pad the remaining streams with the last received value so they all have the same size
            slidingWindow.addBasicWindow(universalBasicWindow);
//            universalBasicWindow.print();
//            System.err.println("==Bolt: " + task_id);
//            System.out.println("adding basic window to sliding window");
//            System.err.println("Size: " + slidingWindow.getwindow().size());
//            slidingWindow.print();

//            System.out.println("Resetting Window Bolt " + task_id);
            universalBasicWindow.reset();
//            Utils.sleep(1);
//            collector.ack(tuple);

        } else if (tuple.getSourceStreamId().equals("data")) {

            streamId = tuple.getStringByField("streamId");
            time = tuple.getStringByField("timestamp");
            value = Float.parseFloat(tuple.getStringByField("value"));
//            System.out.println("From Bolt: " + task_id);
            universalBasicWindow.pushStream(streamId, value);

            if (universalBasicWindow.isFull()) { // check whether any stream of the window is full

//                System.out.println("Univ. Basic Window of bolt " + task_id + " is filled up");

                collector.emit("bwFilled", new Values(task_id));
//                Utils.sleep(1);

            }
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));

    }
}

//        System.err.println("SourceComponent: "+tuple.getSourceComponent());
//        System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
//        System.err.println("Source Task: "+tuple.getSourceTask());
//        System.err.println("SourceGlobalStreamId: 
"+tuple.getSourceGlobalStreamid());
//        System.err.println("MessageId: "+tuple.getMessageId());



package tuc.LSH.core.timeseries;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import tuc.LSH.conf.Consts;
import tuc.LSH.conf.HazelCastConfig;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;

import java.util.*;

/**
 * Created by mixtou on 19/5/15.
 */
public class UniversalBasicWindow {

    private HashMap<String, StreamBasicWindow> allStreamsSignatures;
    private IQueue<boolean[]> hashFunctions;
    private LinkedList<boolean[]> localHashFunction;

    public UniversalBasicWindow() {
//        HashFunctionsGen.generateHashFunctionsForUBW(); // generate new hash functions for a new UniversalBasicWindow
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        hashFunctions = hz.getQueue("hashFunctions");
        localHashFunction = new LinkedList<>();
        localHashFunction.addAll(hashFunctions); // empty if the spout has not generated the functions yet
        this.allStreamsSignatures = new HashMap<>();

        System.out.println("New Universal Basic Window");
        System.out.println("size: " + hashFunctions.size());
    }

    public void reset() {
        localHashFunction.clear();
        hashFunctions.clear();
        clearValues();
    }

    public void clearValues() {

        for (Map.Entry<String, StreamBasicWindow> entry : 
allStreamsSignatures.entrySet()) {
            entry.getValue().clear();
//            System.err.println("na sou p: " +entry.getKey());
        }

    }

    public void pushStream(String streamId, float value) {

        StreamBasicWindow streamBasicWindow = allStreamsSignatures.get(streamId);

        if (streamBasicWindow == null) {
//            System.out.println("New Stream: " + streamId);
            streamBasicWindow = new StreamBasicWindow(streamId, localHashFunction);
            allStreamsSignatures.put(streamId, streamBasicWindow);
        }

        streamBasicWindow.pushValue(value);

//        System.out.println("Updated Basic Window of Stream: " + streamId + " at position " + streamBasicWindow.getCurrentPosition());

    }

    public HashMap<String, StreamBasicWindow> getAllStreamsSignatures() {
        return allStreamsSignatures;
    }


    public boolean isFull() {

        boolean reply = false;

        for (Map.Entry<String, StreamBasicWindow> entry : allStreamsSignatures.entrySet()) {
            if (entry.getValue().getCurrentPosition() == Consts.BASIC_WINDOW_SIZE) {
//                System.out.println("StreamId: " + entry.getKey() + " size " + entry.getValue().getSize() + " curr position: " + entry.getValue().getCurrentPosition());
                reply = true;
                break; // one full stream is enough; exit the loop and return true
            }
        }
        return reply;

    }

    public void print() {

        System.out.println("====================================");
        // iterate with a single entry reference; calling iterator.next() more
        // than once per loop iteration would skip entries
        for (Map.Entry<String, StreamBasicWindow> entry : allStreamsSignatures.entrySet()) {
//            System.out.println("Stream Id: " + entry.getKey() + " size: " + entry.getValue().getCurrentPosition());
            System.err.println("SIGNATURES: " + entry.getValue().getStreamSignatures());
            System.err.println("SUMS: " + entry.getValue().getStreamSums());
        }
        System.out.println("====================================");
    }

    public void printLocalFunctions() {
        int temp = 0;
        for (boolean[] entry : localHashFunction) {
            temp++;
            for (int i = 0; i < entry.length; i++) {
                System.out.println("Window Local Functions: " + temp + " entry " + i + " value: " + entry[i]);
            }
        }
    }

    public void normalize() {

//        System.out.println("Normalizing");

        Iterator<Map.Entry<String, StreamBasicWindow>> iterator = 
allStreamsSignatures.entrySet().iterator();

        while (iterator.hasNext()) {
            Map.Entry<String, StreamBasicWindow> entry = iterator.next();
            if (entry.getValue().getCurrentPosition() < 
Consts.BASIC_WINDOW_SIZE) {
                for (int i = entry.getValue().getCurrentPosition(); i < 
Consts.BASIC_WINDOW_SIZE; i++) {
                    
entry.getValue().pushValue(entry.getValue().getLastReceivedValue());
                }
            }
        }
    }


}
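
An alternative I have been sketching is to make the window load the functions lazily, blocking until the spout has published them, instead of copying the (possibly still empty) queue in the constructor. Something like this helper inside UniversalBasicWindow (the 100 ms poll interval is an arbitrary choice for illustration):

    // Sketch: wait until the spout has filled the distributed queue, then
    // copy it locally. Would be called in place of the addAll() in the
    // constructor above.
    private void loadHashFunctionsBlocking() {
        try {
            while (hashFunctions.isEmpty()) {
                Thread.sleep(100); // the spout's open() has not generated them yet
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        localHashFunction.clear();
        localHashFunction.addAll(hashFunctions);
    }

The downside is that every bolt task busy-waits during prepare(), and if the spout never runs this blocks forever, so a timeout would probably be needed.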

package tuc.LSH.core.hashfunctions;

import backtype.storm.utils.Utils;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import tuc.LSH.conf.Consts;
import tuc.LSH.conf.HazelCastConfig;

import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Lock;

/**
 * Created by mixtou on 14/5/15.
 */
public class HashFunctionsGen {

    static IQueue<boolean[]> hashFunctions;
    static Random rnd;

    public static boolean cosineFamilyValue() {
        return rnd.nextBoolean();
    }

    public static void generateHashFunctionsForUBW() {
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        hashFunctions = hz.getQueue("hashFunctions");
        rnd = new Random();

        Lock lock = hz.getLock("hashFunctions");
        lock.lock();
        try {
            // generate only if the required hash functions do not exist yet;
            // the check is done under the lock so two tasks cannot both generate
            if (hashFunctions.isEmpty()) {
                for (int i = 0; i < Consts.NO_OF_HASH_FUNCTIONS; i++) {
                    boolean[] temp = new boolean[Consts.BASIC_WINDOW_SIZE];
                    for (int j = 0; j < Consts.BASIC_WINDOW_SIZE; j++) {
                        temp[j] = HashFunctionsGen.cosineFamilyValue();
                    }
                    hashFunctions.add(temp);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public static void size() {
        System.out.println("Size of queue: " + hashFunctions.size());
    }

    public static void print() {
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        Queue<boolean[]> functions = hz.getQueue("hashFunctions");

        int temp = 0;
        for (boolean[] entry : functions) {
            temp++;
            for (int i = 0; i < entry.length; i++) {
                System.err.println("Hazelcast Function " + temp + " entry " + i 
+ " value: " + entry[i]);
            }
        }
    }

    public static void clear() {
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        hashFunctions = hz.getQueue("hashFunctions");
        Lock lock = hz.getLock("hashFunctions");
        lock.lock();
        try {
            hashFunctions.clear();
//                hashFunctions = hz.getQueue("hashFunctions");
        } finally {
            lock.unlock();
        }


    }


}
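
A third idea, just a sketch assuming the submitting JVM can join the same Hazelcast cluster: coordinate through Hazelcast's distributed ICountDownLatch instead of polling. The class below is hypothetical, and the latch name "hashFunctionsReady" and the 30-second timeout are made up for this example:

package tuc.LSH.core.hashfunctions;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICountDownLatch;
import tuc.LSH.conf.HazelCastConfig;

import java.util.concurrent.TimeUnit;

public class HashFunctionsBarrier {

    // called once from the main method before submitting the topology, so
    // the latch has count 1 before any bolt can await it
    public static void init() {
        latch().trySetCount(1);
    }

    // called from the spout's open() after generateHashFunctionsForUBW()
    public static void signalReady() {
        latch().countDown();
    }

    // called from the bolt's prepare() before reading the "hashFunctions"
    // queue; returns false if the spout never signalled within the timeout
    public static boolean awaitReady() throws InterruptedException {
        return latch().await(30, TimeUnit.SECONDS);
    }

    private static ICountDownLatch latch() {
        HazelcastInstance hz = Hazelcast.getOrCreateHazelcastInstance(new HazelCastConfig().getConfig());
        return hz.getCountDownLatch("hashFunctionsReady");
    }
}

Is there a cleaner, Storm-native way to enforce this initialization order?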
