I am trying to read data from a text file and process it; I am currently using a
Scanner. At first everything works fine: the first 10,000 values are processed,
but after that it looks as if no further input lines are sent to the bolt that
implements the algorithm. Finally, after a few minutes of running, I get
java.lang.OutOfMemoryError: unable to create new native thread

The file reader spout implementation is:

package tuc.LSH.storm.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;

import javax.rmi.CORBA.Util;
import java.io.*;
import java.util.Map;
import java.util.Scanner;

/**
 * Created by mixtou on 15/5/15.
 */
public class FileReaderSpout extends BaseRichSpout {
//public class FileReaderSpout implements IRichSpout {

    private SpoutOutputCollector collector;
    private Scanner scanner;
    private boolean completed;
    private TopologyContext context;
    private int spout_idx;
    private int spout_id;
    private Map config;
    private int noOfFailedWords;
    private int noOfAckedWords;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("data", new Fields("streamId", 
"timestamp", "value"));


    }

    @Override
    public void open(Map config, TopologyContext topologyContext, 
SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.spout_idx = context.getThisTaskIndex();
        this.spout_id = context.getThisTaskId();
        this.collector = spoutOutputCollector;
        this.config = config;
        this.completed = false;
        this.noOfFailedWords = 0;
        this.noOfAckedWords = 0;

        try {
            this.scanner = new Scanner(new 
File(config.get(file_to_read()).toString()));
            System.err.println("Scanner Reading File: " + 
config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void nextTuple() {

        if(!completed) {
            if (scanner.hasNextLine()) {
                String[] temp = scanner.nextLine().split(",");
//            System.err.println("============== " + temp[0] + " + " + temp[2] 
+ " + " + temp[3]); //0-id,2-timestamp,3-value
                collector.emit("data", new Values(temp[0], temp[2], temp[3]), 
temp[0]); //emmit the correct data to next bolt without guarantee delivery
                Utils.sleep(1);
            } else {
                System.err.println("End of File Closing Reader");
                scanner.close();
                completed = true;
            }
        }

    }

    private String file_to_read() {
//        this.spout_id = context.getThisTaskId();
        if (Consts.NO_OF_SPOUTS > 1) {
            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
            return "data" + file_no;
        } else {
            return "data";
        }
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
        noOfAckedWords++;
//        System.out.println("OK tuple acked from bolt: " + msgId + " no of 
acked word " + noOfAckedWords);
        System.out.println("no of acked tuples: "+noOfAckedWords);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
        noOfFailedWords++;
        System.err.println("ERROR: " + context.getThisComponentId() + " " + 
msgId + " no of words failed " + noOfFailedWords);

    }

}
And the bolt that receives the tuples from the file reader spout and processes them is:

package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import sun.jvm.hotspot.runtime.*;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;
import tuc.LSH.core.timeseries.UniversalBasicWindow;


import java.lang.Thread;
import java.util.*;

/**
 * Created by mixtou on 17/5/15.
 */
public class LSHBolt extends BaseRichBolt {
    private int task_id;
    private OutputCollector collector;
    private UniversalBasicWindow universalBasicWindow;

    private String streamId;
    private String time;
    private Float value;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, 
OutputCollector outputCollector) {
        this.task_id = topologyContext.getThisTaskIndex();
        this.collector = outputCollector;
        this.universalBasicWindow = new UniversalBasicWindow();
        streamId = null;
        time = null;
        value = 0f;
        System.err.println("New Bolt with id: " + task_id);

    }

    @Override
    public void execute(Tuple tuple) {

        if (tuple.getSourceStreamId().equals("sync")) {
            System.out.println("Bolt task id " + task_id + " received from " + 
tuple.getSourceComponent() + " message " + tuple.getString(0));
            System.out.println("Normalizing: Basic Window of Bolt " + task_id);
            universalBasicWindow.normalize(); //fill the rest of the streams 
with last received value to make them same size
            universalBasicWindow = null;
            universalBasicWindow = new UniversalBasicWindow();
//            Utils.sleep(1);
        }

        if (tuple.getSourceStreamId().equals("data")) {

            streamId = tuple.getStringByField("streamId");
            time = tuple.getStringByField("timestamp");
            value = Float.parseFloat(tuple.getStringByField("value"));

            universalBasicWindow.pushStream(streamId, value);

            if (universalBasicWindow.isFull(task_id)) { //check if any stream 
of the window is full

//                System.out.println("Univ. Basic Window of bolt " + task_id + 
" is Filled Up");

                collector.emit("bwFilled", new Values(task_id));
                Utils.sleep(1);

//                universalBasicWindow.normalize();
//                universalBasicWindow = new UniversalBasicWindow();

//                TODO:: Add basic window to sliding window and clear

            }

        }


//        System.err.println("SourceComponent: "+tuple.getSourceComponent());
//        System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
//        System.err.println("Source Task: "+tuple.getSourceTask());
//        System.err.println("SourceGlobalStreamId: 
"+tuple.getSourceGlobalStreamid());
//        System.err.println("MessageId: "+tuple.getMessageId());

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));

    }
}
The file reader spout stops receiving acks for its tuples at about the time the
universalBasicWindow becomes full.

The bolt that consumes the "bwFilled" stream simply sends a reset back to
LSHBolt so that all of its instances reset their basic window.

ResetBolt implementation is:
package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;

import javax.rmi.CORBA.Util;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Created by mixtou on 2/6/15.
 */
public class ResetBolt extends BaseRichBolt {

    private int task_id;
    private OutputCollector collector;
    //    private int noOfFilledBoltsInstaces;
    private Set<Integer> filledBoltInstances;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {

        this.task_id = topologyContext.getThisTaskIndex();
        this.collector = outputCollector;
        this.filledBoltInstances = new HashSet<>();
        HashFunctionsGen.generateHashFunctionsForUBW();

    }

    @Override
    public void execute(Tuple tuple) {

        if (tuple.getSourceStreamId().equals("bwFilled")) {
//            System.err.println("Reset Bolt Received window filled from bolt 
with task id " + tuple.getInteger(0));
//            filledBoltInstances.add(tuple.getInteger(0));

            //GENERATE NEW HASH FUNCTIONS WHEN ALL BASIC WINDOW INSTANCES ARE 
FILLED??????
//            if (filledBoltInstances.size() == Consts.NO_OF_LSH_BOLTS) {
//                filledBoltInstances.clear();
////                System.out.println("Updating Basic Window Hash Functions");
////                HashFunctionsGen.clear();
////                HashFunctionsGen.generateHashFunctionsForUBW();
//
//            }
            collector.emit("sync", new Values("Reset"));
//            Utils.sleep(1);
//            System.err.println("Emitted ResetBW to all LSH Bolt Instances");
            collector.ack(tuple);
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("sync", new Fields("message"));
    }
}
Finally, the topology implementation is:

package tuc.LSH.storm.topologies;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;
import tuc.LSH.core.timeseries.UniversalBasicWindow;
import tuc.LSH.storm.bolts.LSHBolt;
import tuc.LSH.storm.bolts.ResetBolt;
import tuc.LSH.storm.spouts.FileReaderSpout;

import java.util.Map;

/**
 * Created by mixtou on 13/5/15.
 */
public class LSHTopology {

    public static void main(String[] args) throws Exception{

        TopologyBuilder builder = new TopologyBuilder();

//        builder.setSpout("RandomStreamSpout", new RandomStreamSpout(), 
Consts.NO_OF_SPOUTS);
        builder.setSpout("FileReaderSpout", new FileReaderSpout(), 
Consts.NO_OF_SPOUTS);
        BoltDeclarer lshBolt = builder.setBolt("LSH", new LSHBolt(), 
Consts.NO_OF_LSH_BOLTS)
                .fieldsGrouping("FileReaderSpout", "data", new 
Fields("streamId"));
        builder.setBolt("ResetBolt", new ResetBolt(), 1).shuffleGrouping("LSH", 
"bwFilled");

        lshBolt.allGrouping("ResetBolt", "sync");

        Config config = new Config();

        for(Map.Entry<String, String> entry : Consts.data_files.entrySet()){
            config.put(entry.getKey(), entry.getValue());
        }

        config.setDebug(false);
        config.setFallBackOnJavaSerialization(false);
//        config.registerSerialization(UniversalBasicWindow.class);



        if (args != null && args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], config, 
builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LSHTopology", config, 
builder.createTopology());

            Thread.sleep(500000);
            Utils.sleep(500000);
            cluster.killTopology("LSHTopology");
            cluster.shutdown();

        }

    }

}

Reply via email to