On Thu, Dec 31, 2009 at 5:39 PM, jonathanq <[email protected]> wrote:
>
> Excellent - that would definitely help my solution, as I could use lock files
> and if we had to kill the process, it would just delete those on next start
> up and re-process the files.
>
> So when is 2.2.0 coming out? :-)
>
I implemented this feature today, so yes, it will be in 2.2.

Early 2010. I hope we get it out at the start of February. The last
major goal is to have an improved thread pool configuration.


> The solution I have works - and will probably be what we will use for this
> application. But that will help with future applications as we do end up
> writing a lot of file-based Camel processes.
>

Yeah, file handling is actually much harder than it seems at first. Our
goal is to make the file / ftp components in Camel flexible and to cover
many of the use cases out there. So any feedback is valuable.

I will give it some thought to see if we can change the dynamic inflight
throttler to measure metrics a bit earlier.

Okay, I guess it's time to celebrate the new year.



> Thanks for all the help!
>
> Jonathan
>
>
> Claus Ibsen-2 wrote:
>>
>> On Thu, Dec 31, 2009 at 12:33 AM, jonathanq <[email protected]> wrote:
>>>
>>> I took a good look at the Route Policy - at first the
>>> ThrottlingInflightRoutePolicy class seemed like it could work - as I
>>> really only want 5 exchanges to be in-flight at a time.
>>>
>>> Unfortunately it would never suspend the consumer.  I dug deeper into the
>>> code and discovered why.  The ThrottlingInflightRoutePolicy class only
>>> checks the number of inflight exchanges AFTER an exchange has been
>>> processed (the code to stop or start a consumer is all done in the
>>> onExchangeDone method).
>>>
>>> Since in my case the exchanges will take a while to process - it wouldn't
>>> know it had exceeded the maximum number until after it had finished
>>> processing one of them.
>>>
>>> In my opinion that is a bug - or at the very least an important thing to
>>> note in the documentation.  I spent a fair bit of time trying to figure
>>> out why I could not get it to work as it appeared it was supposed to.  All
>>> because it was not checking the inflight count against the threshold until
>>> after it had finished processing an exchange.
>>>
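To make the timing problem concrete, here is a minimal plain-Java simulation. It is not Camel code: InflightCheckDemo and peakInflight are hypothetical names, and it assumes every exchange starts before the first one completes, which is roughly the situation with slow processing described above.

```java
// Hypothetical simulation, not Camel code: compares checking the in-flight
// count only on completion with checking it when an exchange starts.
final class InflightCheckDemo {

    /** Returns the peak in-flight count when `arrivals` exchanges start before any completes. */
    static int peakInflight(int arrivals, int max, boolean checkOnBegin) {
        int inflight = 0;
        int peak = 0;
        boolean suspended = false;
        for (int i = 0; i < arrivals; i++) {
            if (suspended) {
                break; // a suspended consumer picks up no more files
            }
            inflight++;
            peak = Math.max(peak, inflight);
            // An up-front check can suspend the consumer as soon as the limit is reached.
            if (checkOnBegin && inflight >= max) {
                suspended = true;
            }
            // With a completion-only check nothing happens here: no exchange
            // has finished yet, so the limit is never compared.
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("check on completion only: " + peakInflight(8, 5, false));   // 8
        System.out.println("check when exchange begins: " + peakInflight(8, 5, true));  // 5
    }
}
```

With 8 files waiting and a limit of 5, the completion-only check lets all 8 start, while the up-front check stops at 5.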
>>> I also tried writing my own FileThrottlingRoutePolicy that would test how
>>> many files were in an "inprogress" directory - and stop the consumer if it
>>> exceeded the max concurrent files.
>>>
>>> However I ran into read/write issues when I used the preMove of files -
>>> for some reason my processes later would throw exceptions about file not
>>> found or file lock (I can't remember which - I have been trying so many
>>> different things today to try and get this working).
>>>
>>> In the end I solved my problem by avoiding my problem :)
>>>
>>> The primary reason I didn't want the file locks to occur is it would be a
>>> manual cleanup if we ever had to kill the process while it's running.
>>> Otherwise the next time it started, it would ignore any of the files that
>>> had a lock file as well.
>>>
>>
>> We have a ticket for that
>> https://issues.apache.org/activemq/browse/CAMEL-2082
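As a rough illustration of the idea (deleting orphaned lock files when the process starts), here is a plain-Java sketch. It is not the Camel implementation: StaleLockCleaner, deleteStaleLocks and demo are hypothetical names, and the ".camelLock" suffix is an assumption based on the lock files mentioned in this thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Camel: removes leftover lock files at startup.
final class StaleLockCleaner {

    /** Deletes every entry in `dir` whose name ends with `suffix`; returns how many were removed. */
    static int deleteStaleLocks(Path dir, String suffix) {
        int removed = 0;
        try (DirectoryStream<Path> locks = Files.newDirectoryStream(dir, "*" + suffix)) {
            for (Path lock : locks) {
                if (Files.deleteIfExists(lock)) {
                    removed++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return removed;
    }

    /** Builds a throwaway directory with one data file and two orphaned locks, then cleans it. */
    static int demo() {
        try {
            Path dir = Files.createTempDirectory("incoming");
            Files.createFile(dir.resolve("a.txt"));
            Files.createFile(dir.resolve("a.txt.camelLock")); // assumed lock suffix
            Files.createFile(dir.resolve("b.txt.camelLock"));
            return deleteStaleLocks(dir, ".camelLock");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("removed " + demo() + " stale lock files");
    }
}
```

The data files themselves are left alone, so they are picked up again on the next poll.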
>>
>>
>>
>>> I re-wrote my route to work as follows:
>>>
>>> from("file://incoming?maxMessagesPerPoll=1&idempotent=true&moveFailed=failed&move=processed&readLock=none").threads(5).process()
>>>
>>
>> Nice solution :)
>>
>>> This way - when files are "finished" they will be placed in a "processed"
>>> directory, when they fail they are put in a "failed" directory.  Anything
>>> still in the incoming directory is to be processed.  Because the record of
>>> what has and hasn't been processed is held only in memory - restarting the
>>> process will just re-start any of the files still in the incoming
>>> directory.
>>>
>>> No more Lock files means restarting it won't cause us to have to delete
>>> .lock files.
>>>
>>> I wish there was still an easier way to do what I wanted.  Now I just have
>>> to rely on the threads(5) to do the limiting to 5 files at a time.
>>> Although if I understand your comment (and the documentation) I can't
>>> actually rely on threads(5) to spawn 5 threads..it will just spawn UP TO 5
>>> threads depending on the system load?
>>>
>>> Jonathan
>>>
>>>
>>>
>>>
>>> Claus Ibsen-2 wrote:
>>>>
>>>> Hi
>>>>
>>>> See also route policy to throttle the file consumer to a pace of 5
>>>> concurrent files
>>>> http://camel.apache.org/routepolicy.html
>>>>
>>>>
>>>>
>>>> On Wed, Dec 30, 2009 at 11:51 AM, gmagniez <[email protected]>
>>>> wrote:
>>>>>
>>>>>
>>>>> jonathanq wrote:
>>>>>>
>>>>>> I am trying to write a process that will use a file endpoint (camel
>>>>>> 2.1.0) to read from a directory.
>>>>>>
>>>>>> I need the process to read a file from the directory and then do some
>>>>>> processing on the contents (namely hitting a REST service for each
>>>>>> record in the file).  We have been asked to limit the number of threads
>>>>>> that are hitting the service to 5.  So we decided to simply process 5
>>>>>> files at a time (to avoid concurrency issues reading 1 file and writing
>>>>>> to 1 file with 5 threads).
>>>>>>
>>>>>> I tried a few different approaches, and I wanted to see if there was a
>>>>>> way to do what I want.
>>>>>>
>>>>>> Approach 1:
>>>>>>
>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>
>>>>>> from("seda:filequeue").threads(5).process()
>>>>>>
>>>>>> Now - this reads in ALL of the files in the directory (places camelLock
>>>>>> on all) and then sends them to the seda endpoint.  I saw log messages
>>>>>> that referred to thread 1 through 6.  But from what I read in the
>>>>>> documentation, threads() is not necessarily going to limit it at that
>>>>>> number.
>>>>>>
>>>>
>>>> threads(5) will limit to at most 5 concurrent threads from this point
>>>> forward.
>>>>
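For what "at most 5 concurrent threads" means in practice, here is a plain java.util.concurrent sketch of a bounded pool. BoundedPoolDemo and peakConcurrency are hypothetical names, and whether the Camel DSL configures its pool exactly like Executors.newFixedThreadPool is an assumption on my part. The point is only that a pool of 5 never runs more than 5 tasks at once, however many are queued.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo, not Camel code: measures peak concurrency on a fixed-size pool.
final class BoundedPoolDemo {

    /** Runs `tasks` short jobs on a pool of `poolSize` threads and returns the peak concurrency seen. */
    static int peakConcurrency(int poolSize, int tasks) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest count observed
                try {
                    Thread.sleep(20); // stand-in for the slow REST calls
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency(5, 20));
    }
}
```

The reported peak can be lower than 5 on a lightly loaded run, but it cannot exceed the pool size.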
>>>>
>>>>>> Approach 2:
>>>>>>
>>>>>> from("file://incoming").threads(5).process()
>>>>>>
>>>>>> This only processed 5 files at a time - but created camelLocks on all
>>>>>> files in the directory.
>>>>>>
>>>>>> So then I tried approach 3:
>>>>>>
>>>>>> from("file://incoming").to("seda:filequeue")
>>>>>>
>>>>>> from("seda:filequeue?concurrentConsumers=5").process()
>>>>>>
>>>>>> Again this seems to work, however it puts a camelLock on all the files
>>>>>> (because they were all processed by the first part of the route, they
>>>>>> are just queued up in the second).
>>>>>>
>>>>>>
>>>>>> While approach 3 works - what I would really like is to not have the
>>>>>> camelLock placed on the files that are not being processed.
>>>>>>
>>>>>> So watching the directory, there would be (at most) 5 files with
>>>>>> camelLock files created at a time, when they finish they are moved to
>>>>>> the .camel directory, and then it starts processing the next file in
>>>>>> the directory.
>>>>>>
>>>>
>>>> You can also implement your own ProcessStrategy where you can refuse to
>>>> consume more than 5 files at any given time.
>>>> See the processStrategy option on the file consumer. Just return false
>>>> from the begin() method.
>>>>
>>>> See
>>>> http://camel.apache.org/file2.html
>>>> at the bottom of the page.
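The counting logic such a strategy would need can be sketched in plain Java. InflightGate, tryBegin and done are hypothetical names and not part of the Camel API. The idea is that begin() would return the result of tryBegin(), and whichever callback the strategy receives when a file finishes would call done().

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not a Camel class: a thread-safe cap on files in progress.
final class InflightGate {
    private final int max;
    private final AtomicInteger inflight = new AtomicInteger();

    InflightGate(int max) {
        this.max = max;
    }

    /** Claims a slot if one is free; returns false when `max` files are already in progress. */
    boolean tryBegin() {
        while (true) {
            int current = inflight.get();
            if (current >= max) {
                return false; // deny this file for now, a later poll can retry it
            }
            if (inflight.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Releases a slot once a file has finished, successfully or not. */
    void done() {
        inflight.decrementAndGet();
    }

    public static void main(String[] args) {
        InflightGate gate = new InflightGate(5);
        for (int i = 0; i < 5; i++) {
            gate.tryBegin();
        }
        System.out.println("sixth file accepted: " + gate.tryBegin()); // false
        gate.done();
        System.out.println("after one finishes: " + gate.tryBegin());  // true
    }
}
```

The compare-and-set loop keeps the check and the increment atomic, so two polling threads cannot both take the last slot.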
>>>>
>>>>
>>>>>> Is that possible?  Is there anything I should be sure to do in an
>>>>>> error route so that I "roll back" the camel locks to ensure that
>>>>>> unprocessed files are ready to process the next time the application
>>>>>> starts?
>>>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> Maybe you can try to use the parameter maxMessagesPerPoll on the file
>>>>> endpoint, e.g.:
>>>>> from("file://incoming?maxMessagesPerPoll=5").threads(5).process()
>>>>>
>>>>> Check the file component documentation:
>>>>> http://camel.apache.org/file2.html
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26965930.html
>>>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Claus Ibsen
>>>> Apache Camel Committer
>>>>
>>>> Author of Camel in Action: http://www.manning.com/ibsen/
>>>> Open Source Integration: http://fusesource.com
>>>> Blog: http://davsclaus.blogspot.com/
>>>> Twitter: http://twitter.com/davsclaus
>>>>
>>>>
>>>
>>> --
>>> View this message in context:
>>> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26973577.html
>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>
> --
> View this message in context: 
> http://old.nabble.com/Processing-5-files-at-a-time---Threads--SEDA-%2B-Concurrent-Consumers--tp26960942p26980045.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
