Hi Julien,

I'm using it as a metric of how many tuples are being processed.

The problem isn't that the value is set too high; the problem is that
nextTuple is not being called.

If you look at this screenshot:

- Green: queue size (another thread in the spout populates tuples from
ES)
- Blue: size of the being-processed queue

As you can see, at the beginning nextTuple is called and the being-processed
count rises until it reaches a maximum, then decreases because nextTuple
is no longer called by the topology.

[image: Inline image 1]

I don't understand why nextTuple isn't called anymore....











2016-05-10 11:01 GMT+02:00 Julien Nioche <[email protected]>:

>
> I haven't set topology.max.spout.pending. I'm using the default value, but
>> I'm watching an internal Set to check how many tuples are being processed.
>>
>
> Found it in the snippet you posted earlier. This duplicates what
> topology.max.spout.pending does so unless you have another use for that
> internal set, it would be a good idea to rely on the default mechanism
> instead.
>
> 200000 is a very large value. Do you have any idea of how diverse your URLs 
> are in terms of hostname / domain / IP?
>
> Bear in mind that the FetcherBolts (both implementations) are polite and
> will block URLs until the minimum configured amount of time has elapsed
> since the previous call to the same server completed. Also, if you have
> more URLs in flight than fetching threads, they will sit and wait in
> the queues. Either way, these might trigger a timeout after a while (30
> secs by default =>
> https://github.com/apache/storm/blob/master/conf/defaults.yaml#L228)
> which could explain what you are experiencing.
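
To put rough numbers on the politeness argument, here is a back-of-envelope sketch in plain Java. It is not StormCrawler code: the class name, the host count and the one-second delay are hypothetical, and it assumes the URLs in flight are spread evenly over the hosts, with each host accepting one request per politeness delay.

```java
// Back-of-envelope estimate only, not StormCrawler code.
// Assumptions: URLs in flight are spread evenly over the hosts, and each
// host is fetched at most once per politeness delay.
class PolitenessEstimate {

    /** Seconds the last URL queued behind a host waits before it is fetched. */
    static double worstCaseWaitSecs(long urlsInFlight, long distinctHosts, double delaySecs) {
        // ceil(urlsInFlight / distinctHosts) URLs queue up behind each host
        long perHost = (urlsInFlight + distinctHosts - 1) / distinctHosts;
        // the first URL goes out immediately, every later one waits one more delay
        return Math.max(0, perHost - 1) * delaySecs;
    }

    public static void main(String[] args) {
        double timeoutSecs = 30.0; // default of topology.message.timeout.secs
        long inFlight = 200_000;   // cap from the spout snippet in this thread
        long hosts = 1_000;        // hypothetical number of distinct hosts
        double delaySecs = 1.0;    // hypothetical politeness delay
        double wait = worstCaseWaitSecs(inFlight, hosts, delaySecs);
        System.out.println("worst-case wait: " + wait + " s, exceeds timeout: "
                + (wait > timeoutSecs));
    }
}
```

Under these assumptions the tail of each host queue waits far longer than the 30 second default, so those tuples would fail on timeout even though nothing is broken in the spout itself.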
>
>
>> I've logged the nextTuple, fail and ack methods. In the log, sometimes
>> none of these methods is called for about thirty seconds... It seems the
>> thread is busy doing other things.
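
One way to quantify those pauses instead of reading them off the log is a small gap monitor. The sketch below is a hypothetical helper, not part of Storm; it relies on the documented behaviour that nextTuple, ack and fail are all called from the same spout thread, so plain fields are enough for the bookkeeping.

```java
// Hypothetical helper, not part of Storm: records the longest pause between
// consecutive calls into the spout.
class CallGapMonitor {
    private boolean seenCall = false;
    private long lastCallNanos;
    // volatile so that a metrics or logging thread can read it safely
    private volatile long maxGapNanos = 0;

    /** Call at the top of nextTuple(), ack() and fail(). */
    void mark() {
        mark(System.nanoTime());
    }

    /** Same as mark(), with an explicit clock value so it can be exercised in isolation. */
    void mark(long nowNanos) {
        if (seenCall) {
            long gap = nowNanos - lastCallNanos;
            if (gap > maxGapNanos) {
                maxGapNanos = gap;
            }
        }
        seenCall = true;
        lastCallNanos = nowNanos;
    }

    /** Longest observed pause between two consecutive calls, in milliseconds. */
    long maxGapMillis() {
        return maxGapNanos / 1_000_000;
    }
}
```

Reporting maxGapMillis() alongside the queue sizes would show whether the thirty-second silences line up with the message timeout.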
>>
>
> See above.
>
>
>>
>> Yeah, I'm using your SDK, it's great btw. I've just changed the outlinks
>> indexer to store data in Redis :-)
>>
>
> Do you mean a StatusUpdaterBolt
> <https://github.com/DigitalPebble/storm-crawler/blob/master/core/src/main/java/com/digitalpebble/storm/crawler/persistence/AbstractStatusUpdaterBolt.java>
> ?
>
> What does the rest of your topology look like?
>
> This thread is quite specific to StormCrawler and might not be of interest
> to other Storm users, so feel free to continue on [
> http://groups.google.com/group/digitalpebble] or use the tag stormcrawler
> <http://stackoverflow.com/questions/tagged/stormcrawler> on StackOverflow.
>
> HTH
>
> Julien
>
>
>>
>> 2016-05-09 22:27 GMT+02:00 Julien Nioche <[email protected]>:
>>
>>> Hi Adrien
>>>
>>> Did you set a value for max spout pending? Could it be that you have
>>> reached the max number of tuples in process? Do you see acks or fails
>>> happening during that period?
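
The effect behind this question can be modelled in a few lines. This is a deliberately simplified model in plain Java, not Storm's executor code: the class and method names are hypothetical, and maxPending stands in for topology.max.spout.pending. It only illustrates that nextTuple stops being invoked once the number of pending tuples reaches the cap, and resumes when an ack or fail frees a slot.

```java
// Simplified model of the max-pending gate, not Storm's executor code.
class PendingGate {
    private final int maxPending; // stands in for topology.max.spout.pending
    private int pending = 0;      // emitted but not yet acked or failed
    private int nextTupleCalls = 0;

    PendingGate(int maxPending) {
        this.maxPending = maxPending;
    }

    /** One pass of the loop: nextTuple is only invoked while below the cap. */
    boolean tick() {
        if (pending >= maxPending) {
            return false; // the spout is skipped on this pass
        }
        nextTupleCalls++;
        pending++; // assumption: every nextTuple call emits exactly one tuple
        return true;
    }

    /** An ack or a fail frees one slot. */
    void complete() {
        if (pending > 0) {
            pending--;
        }
    }

    int pending() {
        return pending;
    }

    int nextTupleCalls() {
        return nextTupleCalls;
    }

    public static void main(String[] args) {
        PendingGate gate = new PendingGate(3);
        for (int i = 0; i < 10; i++) {
            gate.tick(); // no acks or fails arrive in the meantime
        }
        System.out.println("calls with no acks: " + gate.nextTupleCalls());
        gate.complete(); // one tuple is acked or failed
        System.out.println("called again after one ack: " + gate.tick());
    }
}
```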
>>>
>>> Great to hear that you are using StormCrawler BTW
>>>
>>> Julien
>>>
>>>
>>> On 9 May 2016 at 20:48, Adrien Carreira <[email protected]> wrote:
>>>
>>>> I think the problem is that when my topology is working, the thread calling
>>>> nextTuple seems to be busy... Why isn't the method called?
>>>>
>>>> Can someone guide me to some documentation or to the code calling nextTuple,
>>>> just so I can understand what is blocking?
>>>>
>>>> Thank you guys
>>>>
>>>> 2016-05-09 9:57 GMT+02:00 Adrien Carreira <[email protected]>:
>>>>
>>>>> Hi there,
>>>>>
>>>>> I'm using Storm to build a web crawler, using the StormCrawler SDK.
>>>>>
>>>>> I'm also using Redis to store newly discovered links.
>>>>>
>>>>> I have a Spout to consume those URLs. After a lot of debugging, I've built the
>>>>> Spout like this:
>>>>>
>>>>> public class OutlinkSpoutRedis extends BaseRichSpout {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory
>>>>>             .getLogger(OutlinkSpoutRedis.class);
>>>>>     private LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>();
>>>>>     private LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
>>>>>     private LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>();
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void nextTuple() {
>>>>>         LOG.info(">>> Calling nextTuple");
>>>>>
>>>>>         if (beingProcessed.size() >= 200000) {
>>>>>             LOG.info("Too many tuples being processed");
>>>>>             Utils.sleep(50);
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         LOG.info("Polling from queue");
>>>>>         Values ret = queue.poll();
>>>>>
>>>>>         if (ret == null) {
>>>>>             LOG.info("Polling from queue = null");
>>>>>             Utils.sleep(50);
>>>>>             return;
>>>>>         }
>>>>>
>>>>>         LOG.info("Emitting one url");
>>>>>
>>>>>         String url = ret.get(0).toString();
>>>>>         beingProcessed.put(url, "");
>>>>>
>>>>>         this._collector.emit(ret, url);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void ack(Object msgId) {
>>>>>         LOG.info("Acking");
>>>>>         this.beingProcessed.remove(msgId);
>>>>>         this.ackQueue.offer((String) msgId);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void fail(Object msgId) {
>>>>>         LOG.error("Fail tuple {}", msgId);
>>>>>         this.beingProcessed.remove(msgId);
>>>>>         this.failQueue.offer((String) msgId);
>>>>>     }
>>>>>
>>>>>     private class ProducerThread extends Thread {
>>>>>         @Override
>>>>>         public void run() {
>>>>>             while (activated) {
>>>>>                 try {
>>>>>                     if (queue.size() <= 1000) {
>>>>>                         populateQueue();
>>>>>                     }
>>>>>
>>>>>                     Utils.sleep(100);
>>>>>                 } catch (Exception e) {
>>>>>                     LOG.error("Error reading queues from redis", e);
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>
>>>>>         private void populateQueue() {
>>>>>             // Calling Redis to populate Queue
>>>>>             queue.offer(new Values(url, metadata));
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private abstract class AckFailThread extends Thread {
>>>>>         @Override
>>>>>         public void run() {
>>>>>             while (activated) {
>>>>>                 String message = queue.poll(1, TimeUnit.SECONDS);
>>>>>
>>>>>                 if (message != null) {
>>>>>                     this.handleMessage(message);
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>
>>>>>         protected abstract void handleMessage(String message);
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> I've removed unnecessary code.
>>>>> To explain: nextTuple polls from a queue (populated by
>>>>> another thread), and ack and fail push to queues consumed by two
>>>>> other threads. So those three methods do not block.
>>>>>
>>>>> My problem is that at runtime my spout is sometimes not called for about
>>>>> thirty seconds, even though there are still messages in the nextTuple queue
>>>>> to be consumed. The spout is not acking or failing either. So why is the
>>>>> Spout thread blocked?
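
The hand-off described here can be sketched on its own in plain Java, without any Storm types. The class below is hypothetical and only illustrates the pattern: the spout side does a non-blocking offer(), and a worker thread that owns its own queue does the waiting. The demo() method and the example.com ids are made up for the self-check.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Sketch of the hand-off pattern only; the class and its names are hypothetical.
class HandOffWorker extends Thread {
    private final BlockingQueue<String> source;
    private final Consumer<String> handler;
    private volatile boolean active = true;

    HandOffWorker(BlockingQueue<String> source, Consumer<String> handler) {
        this.source = source;
        this.handler = handler;
        setDaemon(true); // do not keep the JVM alive for this worker
    }

    @Override
    public void run() {
        while (active) {
            try {
                // The wait happens here, on the worker thread, never on the spout thread.
                String message = source.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    handler.accept(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() {
        active = false;
    }

    /** Small self-check: hand two ids to a worker and count what it handled. */
    static int demo() {
        BlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();
        HandOffWorker worker = new HandOffWorker(ackQueue, id -> handled.incrementAndGet());
        worker.start();
        ackQueue.offer("http://example.com/a"); // what ack(msgId) would do: no blocking
        ackQueue.offer("http://example.com/b");
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        while (handled.get() < 2 && System.nanoTime() < deadline) {
            Thread.yield(); // wait briefly for the worker to drain the queue
        }
        worker.shutdown();
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled: " + demo());
    }
}
```

Giving each worker its own queue through the constructor keeps the element type unambiguous: the ack and fail workers consume String message ids, while the nextTuple queue holds Values.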
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> *Open Source Solutions for Text Engineering*
>>>
>>> http://www.digitalpebble.com
>>> http://digitalpebble.blogspot.com/
>>> #digitalpebble <http://twitter.com/digitalpebble>
>>>
>>
>>
>
>
>
