On Thu, Dec 31, 2009 at 5:39 PM, jonathanq <[email protected]> wrote: > > Excellent - that would definitely help my solution, as I could use lock files > and if we had to kill the process, it would just delete those on next start > up and re-process the files. > > So when is 2.2.0 coming out? :-) > I implemented this feature today so yeah it will be in 2.2
Early 2010. I hope we get it out in the start of February. The last major goal is to have an improved thread pool configuration. > The solution I have works - and will probably what we will use for this > application. But that will help with future applications as we do end up > writing a lot of file based camel processes. > Yeah file is actually much harder than at first thought. Our goal is to make the file / ftp components in Camel flexible and to cover many of the use cases out there. So any feedback is valuable. I will git it some though to see if we can change the dynamic inflight throttler to be measuring metrics a bit earlier. Okay I guess its time to celebrate the new year. > Thanks for all the help! > > Jonathan > > > Claus Ibsen-2 wrote: >> >> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <[email protected]> wrote: >>> >>> I took a good look at the Route Policy - at first the >>> ThrottlingInflightRoutePolicy class seemed like it could work - as I >>> really >>> only want 5 exchanges to be in-flight at a time. >>> >>> Unfortunately it would never suspend the consumer. I dug deeper into the >>> code and discovered why. The ThrottlingInflightRoutePolicy class only >>> checks the number of inflight exchanges AFTER an exchange has been >>> processed >>> (the code to stop or start a consumer is all done in the onExchangeDone >>> method). >>> >>> Since in my case the exchanges will take a while to process - it wouldn't >>> know it had exceeded the maximum number until after it had finished >>> processing one of them. >>> >>> In my opinion that is a bug - or at the very least an important thing to >>> note in the documentation. I spent a fair bit of time trying to figure >>> out >>> why I could not get it to work as it appeared it was supposed to. All >>> because it was not checking the inflight numbers to the threshold until >>> after it had finished processing an exchange. >>> >>> I also tried writing my own FileThrottlingRoutePolicy that would test how >>> many files were in a "inprogress" directory - and stop the consumer if it >>> exceeded the max concurrent files. >>> >>> However I ran into read/write issues when I used the preMove of files - >>> for >>> some reason my processes later would throw exceptions about file not >>> found >>> or file lock (I can't remember which - i have been trying so many >>> different >>> things today to try and get this working). >>> >>> In the end I solved my problem by avoiding my problem :) >>> >>> The primary reason I didn't want the file locks to occur is it would be a >>> manual cleanup if we ever had to kill the process while it's running. >>> Otherwise the next time it started, it would ignore any of the files that >>> had a lock file as well. >>> >> >> We have a ticket for that >> https://issues.apache.org/activemq/browse/CAMEL-2082 >> >> >> >>> I re-wrote my route to work as follows: >>> >>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process() >>> >> >> Nice solution :) >> >>> This way - when files are "finished" they will be placed in a "processed" >>> directory, when they fail they are put in a "failed" directory. Anything >>> still in the incoming directory is to be processed. Because the memory >>> of >>> what was processed and what hasn't been was all in memory - restarting >>> the >>> process will just re-start any of the files still in the incoming >>> directory. >>> >>> No more Lock files means restarting it won't cause us to have to delete >>> .lock files. >>> >>> I wish there was still an easier way to do what I wanted. Now I just >>> have >>> to rely on the threads(5) to do the limiting to 5 files at a time. >>> Although >>> if I understand your comment (and the documentation) I can't actually >>> rely >>> on threads(5) to spawn 5 threads..it will just spawn UP TO 5 threads >>> depending on the system load? >>> >>> Jonathan >>> >>> >>> >>> >>> Claus Ibsen-2 wrote: >>>> >>>> Hi >>>> >>>> See also route policy to throttle the file consumer to a pace of 5 >>>> concurrent files >>>> http://camel.apache.org/routepolicy.html >>>> >>>> >>>> >>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <[email protected]> >>>> wrote: >>>>> >>>>> >>>>> jonathanq wrote: >>>>>> >>>>>> I am trying to write a process that will use a file endpoint (camel >>>>>> 2.1.0) >>>>>> to read from a directory. >>>>>> >>>>>> I need the process to read a file from the directory and then do some >>>>>> processing on the contents (namely hitting a REST service for each >>>>>> record >>>>>> in the file). We have been asked to limit the number of threads that >>>>>> are >>>>>> hitting the service to 5. So we decided to simply process 5 files at >>>>>> a >>>>>> time (to avoid concurrency issues reading 1 file and writing to 1 file >>>>>> with 5 threads) >>>>>> >>>>>> I tried a few different approaches, and I wanted to see if there was a >>>>>> way >>>>>> to do what I want. >>>>>> >>>>>> Approach 1: >>>>>> >>>>>> from("file://incoming").to("seda:filequeue") >>>>>> >>>>>> from("seda:filequeue").thread(5).process() >>>>>> >>>>>> Now - this reads in ALL of the files in the directory (places >>>>>> camelLock >>>>>> on >>>>>> all) and then sends them to the seda endpoint. I saw log messages >>>>>> that >>>>>> referred to thread 1 through 6. But from what I read on the >>>>>> documentation, thread() is not necessarily going t limit it at that >>>>>> number. >>>>>> >>>> >>>> thread(5) will limit to at most 5 concurrent threads from this point >>>> forward. >>>> >>>> >>>>>> Approach 2: >>>>>> >>>>>> from("file://incoming").thread(5).process() >>>>>> >>>>>> This only processed 5 files at a time - but created camelLocks on all >>>>>> files in the directory. >>>>>> >>>>>> So then I tried approach 3: >>>>>> >>>>>> from("file://incoming").to("seda:filequeue") >>>>>> >>>>>> from("seda:filequeue?concurrentConsumers=5").process() >>>>>> >>>>>> Again this seems to work, however it puts a camelLock on all the files >>>>>> (because they were all processed by the first part of the route, they >>>>>> are >>>>>> just queued up in the second). >>>>>> >>>>>> >>>>>> While approach 3 works - what I would really like is to not have the >>>>>> camelLock placed on the files that are not being processed. >>>>>> >>>>>> So watching the directory, there would be (at most) 5 files with >>>>>> camelLock >>>>>> files created at a time, when they finish they are moved to the .camel >>>>>> directory, and then it starts processing the next file in the >>>>>> directory. >>>>>> >>>> >>>> You can also implement your own ProcessStrategy where you can deny >>>> consuming in more files than 5 at any given time. >>>> See the processStrategy option on the file consumer. Just return false >>>> on the begin() method. >>>> >>>> See >>>> http://camel.apache.org/file2.html >>>> in the bottom of the page. >>>> >>>> >>>>>> Is that possible? Is there anything I should be sure to do in an >>>>>> error >>>>>> route so that I "roll back" the camel locks to ensure that unprocessed >>>>>> files are ready to process the next time the application starts? >>>>>> >>>>> >>>>> Hi, >>>>> >>>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file >>>>> endpoint i.e.: >>>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process() >>>>> >>>>> Check the file component documentation : >>>>> http://camel.apache.org/file2.html >>>>> >>>>> -- >>>>> View this message in context: >>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html >>>>> Sent from the Camel - Users mailing list archive at Nabble.com. >>>>> >>>>> >>>> >>>> >>>> >>>> -- >>>> Claus Ibsen >>>> Apache Camel Committer >>>> >>>> Author of Camel in Action: http://www.manning.com/ibsen/ >>>> Open Source Integration: http://fusesource.com >>>> Blog: http://davsclaus.blogspot.com/ >>>> Twitter: http://twitter.com/davsclaus >>>> >>>> >>> >>> -- >>> View this message in context: >>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html >>> Sent from the Camel - Users mailing list archive at Nabble.com. >>> >>> >> >> >> >> -- >> Claus Ibsen >> Apache Camel Committer >> >> Author of Camel in Action: http://www.manning.com/ibsen/ >> Open Source Integration: http://fusesource.com >> Blog: http://davsclaus.blogspot.com/ >> Twitter: http://twitter.com/davsclaus >> >> > > -- > View this message in context: > http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26980045.html > Sent from the Camel - Users mailing list archive at Nabble.com. > > -- Claus Ibsen Apache Camel Committer Author of Camel in Action: http://www.manning.com/ibsen/ Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus
