I think part of this is that I'm unsure how some of the ActiveMQ internals work here.
For example, I've run into problems with message selectors: they don't operate properly if one message subset produces fewer messages than the other subsets, due to the way the 'head of queue' system works in ActiveMQ. The memory gets choked out, so you end up with some subset of your messages having higher latencies.

Another thing I was considering was building a sort of bucket system, similar to the way Storm or DHTs operate, where we pre-allocate something like 20k 'buckets' and put messages into these buckets. This way a consumer only sees the buckets it's responsible for, and therefore a smaller amount of volume. It can then group the messages in bulk when it receives them all.

Another way to handle the incomplete groups (i.e. if some of the messages get dropped) is to keep rolling them back until they go into the DLQ...

On Sun, Jan 24, 2016 at 2:06 PM, Tim Bain <tb...@alumni.duke.edu> wrote:

> I'd use message groups for the messages produced by A, ensuring that all 15 go to a single B consumer. Then B can use CLIENT_ACK or INDIVIDUAL_ACK mode to only ack the messages when all 15 have been received (holding them in memory and unacknowledged until all 15 arrive), and then B can publish a single message to be consumed by C that encompasses all of the information from the 15 messages. This only requires two queues (AB for messages between A and B, and BC for messages between B and C), so it'll be simpler to manage than having multiple aggregation_N queues.
>
> You'll want to consider what happens if you never get all 15 messages (I'd eventually write them into a database so the batch can be reassembled later if they eventually show up), as well as how to do your acking if a single B consumer can get interleaved message groups (which is why I'd suggest considering INDIVIDUAL_ACK mode).
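To make Tim's suggestion concrete, here is a minimal, JMS-free sketch of the bookkeeping a B consumer would do. It assumes each message carries a group id (the role the JMSXGroupID string property plays for ActiveMQ message groups; A would set it on every message of a batch) and a 0-based part index. The class GroupAccumulator and its Part type are hypothetical names, and the actual acknowledge and publish calls are deliberately left to the caller rather than shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, JMS-free model of a B consumer that buffers the parts of each
// message group and releases a group only once all of its parts are present.
class GroupAccumulator {

    // Stand-in for a received message (hypothetical). groupId plays the role of
    // the JMSXGroupID property; index is assumed to be 0..groupSize-1.
    static final class Part {
        final String groupId;
        final int index;
        final String payload;

        Part(String groupId, int index, String payload) {
            this.groupId = groupId;
            this.index = index;
            this.payload = payload;
        }
    }

    private final int groupSize;
    // Open groups: groupId -> (index -> part). In a real consumer these parts
    // would still be unacknowledged; several groups may be open at once.
    private final Map<String, SortedMap<Integer, Part>> open = new HashMap<>();

    GroupAccumulator(int groupSize) {
        this.groupSize = groupSize;
    }

    // Buffers one part. Returns the whole group, ordered by index, when its last
    // missing part arrives; otherwise returns Optional.empty(). On a non-empty
    // result the caller would acknowledge every part individually and publish
    // one composite message for C.
    Optional<List<Part>> onPart(Part p) {
        SortedMap<Integer, Part> parts = open.computeIfAbsent(p.groupId, k -> new TreeMap<>());
        parts.put(p.index, p); // a redelivered duplicate just replaces the earlier copy
        if (parts.size() < groupSize) {
            return Optional.empty();
        }
        open.remove(p.groupId);
        return Optional.of(new ArrayList<>(parts.values()));
    }

    // Number of groups still waiting for parts (candidates for a timeout/overflow policy).
    int openGroupCount() {
        return open.size();
    }

    public static void main(String[] args) {
        GroupAccumulator acc = new GroupAccumulator(15);
        for (int i = 0; i < 15; i++) {
            // Two groups arriving interleaved on the same consumer.
            acc.onPart(new Part("g1", i, "a" + i))
                .ifPresent(batch -> System.out.println("g1 complete: " + batch.size() + " parts"));
            acc.onPart(new Part("g2", i, "b" + i))
                .ifPresent(batch -> System.out.println("g2 complete: " + batch.size() + " parts"));
        }
    }
}
```

The interleaving in main is why the acknowledge mode matters: in JMS, calling acknowledge() under CLIENT_ACKNOWLEDGE acknowledges every message the session has consumed so far, including parts of groups that are still open, whereas ActiveMQ's INDIVIDUAL_ACKNOWLEDGE mode acknowledges only the message it is called on.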
>
> Also, although I agree that doing all of this in a database isn't likely to be performant, I think any database (Cassandra, Oracle, whatever) would be able to handle just your incomplete-groups use case, since the percentage of groups that end up routed through that mechanism is presumably small.
>
> All of this assumes that most message groups will be completed quickly enough that it's reasonable to hold all messages from most open groups in the memory (and the prefetch buffer) of one or another of your B consumers, and that the number of groups that would have to be overflowed is small. If that's not reasonable, this gets harder, but you could potentially write the messages to some type of database but have the consumer just keep track of which messages it has (so you're storing a single bit-mapped integer for each message group instead of storing the full message content), and then read them back in when the 15th one arrives. That means the database doesn't have to be queried to know which batches are complete; it only has to be capable of retrieving the messages for a single group on demand.
>
> Tim
>
> On Sun, Jan 24, 2016 at 1:16 PM, Kevin Burton <bur...@spinn3r.com> wrote:
>
> > I have a pattern which I think I need advice on...
> >
> > I have three tasks... each a type of message consumer.
> >
> > Let's call them A, B, and C.
> >
> > A runs once, creates 15 messages, sends them to B... then B processes these messages and generates 15 new messages.
> >
> > However, they need to be combined into a group and sent to C all at once, in one composite message.
> >
> > So it's similar to map/reduce in that C should execute once with a block of these 15 messages.
> >
> > Conceptually I'm calling them "message group checkpoints", but I'm wondering if there's already a more formal name for this concept.
> >
> > I'm not sure of the best way to handle this with ActiveMQ.
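Tim's fallback in his reply above, where message bodies go to a database and the consumer keeps only "a single bit-mapped integer for each message group", might look roughly like the sketch below. GroupBitmapTracker is a hypothetical name; the database write and the read-back of a completed group are assumed rather than shown. Because the mask is an int, this version caps a group at 31 parts, which covers the 15 in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: message bodies are assumed to be written to a database
// elsewhere; in memory the consumer keeps only one int bitmask per open group.
class GroupBitmapTracker {
    private final int groupSize; // parts per group, e.g. 15
    private final int fullMask;  // bits 0..groupSize-1 set
    private final Map<String, Integer> seen = new HashMap<>();

    GroupBitmapTracker(int groupSize) {
        // (1 << 32) would wrap around to 1, so this mask arithmetic supports at most 31 parts.
        if (groupSize < 1 || groupSize > 31) {
            throw new IllegalArgumentException("groupSize must be 1..31");
        }
        this.groupSize = groupSize;
        this.fullMask = (1 << groupSize) - 1;
    }

    // Records that part `index` of `groupId` has been persisted. Returns true when
    // this call sets the last missing bit, i.e. when the caller should read the
    // whole group back from storage in one lookup and publish it.
    boolean markArrived(String groupId, int index) {
        if (index < 0 || index >= groupSize) {
            throw new IllegalArgumentException("index out of range: " + index);
        }
        // Re-marking a part of an open group leaves the mask unchanged.
        int mask = seen.getOrDefault(groupId, 0) | (1 << index);
        if (mask == fullMask) {
            seen.remove(groupId); // group finished; stop tracking it
            return true;
        }
        seen.put(groupId, mask);
        return false;
    }

    // Parts still missing for a group; unknown or already-completed groups report groupSize.
    int missingCount(String groupId) {
        return groupSize - Integer.bitCount(seen.getOrDefault(groupId, 0));
    }

    public static void main(String[] args) {
        GroupBitmapTracker tracker = new GroupBitmapTracker(15);
        for (int i = 14; i >= 0; i--) {
            if (tracker.markArrived("batch-42", i)) {
                System.out.println("batch-42 complete, fetch its 15 rows");
            }
        }
    }
}
```

The appeal of this layout is that completion is decided entirely in memory, so the database only ever serves "fetch everything for group X", never "which groups are done?".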
> >
> > One strategy is that I could have one queue per C task (the final tasks) and then have C running and consuming them one at a time, and then performing the execution (and message commit) once it receives all 15 messages.
> >
> > I HAVE to get all the messages before I can make a decision; I can't stream-process them, unfortunately, because the algorithm needs all data points.
> >
> > I could use a database, but the problem there is that it would incur some database overhead, and Cassandra (in our case) doesn't handle this queue pattern very well.
> >
> > Another idea is to use a series of queues... say aggregation_0, aggregation_1, aggregation_2, ...
> >
> > Then I receive these messages into the first queue (aggregation_0) and sort it. If any of the groups are finished, I send them on to the final destination task. If any are unfinished, I overflow them on to aggregation_1 (pre-sorted)...
> >
> > Thoughts?
> >
> > --
> >
> > We’re hiring if you know of any awesome Java Devops or Linux Operations Engineers!
> >
> > Founder/CEO Spinn3r.com
> > Location: *San Francisco, CA*
> > blog: http://burtonator.wordpress.com
> > … or check out my Google+ profile
> > <https://plus.google.com/102718274791889610666/posts>
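For comparison, the aggregation_N idea in the original message amounts to repeating one pass like the sketch below on each level. It assumes a pass has already drained a batch of parts from one aggregation queue into memory; the queue receive and send calls are not shown, and AggregationPass, Part and Result are hypothetical names. Duplicate parts (for example redeliveries) are dropped so they cannot make an incomplete group look complete.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of one pass over the parts drained from an aggregation_N
// queue: sort them, forward complete groups, overflow the rest to aggregation_N+1.
class AggregationPass {

    static final class Part {
        final String groupId;
        final int index; // position of the part within its group, assumed 0..groupSize-1

        Part(String groupId, int index) {
            this.groupId = groupId;
            this.index = index;
        }
    }

    static final class Result {
        final List<List<Part>> complete = new ArrayList<>(); // would be sent on to C
        final List<Part> overflow = new ArrayList<>();       // would go to aggregation_N+1, already sorted
    }

    static Result run(List<Part> drained, int groupSize) {
        List<Part> sorted = new ArrayList<>(drained);
        sorted.sort(Comparator.comparing((Part p) -> p.groupId).thenComparingInt(p -> p.index));

        // Collect each group's parts in sorted order; duplicates end up adjacent, so drop them.
        Map<String, List<Part>> byGroup = new LinkedHashMap<>();
        for (Part p : sorted) {
            List<Part> parts = byGroup.computeIfAbsent(p.groupId, k -> new ArrayList<>());
            if (parts.isEmpty() || parts.get(parts.size() - 1).index != p.index) {
                parts.add(p);
            }
        }

        Result result = new Result();
        for (List<Part> parts : byGroup.values()) {
            if (parts.size() == groupSize) {
                result.complete.add(parts);
            } else {
                result.overflow.addAll(parts);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Part> drained = new ArrayList<>();
        for (int i = 0; i < 15; i++) {
            drained.add(new Part("g1", i));
        }
        drained.add(new Part("g2", 0));
        Result r = run(drained, 15);
        System.out.println(r.complete.size() + " complete, " + r.overflow.size() + " overflowed");
    }
}
```

In this sketch a group whose parts are split across two passes is only completed at a deeper level, once the parts from both passes have been overflowed there.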