What is the best spout implementation for reading input data from a file? I have
implemented a spout that reads input data from a file using a Scanner, which
seems to perform better than a BufferedReader.
However, I still lose some values (not many this time, about 1%), but the bigger
problem is that after a few minutes of running I get a Java OutOfMemoryError, and
I believe it has to do with how the values are buffered.
My spout implementation is:

package tuc.LSH.storm.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;

// NOTE(review): unused; the javax.rmi.CORBA package was removed from the JDK in Java 11.
import javax.rmi.CORBA.Util;
import java.io.*;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

/**
 * Created by mixtou on 15/5/15.
 */
public class FileReaderSpout extends BaseRichSpout {
//public class FileReaderSpout implements IRichSpout {

    private SpoutOutputCollector collector;
    private Scanner scanner;
    private boolean completed;
    private TopologyContext context;
    private int spout_idx;
    private int spout_id;
    private Map config;
    private int noOfFailedWords;
    private int noOfAckedWords;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("data", new Fields("streamId", 
"timestamp", "value"));


    }

    @Override
    public void open(Map config, TopologyContext topologyContext, 
SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.spout_idx = context.getThisTaskIndex();
        this.spout_id = context.getThisTaskId();
        this.collector = spoutOutputCollector;
        this.config = config;
        this.completed = false;
        this.noOfFailedWords = 0;
        this.noOfAckedWords = 0;

        try {
            this.scanner = new Scanner(new 
File(config.get(file_to_read()).toString()));
            System.err.println("Scanner Reading File: " + 
config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void nextTuple() {

        if(!completed) {
            if (scanner.hasNextLine()) {
                String[] temp = scanner.nextLine().split(",");
//            System.err.println("============== " + temp[0] + " + " + temp[2] 
+ " + " + temp[3]); //0-id,2-timestamp,3-value
                collector.emit("data", new Values(temp[0], temp[2], temp[3]), 
temp[0]); //emmit the correct data to next bolt without guarantee delivery
                Utils.sleep(1);
            } else {
                System.err.println("End of File Closing Reader");
                scanner.close();
                completed = true;
            }
        }

    }

    private String file_to_read() {
//        this.spout_id = context.getThisTaskId();
        if (Consts.NO_OF_SPOUTS > 1) {
            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
            return "data" + file_no;
        } else {
            return "data";
        }
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
        noOfAckedWords++;
//        System.out.println("OK tuple acked from bolt: " + msgId + " no of 
acked word " + noOfAckedWords);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
        noOfFailedWords++;
        System.err.println("ERROR: " + context.getThisComponentId() + " " + 
msgId + " no of words failed " + noOfFailedWords);

    }

}

Reply via email to