On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <[email protected]> wrote:
>
> I took a good look at the Route Policy - at first the
> ThrottlingInflightRoutePolicy class seemed like it could work - as I really
> only want 5 exchanges to be in-flight at a time.
>
> Unfortunately it would never suspend the consumer. I dug deeper into the
> code and discovered why. The ThrottlingInflightRoutePolicy class only
> checks the number of inflight exchanges AFTER an exchange has been processed
> (the code to stop or start a consumer is all done in the onExchangeDone
> method).
>
> Since in my case the exchanges will take a while to process - it wouldn't
> know it had exceeded the maximum number until after it had finished
> processing one of them.
>
> In my opinion that is a bug - or at the very least an important thing to
> note in the documentation. I spent a fair bit of time trying to figure out
> why I could not get it to work as it appeared it was supposed to. All
> because it was not checking the inflight numbers against the threshold until
> after it had finished processing an exchange.
>
> I also tried writing my own FileThrottlingRoutePolicy that would test how
> many files were in an "inprogress" directory - and stop the consumer if it
> exceeded the max concurrent files.
>
> However I ran into read/write issues when I used the preMove of files - for
> some reason my processes later would throw exceptions about file not found
> or file lock (I can't remember which - I have been trying so many different
> things today to try and get this working).
>
> In the end I solved my problem by avoiding my problem :)
>
> The primary reason I didn't want the file locks to occur is it would be a
> manual cleanup if we ever had to kill the process while it's running.
> Otherwise the next time it started, it would ignore any of the files that
> had a lock file as well.
>
We have a ticket for that
https://issues.apache.org/activemq/browse/CAMEL-2082

> I re-wrote my route to work as follows:
>
> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>

Nice solution :)

> This way - when files are "finished" they will be placed in a "processed"
> directory, when they fail they are put in a "failed" directory. Anything
> still in the incoming directory is to be processed. Because the record of
> what was processed and what hasn't been was all in memory - restarting the
> process will just re-start any of the files still in the incoming directory.
>
> No more Lock files means restarting it won't cause us to have to delete
> .lock files.
>
> I wish there was still an easier way to do what I wanted. Now I just have
> to rely on the threads(5) to do the limiting to 5 files at a time. Although
> if I understand your comment (and the documentation) I can't actually rely
> on threads(5) to spawn 5 threads..it will just spawn UP TO 5 threads
> depending on the system load?
>
> Jonathan
>
>
> Claus Ibsen-2 wrote:
>>
>> Hi
>>
>> See also route policy to throttle the file consumer to a pace of 5
>> concurrent files
>> http://camel.apache.org/routepolicy.html
>>
>>
>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <[email protected]> wrote:
>>>
>>>
>>> jonathanq wrote:
>>>>
>>>> I am trying to write a process that will use a file endpoint (camel
>>>> 2.1.0) to read from a directory.
>>>>
>>>> I need the process to read a file from the directory and then do some
>>>> processing on the contents (namely hitting a REST service for each
>>>> record in the file). We have been asked to limit the number of threads
>>>> that are hitting the service to 5.
>>>> So we decided to simply process 5 files at a
>>>> time (to avoid concurrency issues reading 1 file and writing to 1 file
>>>> with 5 threads)
>>>>
>>>> I tried a few different approaches, and I wanted to see if there was a
>>>> way to do what I want.
>>>>
>>>> Approach 1:
>>>>
>>>> from("file://incoming").to("seda:filequeue")
>>>>
>>>> from("seda:filequeue").thread(5).process()
>>>>
>>>> Now - this reads in ALL of the files in the directory (places camelLock
>>>> on all) and then sends them to the seda endpoint. I saw log messages
>>>> that referred to thread 1 through 6. But from what I read in the
>>>> documentation, thread() is not necessarily going to limit it at that
>>>> number.
>>>>
>>
>> thread(5) will limit to at most 5 concurrent threads from this point
>> forward.
>>
>>
>>>> Approach 2:
>>>>
>>>> from("file://incoming").thread(5).process()
>>>>
>>>> This only processed 5 files at a time - but created camelLocks on all
>>>> files in the directory.
>>>>
>>>> So then I tried approach 3:
>>>>
>>>> from("file://incoming").to("seda:filequeue")
>>>>
>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>
>>>> Again this seems to work, however it puts a camelLock on all the files
>>>> (because they were all processed by the first part of the route, they
>>>> are just queued up in the second).
>>>>
>>>>
>>>> While approach 3 works - what I would really like is to not have the
>>>> camelLock placed on the files that are not being processed.
>>>>
>>>> So watching the directory, there would be (at most) 5 files with
>>>> camelLock files created at a time, when they finish they are moved to
>>>> the .camel directory, and then it starts processing the next file in
>>>> the directory.
>>>>
>>
>> You can also implement your own ProcessStrategy where you can deny
>> consuming more than 5 files at any given time.
>> See the processStrategy option on the file consumer. Just return false
>> on the begin() method.
>>
>> See
>> http://camel.apache.org/file2.html
>> in the bottom of the page.
>>
>>
>>>> Is that possible? Is there anything I should be sure to do in an error
>>>> route so that I "roll back" the camel locks to ensure that unprocessed
>>>> files are ready to process the next time the application starts?
>>>>
>>>
>>> Hi,
>>>
>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>> endpoint i.e.:
>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>
>>> Check the file component documentation :
>>> http://camel.apache.org/file2.html
>>>
>>> --
>>> View this message in context:
>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Author of Camel in Action: http://www.manning.com/ibsen/
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>
> --
> View this message in context:
> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>

--
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
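
For anyone finding this thread later: the "return false on the begin() method" idea suggested in the thread can be illustrated with a small plain-Java sketch. This is NOT the Camel API. InFlightFileGate, begin() and done() are hypothetical names made up for illustration, and the limit of 5 is simply the number from this thread. A real process strategy would have to implement whatever interface the processStrategy option expects in your Camel version, so check the file component documentation before wiring anything like this in. The point of the sketch is only that the limit is checked BEFORE a file is picked up, not after an exchange is done.

```java
import java.util.concurrent.Semaphore;

/**
 * Hypothetical helper (not a Camel class): caps how many files may be
 * in progress at once. The check happens before a file is accepted.
 */
final class InFlightFileGate {
    private final Semaphore permits;

    InFlightFileGate(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Returns true if the file may be consumed now, false to leave it for a later poll. */
    boolean begin() {
        return permits.tryAcquire(); // non-blocking: never stalls the polling thread
    }

    /** Call exactly once for every begin() that returned true, on success or failure. */
    void done() {
        permits.release();
    }

    /** Number of files that could still be accepted right now. */
    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        InFlightFileGate gate = new InFlightFileGate(5);
        int accepted = 0;
        // Simulate one poll that finds 8 files; only some can be accepted.
        for (int i = 0; i < 8; i++) {
            if (gate.begin()) {
                accepted++;
            }
        }
        System.out.println("accepted=" + accepted + " available=" + gate.available());
        gate.done(); // one file finished (moved to processed or failed)
        System.out.println("can accept another: " + gate.begin());
    }
}
```

In a setup like this, done() would be called from both the commit and the rollback path, otherwise a failed file would permanently use up one of the 5 slots. Files that are denied stay untouched in the incoming directory, which is the behaviour asked for in the original question.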
