I haven't set topology.max.spout.pending, so I'm using the default value, but I'm watching an internal Set to check how many tuples are being processed.
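For reference, here is a minimal sketch of that kind of in-flight tracking, written without any Storm dependency. The class name InFlightTracker and its methods are hypothetical, made up for illustration only. In a spout, tryRegister would be called from nextTuple just before emitting, and release from ack and fail. The size check and the add are not atomic, which should be acceptable here because Storm invokes nextTuple, ack and fail from the same thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Storm or StormCrawler): tracks the message
// IDs that have been emitted but not yet acked or failed.
class InFlightTracker {
    private final Set<String> beingProcessed = ConcurrentHashMap.newKeySet();
    private final int maxInFlight;

    InFlightTracker(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    // Returns true if the ID was registered, false if the cap is reached
    // or the ID is already in flight.
    boolean tryRegister(String msgId) {
        if (beingProcessed.size() >= maxInFlight) {
            return false;
        }
        return beingProcessed.add(msgId);
    }

    // To be called from both ack() and fail(): the tuple is no longer in flight.
    void release(String msgId) {
        beingProcessed.remove(msgId);
    }

    // Current number of in-flight tuples, e.g. for periodic logging.
    int size() {
        return beingProcessed.size();
    }
}
```

Logging size() next to each nextTuple call would show whether the cap is what keeps the spout from emitting, or whether the method is simply not being invoked.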
I've logged the nextTuple, fail and ack methods. In the log, there are sometimes stretches of about thirty seconds where none of these methods is called... it seems that the thread is busy doing other things.

Yeah, I'm using your SDK, it's great btw. I've just changed the outlinks indexer to store data in Redis :-)

2016-05-09 22:27 GMT+02:00 Julien Nioche <[email protected]>:

> Hi Adrien
>
> Did you set a value to max spout pending? Could it be that you have
> reached the max number of tuples in process? Do you see acks or fails
> happen during that period?
>
> Great to hear that you are using StormCrawler BTW
>
> Julien
>
>
> On 9 May 2016 at 20:48, Adrien Carreira <[email protected]> wrote:
>
>> I think the problem is that, while my topology is working, the thread
>> calling nextTuple seems to be busy... Why isn't the method called?
>>
>> Can someone point me to some documentation, or to the code calling
>> nextTuple, just so I can understand what is blocking?
>>
>> Thank you guys
>>
>> 2016-05-09 9:57 GMT+02:00 Adrien Carreira <[email protected]>:
>>
>>> Hi there,
>>>
>>> I'm using Storm to build a web crawler, using the Storm Crawler SDK.
>>>
>>> I'm also using Redis to store newly discovered links.
>>>
>>> I have a Spout to consume those URLs.
>>> After much debugging, I've built the Spout like this:
>>>
>>> public class OutlinkSpoutRedis extends BaseRichSpout {
>>>
>>>     private static final Logger LOG = LoggerFactory
>>>             .getLogger(OutlinkSpoutRedis.class);
>>>     private LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>();
>>>     private LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
>>>     private LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>();
>>>
>>>     // beingProcessed, _collector and activated are fields whose
>>>     // declarations were removed from this excerpt.
>>>
>>>     @Override
>>>     public void nextTuple() {
>>>         LOG.info(">>> Calling nextTuple");
>>>
>>>         // Back off when too many tuples are still in flight
>>>         if (beingProcessed.size() >= 200000) {
>>>             LOG.info("Too many being processed");
>>>             Utils.sleep(50);
>>>             return;
>>>         }
>>>
>>>         LOG.info("Polling from queue");
>>>         Values ret = queue.poll(); // non-blocking
>>>
>>>         if (ret == null) {
>>>             LOG.info("Polling from queue = null");
>>>             Utils.sleep(50);
>>>             return;
>>>         }
>>>
>>>         LOG.info("Emitting one url");
>>>
>>>         // The URL doubles as the message ID
>>>         String url = ret.get(0).toString();
>>>         beingProcessed.put(url, "");
>>>
>>>         this._collector.emit(ret, url);
>>>     }
>>>
>>>     @Override
>>>     public void ack(Object msgId) {
>>>         LOG.info("Acking");
>>>         this.beingProcessed.remove(msgId);
>>>         this.ackQueue.offer((String) msgId);
>>>     }
>>>
>>>     @Override
>>>     public void fail(Object msgId) {
>>>         LOG.error("Fail tuple {}", msgId);
>>>         this.beingProcessed.remove(msgId);
>>>         this.failQueue.offer((String) msgId);
>>>     }
>>>
>>>     private class ProducerThread extends Thread {
>>>         @Override
>>>         public void run() {
>>>             while (activated) {
>>>                 try {
>>>                     // Refill only when the spout's queue is running low
>>>                     if (queue.size() <= 1000) {
>>>                         this.populateQueue();
>>>                     }
>>>
>>>                     Utils.sleep(100);
>>>                 } catch (Exception e) {
>>>                     LOG.error("Error reading queues from redis", e);
>>>                 }
>>>             }
>>>         }
>>>
>>>         private void populateQueue() {
>>>             // Calling Redis to populate Queue
>>>             // (url and metadata come from the Redis call, removed here)
>>>             queue.offer(new Values(url, metadata));
>>>         }
>>>     }
>>>
>>>     private abstract class AckFailThread extends Thread {
>>>         // Queue drained by this thread: ackQueue or failQueue
>>>         private final LinkedBlockingQueue<String> source;
>>>
>>>         AckFailThread(LinkedBlockingQueue<String> source) {
>>>             this.source = source;
>>>         }
>>>
>>>         @Override
>>>         public void run() {
>>>             while (activated) {
>>>                 try {
>>>                     String message = source.poll(1, TimeUnit.SECONDS);
>>>
>>>                     if (message != null) {
>>>                         this.handleMessage(message);
>>>                     }
>>>                 } catch (InterruptedException e) {
>>>                     Thread.currentThread().interrupt();
>>>                     return;
>>>                 }
>>>             }
>>>         }
>>>
>>>         protected abstract void handleMessage(String message);
>>>     }
>>> }
>>>
>>>
>>> I've removed the unnecessary code.
>>> To explain: nextTuple polls from a queue (populated by another
>>> thread), and ack and fail push to queues that are consumed by two other
>>> threads. So none of those three methods blocks.
>>>
>>> My problem is that, while running, my spout sometimes isn't called for
>>> about thirty seconds, even though there are still messages in the
>>> nextTuple queue to be consumed. The spout is not acking or failing
>>> either. So why is the Spout thread blocked?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>
>
>
> --
>
> *Open Source Solutions for Text Engineering*
>
> http://www.digitalpebble.com
> http://digitalpebble.blogspot.com/
> #digitalpebble <http://twitter.com/digitalpebble>
>
