Hi Julien,

I'm using it as a metric of how many tuples are being processed.

The problem isn't that the value is set too high; the problem is that nextTuple is not being called. If you look at this screenshot:

- Green: queue size (another thread in the Spout, populating tuples from ES)
- Blue: size of the being-processed queue

As you can see, at the beginning nextTuple is called and the number of tuples being processed climbs until it reaches a maximum value, then it decreases because nextTuple isn't called anymore by the topology...

[image: Inline image 1]

I don't understand why nextTuple isn't called anymore...

2016-05-10 11:01 GMT+02:00 Julien Nioche <[email protected]>:

>
>> I haven't set topology.max.spout.pending. I'm using the default value, but
>> I'm watching an internal Set to check how many tuples are being processed.
>>
>
> Found it in the snippet you posted earlier. This duplicates what
> topology.max.spout.pending does so unless you have another use for that
> internal set, it would be a good idea to rely on the default mechanism
> instead.
>
> 200000 is a very large value. Do you have any idea of how diverse your URLs
> are in terms of hostname / domain / IP?
>
> Bear in mind that the FetcherBolts (both implementations) are polite and
> will block URLs until the minimum configured amount of time has elapsed
> since the previous call to the same server has completed. Also, if you have
> more URLs in flight than fetching threads then they will sit and wait in
> the queues. Either way, these might trigger a timeout after a while (30
> secs by default =>
> https://github.com/apache/storm/blob/master/conf/defaults.yaml#L228)
> which could explain what you are experiencing.
>
>
>> I've logged the nextTuple, fail and ack methods. In the log, sometimes
>> none of these methods is called for about thirty seconds... it seems that
>> the thread is busy doing other things.
>>
>
> See above.
>
>
>>
>> Yeah, I'm using your SDK, it's great btw.
>> I've just changed the outlinks indexer to store data in Redis :-)
>>
>
> Do you mean a StatusUpdaterBolt
> <https://github.com/DigitalPebble/storm-crawler/blob/master/core/src/main/java/com/digitalpebble/storm/crawler/persistence/AbstractStatusUpdaterBolt.java>
> ?
>
> What does the rest of your topology look like?
>
> This thread is quite specific to StormCrawler and might not be of interest
> for other Storm users, feel free to continue on
> [http://groups.google.com/group/digitalpebble] or use the tag stormcrawler
> <http://stackoverflow.com/questions/tagged/stormcrawler> on StackOverflow.
>
> HTH
>
> Julien
>
>
>>
>> 2016-05-09 22:27 GMT+02:00 Julien Nioche <[email protected]>:
>>
>>> Hi Adrien
>>>
>>> Did you set a value for max spout pending? Could it be that you have
>>> reached the max number of tuples in process? Do you see acks or fails
>>> happen during that period?
>>>
>>> Great to hear that you are using StormCrawler BTW
>>>
>>> Julien
>>>
>>>
>>> On 9 May 2016 at 20:48, Adrien Carreira <[email protected]> wrote:
>>>
>>>> I think the problem is that when my topology is working, the thread
>>>> calling nextTuple seems to be busy... Why isn't the method called?
>>>>
>>>> Can someone guide me to some documentation, or to the code calling
>>>> nextTuple, just so I can understand what is blocking?
>>>>
>>>> Thank you guys
>>>>
>>>> 2016-05-09 9:57 GMT+02:00 Adrien Carreira <[email protected]>:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I'm using Storm to build a web crawler, using the Storm Crawler SDK.
>>>>>
>>>>> I'm also using Redis to store newly discovered links.
>>>>>
>>>>> I have a Spout to consume those URLs.
>>>>> After much debugging, I've built the Spout like this:
>>>>>
>>>>> public class OutlinkSpoutRedis extends BaseRichSpout {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory
>>>>>             .getLogger(OutlinkSpoutRedis.class);
>>>>>     private LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>();
>>>>>     private LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
>>>>>     private LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>();
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void nextTuple() {
>>>>>         LOG.info(">>> Calling nextTuple");
>>>>>
>>>>>         if (beingProcessed.size() >= 200000) {
>>>>>             LOG.info("Too much beeing processed");
>>>>>             Utils.sleep(50);
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         LOG.info("Pooling from queue");
>>>>>         Values ret = queue.poll();
>>>>>
>>>>>         if (ret == null) {
>>>>>             LOG.info("Pooling from queue = null");
>>>>>             Utils.sleep(50);
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         LOG.info("Emitting one url");
>>>>>
>>>>>         String url = ret.get(0).toString();
>>>>>         beingProcessed.put(url, "");
>>>>>
>>>>>         this._collector.emit(ret, url);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void ack(Object msgId) {
>>>>>         LOG.info("Acking");
>>>>>         this.beingProcessed.remove(msgId);
>>>>>         this.ackQueue.offer((String) msgId);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void fail(Object msgId) {
>>>>>         LOG.error("Fail tuple {}", msgId);
>>>>>         this.beingProcessed.remove(msgId);
>>>>>         this.failQueue.offer((String) msgId);
>>>>>     }
>>>>>
>>>>>     private class ProducerThread extends Thread {
>>>>>         @Override
>>>>>         public void run() {
>>>>>             while (activated) {
>>>>>                 try {
>>>>>                     if (this.queue.size() <= 1000) {
>>>>>                         this.populateQueue();
>>>>>                     }
>>>>>
>>>>>                     Utils.sleep(100);
>>>>>                 } catch (Exception e) {
>>>>>                     LOG.error("Error reading queues from redis", e);
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>
>>>>>         private void populateQueue() {
>>>>>             // Calling Redis to populate Queue
>>>>>             queue.offer(new Values(url, metadata));
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private abstract class AckFailThread extends Thread {
>>>>>         @Override
>>>>>         public void run() {
>>>>>             while (activated) {
>>>>>                 String message = queue.poll(1, TimeUnit.SECONDS);
>>>>>
>>>>>                 if (message != null) {
>>>>>                     this.handleMessage(message);
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>
>>>>>         protected abstract void handleMessage(String message);
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> I've removed the unnecessary code.
>>>>> To explain: nextTuple polls from a queue (populated by another
>>>>> thread), and ack and fail push to queues that are consumed by two
>>>>> other threads. So those three methods are not blocking.
>>>>>
>>>>> My problem is that, while running, my spout is sometimes not called
>>>>> for about thirty seconds, even though there are still messages in the
>>>>> nextTuple queue to be consumed. The spout is not acking or failing
>>>>> either, so why is the Spout thread blocked?
>>>>>
>>>>> Thanks
>>>>>
>>>>
>>>
>>> --
>>>
>>> *Open Source Solutions for Text Engineering*
>>>
>>> http://www.digitalpebble.com
>>> http://digitalpebble.blogspot.com/
>>> #digitalpebble <http://twitter.com/digitalpebble>
