Hi

On Thu, Dec 31, 2009 at 8:59 PM, jonathanq <[email protected]> wrote:
>
> Sorry - I meant - this won't cause me too much trouble since I could just
> change the code type to be File not GenericFile. Just thought it was
> strange.
>
> I actually changed it to do a .convertBodyTo(String.class) in my route -
> since I was just reading the file and that was why I needed a GenericFile
> object to get the name to read. Not sure why I didn't think of doing that
> earlier.
>

GenericFile should really be an internal representation only. End users
are encouraged to use regular types such as java.io.File or String, as
you did.

> Jonathan
>
> jonathanq wrote:
>>
>> Claus,
>>
>> I figured out why my custom Route Policy wasn't working for me.
>>
>> It seems that there is an issue when I use preMove on the File endpoint -
>> when the Exchange reaches my first Processor, it is no longer a
>> "GenericFile" class.
>>
>> My processor does this on the first line:
>>
>> GenericFile file = exchange.getIn().getBody(GenericFile.class);
>>
>> This works fine with my route - but as soon as I use preMove and put the
>> in-progress files into a "processing" directory, the type of the body is
>> no longer GenericFile, but is now a java.io.File. So of course, file is
>> now null.
>>
>> This isn't an issue for my code, I can change the type and everything
>> will work. But it seems very strange that it changes types when "preMove"
>> is set on the endpoint.
>>
>> Is that expected behavior?
>>
>> Jonathan
>>
>>
>> Claus Ibsen-2 wrote:
>>>
>>> On Thu, Dec 31, 2009 at 5:39 PM, jonathanq <[email protected]> wrote:
>>>>
>>>> Excellent - that would definitely help my solution, as I could use
>>>> lock files and if we had to kill the process, it would just delete
>>>> those on next start up and re-process the files.
>>>>
>>>> So when is 2.2.0 coming out? :-)
>>>>
>>> I implemented this feature today so yeah it will be in 2.2
>>>
>>> Early 2010. I hope we get it out in the start of February. The last
>>> major goal is to have an improved thread pool configuration.
>>>
>>>
>>>> The solution I have works - and will probably be what we will use for
>>>> this application. But that will help with future applications as we do
>>>> end up writing a lot of file based camel processes.
>>>>
>>>
>>> Yeah file is actually much harder than at first thought. Our goal is
>>> to make the file / ftp components in Camel flexible and to cover many
>>> of the use cases out there.
>>> So any feedback is valuable.
>>>
>>> I will give it some thought to see if we can change the dynamic inflight
>>> throttler to measure metrics a bit earlier.
>>>
>>> Okay I guess it's time to celebrate the new year.
>>>
>>>
>>>
>>>> Thanks for all the help!
>>>>
>>>> Jonathan
>>>>
>>>>
>>>> Claus Ibsen-2 wrote:
>>>>>
>>>>> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <[email protected]> wrote:
>>>>>>
>>>>>> I took a good look at the Route Policy - at first the
>>>>>> ThrottlingInflightRoutePolicy class seemed like it could work - as I
>>>>>> really only want 5 exchanges to be in-flight at a time.
>>>>>>
>>>>>> Unfortunately it would never suspend the consumer. I dug deeper into
>>>>>> the code and discovered why. The ThrottlingInflightRoutePolicy class
>>>>>> only checks the number of inflight exchanges AFTER an exchange has
>>>>>> been processed (the code to stop or start a consumer is all done in
>>>>>> the onExchangeDone method).
>>>>>>
>>>>>> Since in my case the exchanges will take a while to process - it
>>>>>> wouldn't know it had exceeded the maximum number until after it had
>>>>>> finished processing one of them.
>>>>>>
>>>>>> In my opinion that is a bug - or at the very least an important thing
>>>>>> to note in the documentation. I spent a fair bit of time trying to
>>>>>> figure out why I could not get it to work as it appeared it was
>>>>>> supposed to. All because it was not checking the inflight numbers
>>>>>> against the threshold until after it had finished processing an
>>>>>> exchange.
>>>>>>
>>>>>> I also tried writing my own FileThrottlingRoutePolicy that would test
>>>>>> how many files were in an "inprogress" directory - and stop the
>>>>>> consumer if it exceeded the max concurrent files.
>>>>>>
>>>>>> However I ran into read/write issues when I used the preMove of files
>>>>>> - for some reason my processes later would throw exceptions about
>>>>>> file not found or file lock (I can't remember which - I have been
>>>>>> trying so many different things today to try and get this working).
>>>>>>
>>>>>> In the end I solved my problem by avoiding my problem :)
>>>>>>
>>>>>> The primary reason I didn't want the file locks to occur is it would
>>>>>> be a manual cleanup if we ever had to kill the process while it's
>>>>>> running. Otherwise the next time it started, it would ignore any of
>>>>>> the files that had a lock file as well.
>>>>>>
>>>>>
>>>>> We have a ticket for that
>>>>> https://issues.apache.org/activemq/browse/CAMEL-2082
>>>>>
>>>>>
>>>>>
>>>>>> I re-wrote my route to work as follows:
>>>>>>
>>>>>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>>>>>>
>>>>>
>>>>> Nice solution :)
>>>>>
>>>>>> This way - when files are "finished" they will be placed in a
>>>>>> "processed" directory, when they fail they are put in a "failed"
>>>>>> directory. Anything still in the incoming directory is to be
>>>>>> processed. Because the record of what has been processed and what
>>>>>> hasn't is all in memory - restarting the process will just re-start
>>>>>> any of the files still in the incoming directory.
>>>>>>
>>>>>> No more lock files means restarting it won't cause us to have to
>>>>>> delete .lock files.
>>>>>>
>>>>>> I wish there was still an easier way to do what I wanted. Now I just
>>>>>> have to rely on the threads(5) to do the limiting to 5 files at a
>>>>>> time. Although if I understand your comment (and the documentation) I
>>>>>> can't actually rely on threads(5) to spawn 5 threads..it will just
>>>>>> spawn UP TO 5 threads depending on the system load?
>>>>>>
>>>>>> Jonathan
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Claus Ibsen-2 wrote:
>>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> See also route policy to throttle the file consumer to a pace of 5
>>>>>>> concurrent files
>>>>>>> http://camel.apache.org/routepolicy.html
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <[email protected]> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> jonathanq wrote:
>>>>>>>>>
>>>>>>>>> I am trying to write a process that will use a file endpoint
>>>>>>>>> (camel 2.1.0) to read from a directory.
>>>>>>>>>
>>>>>>>>> I need the process to read a file from the directory and then do
>>>>>>>>> some processing on the contents (namely hitting a REST service for
>>>>>>>>> each record in the file). We have been asked to limit the number
>>>>>>>>> of threads that are hitting the service to 5. So we decided to
>>>>>>>>> simply process 5 files at a time (to avoid concurrency issues
>>>>>>>>> reading 1 file and writing to 1 file with 5 threads)
>>>>>>>>>
>>>>>>>>> I tried a few different approaches, and I wanted to see if there
>>>>>>>>> was a way to do what I want.
>>>>>>>>>
>>>>>>>>> Approach 1:
>>>>>>>>>
>>>>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>>>>
>>>>>>>>> from("seda:filequeue").thread(5).process()
>>>>>>>>>
>>>>>>>>> Now - this reads in ALL of the files in the directory (places
>>>>>>>>> camelLock on all) and then sends them to the seda endpoint. I saw
>>>>>>>>> log messages that referred to thread 1 through 6. But from what I
>>>>>>>>> read on the documentation, thread() is not necessarily going to
>>>>>>>>> limit it at that number.
>>>>>>>>>
>>>>>>>
>>>>>>> thread(5) will limit to at most 5 concurrent threads from this point
>>>>>>> forward.
>>>>>>>
>>>>>>>
>>>>>>>>> Approach 2:
>>>>>>>>>
>>>>>>>>> from("file://incoming").thread(5).process()
>>>>>>>>>
>>>>>>>>> This only processed 5 files at a time - but created camelLocks on
>>>>>>>>> all files in the directory.
>>>>>>>>>
>>>>>>>>> So then I tried approach 3:
>>>>>>>>>
>>>>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>>>>
>>>>>>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>>>>>>
>>>>>>>>> Again this seems to work, however it puts a camelLock on all the
>>>>>>>>> files (because they were all processed by the first part of the
>>>>>>>>> route, they are just queued up in the second).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> While approach 3 works - what I would really like is to not have
>>>>>>>>> the camelLock placed on the files that are not being processed.
>>>>>>>>>
>>>>>>>>> So watching the directory, there would be (at most) 5 files with
>>>>>>>>> camelLock files created at a time, when they finish they are moved
>>>>>>>>> to the .camel directory, and then it starts processing the next
>>>>>>>>> file in the directory.
>>>>>>>>>
>>>>>>>
>>>>>>> You can also implement your own ProcessStrategy where you can deny
>>>>>>> consuming more than 5 files at any given time.
>>>>>>> See the processStrategy option on the file consumer. Just return
>>>>>>> false on the begin() method.
>>>>>>>
>>>>>>> See
>>>>>>> http://camel.apache.org/file2.html
>>>>>>> at the bottom of the page.
>>>>>>>
>>>>>>>
>>>>>>>>> Is that possible? Is there anything I should be sure to do in an
>>>>>>>>> error route so that I "roll back" the camel locks to ensure that
>>>>>>>>> unprocessed files are ready to process the next time the
>>>>>>>>> application starts?
>>>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Maybe you can try to use the parameter maxMessagesPerPoll on the
>>>>>>>> file endpoint i.e.:
>>>>>>>> from("file://incoming?maxMessagesPerPoll=5").thread(5).process()
>>>>>>>>
>>>>>>>> Check the file component documentation:
>>>>>>>> http://camel.apache.org/file2.html
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>>>>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Claus Ibsen
>>>>>>> Apache Camel Committer
>>>>>>>
>>>>>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>>>>>> Open Source Integration: http://fusesource.com
>>>>>>> Blog: http://davsclaus.blogspot.com/
>>>>>>> Twitter: http://twitter.com/davsclaus
>>>>>>>

--
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
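
For reference, the "deny consuming more than 5 files at any given time" idea from the thread could be sketched roughly like this. It is plain Java so it runs without Camel on the classpath. InflightGate and its begin()/done() methods are hypothetical names used only for illustration; they are not Camel's ProcessStrategy API. The sketch only shows the limit being checked before a file is picked up, rather than after an exchange has completed.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper, not part of Camel: caps how many files are in progress.
final class InflightGate {
    private final Semaphore permits;

    InflightGate(int maxInflight) {
        this.permits = new Semaphore(maxInflight);
    }

    // Call before picking up a file. Returns false when the cap is already
    // reached, meaning the file should be skipped on this poll.
    boolean begin() {
        return permits.tryAcquire();
    }

    // Call exactly once for every begin() that returned true, whether the
    // processing succeeded or failed, so the slot becomes free again.
    void done() {
        permits.release();
    }

    public static void main(String[] args) {
        InflightGate gate = new InflightGate(5);
        // Offer six files to a gate that allows five in flight.
        for (int i = 1; i <= 6; i++) {
            System.out.println("file " + i + " accepted: " + gate.begin());
        }
        // One file finishes, so the next offer can be accepted.
        gate.done();
        System.out.println("after one finishes: " + gate.begin());
    }
}
```

Because the check happens in begin(), a slow file holds its slot for as long as it is being processed, which is the behaviour that was missing when the inflight count was only compared in onExchangeDone.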
