The approach that I took incorporates some of the ideas listed here...
Basically each message in the queue was assigned a sequence number (needs to
be unique and increasing per queue), and then read out in sequence number
order.
The Message CF is logically one row per queue, with each column being a
message (name=sequenceNum, value=data). In reality it is bucketed across
several rows to get around the row size, and smaller rows seem to work better.
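
A minimal sketch of the bucketed layout, using an in-memory dict as a stand-in
for the Message CF. The names (BUCKET_SIZE, row_key, MessageCF) and the
"queue:bucket" row key format are assumptions for illustration, and it assumes
dense integer sequence numbers so that a bucket is simply seq // BUCKET_SIZE.

```python
from collections import defaultdict

BUCKET_SIZE = 1000  # hypothetical: number of sequence numbers per physical row


def row_key(queue_id, seq):
    """Physical row key: the queue id plus the bucket this sequence number falls in."""
    return f"{queue_id}:{seq // BUCKET_SIZE}"


class MessageCF:
    """In-memory stand-in for the Message CF (not a Cassandra client)."""

    def __init__(self):
        self.rows = defaultdict(dict)  # row key -> {sequence number: data}
        self.max_seq = {}              # queue id -> highest sequence number stored

    def insert(self, queue_id, seq, data):
        self.rows[row_key(queue_id, seq)][seq] = data
        self.max_seq[queue_id] = max(self.max_seq.get(queue_id, -1), seq)

    def read_from(self, queue_id, start_seq, limit):
        """Return up to `limit` (seq, data) pairs with seq >= start_seq, in order."""
        out = []
        bucket = start_seq // BUCKET_SIZE
        last_bucket = self.max_seq.get(queue_id, -1) // BUCKET_SIZE
        while bucket <= last_bucket and len(out) < limit:
            row = self.rows.get(f"{queue_id}:{bucket}", {})
            for seq in sorted(row):
                if seq >= start_seq and len(out) < limit:
                    out.append((seq, row[seq]))
            bucket += 1
        return out
```

Readers walk buckets in order, so the logical "one row per queue" view is
preserved while each physical row stays small.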


Another CF was used to keep track of the Highest Completed Sequence Number
(per queue) and was updated after committing products made from the
messages. When products were not idempotent, they were also stored with
their corresponding highest completed sequence number. In this way you
create a bit of journaling and should always move forward.
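
A rough sketch of that journaling idea, with plain dicts standing in for the
two CFs. Journal, process, and the apply/initial parameters are hypothetical
names, not part of any Cassandra API. The product is stored together with the
sequence number it was built through, so a replay after a crash skips messages
that are already reflected in it.

```python
class Journal:
    """In-memory stand-in for the 'highest completed' CF and the product store."""

    def __init__(self):
        self.highest_completed = {}  # queue id -> highest completed sequence number
        self.products = {}           # queue id -> (product, sequence number it covers)


def process(journal, queue_id, messages, apply, initial):
    """Fold (seq, data) messages, given in sequence order, into the stored product.

    `apply(product, data)` may be non-idempotent; messages at or below the
    recorded sequence number are skipped so they are never applied twice.
    """
    product, product_seq = journal.products.get(queue_id, (initial, -1))
    # Trust whichever marker is further ahead: the product may have been
    # committed just before a crash prevented the journal update.
    done = max(journal.highest_completed.get(queue_id, -1), product_seq)
    for seq, data in messages:
        if seq <= done:
            continue
        product = apply(product, data)
        journal.products[queue_id] = (product, seq)  # product and its seq stored together
        journal.highest_completed[queue_id] = seq
        done = seq
    return product
```

Replaying the same messages a second time leaves the product unchanged, which
is what lets the rest of the pipeline be lazy about repeat work.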

The problem was creating the sequence numbers. I didn't want to rely on
system time, because I was afraid that in a cluster the clocks would be
skewed, even with NTP etc., and I would end up with some messages out of
order (though rarely). This may not be a problem for some/most.

I didn't want to rely on a system like zookeeper (or pick your favorite
source) for every message, because I didn't want each store to require a
round trip to the sequence service first.
A DB High/Low type algorithm would work fine as long as you managed the High
correctly and didn't have multiple Highs running for the same queue at the
same time.


So I used ZK to assign responsibility for certain queues to certain machines
(there is a load balancing issue here). When a machine took a queue range, it
would get a new sequence number (64-bit High) from ZK or clock or DB etc., and
then every additional message processed would get a local 32-bit counter
(Low)... Not entirely satisfied with this scheme yet.
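
A sketch of the High/Low scheme under some assumptions: fetch_high is a
hypothetical callable standing in for ZK/clock/DB and must return a strictly
increasing 64-bit integer each time it is called, and the 12-byte big-endian
packing is just one way to make the pair sort correctly as a column name.

```python
import struct

LOW_MAX = 2**32 - 1  # largest value of the local 32-bit counter


class SequenceGenerator:
    """Hands out sequence numbers as (64-bit High, 32-bit Low) pairs."""

    def __init__(self, fetch_high):
        self._fetch_high = fetch_high  # hypothetical source of new High values
        self._high = fetch_high()      # taken once, when the queue range is acquired
        self._low = 0

    def next(self):
        if self._low > LOW_MAX:
            # Local counter exhausted: take a fresh High and restart the Low.
            self._high = self._fetch_high()
            self._low = 0
        # Big-endian packing makes byte-wise order match (high, low) order.
        key = struct.pack(">QI", self._high, self._low)
        self._low += 1
        return key
```

A machine that takes over a queue builds a new generator, so its first number
has a larger High and sorts after everything the previous owner issued. This
only holds if two machines never hold a High for the same queue at once.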



As for scheduling the jobs:

When a message is inserted, it also updates an 'Activity' CF (actually,
updates happen only every few messages, e.g. rand()%5==0). The Activity CF
maps from QueueId to a single activity column (last update time and
sequenceNumber). (Actually there is some bucketing/caching for efficiency.)
This can all be cached/async/lazy, because repeat work is already handled.

Workers (which are already assigned a range of queue responsibility by ZK)
scan the activity table (in queue ID order) for any entry. When the job is
completed, the activity entry is deleted using the timestamp the worker saw.
If more activity happened in the meantime, the delete will be ignored because
the timestamps do not match. If Cassandra provided more complex queries, you
could create more complex work scheduling behavior.
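
A small sketch of the activity entry and the timestamped delete, again with a
dict standing in for the CF. ActivityCF and its methods are hypothetical, and
the rule used here (a delete only removes an entry whose timestamp is not newer
than the delete's) is an approximation of timestamp-based reconciliation, not a
client call.

```python
class ActivityCF:
    """In-memory stand-in for the Activity CF: one entry per queue."""

    def __init__(self):
        self.entries = {}  # queue id -> (timestamp, last sequence number)

    def mark(self, queue_id, ts, seq):
        """Record activity; the write with the newest timestamp wins."""
        cur = self.entries.get(queue_id)
        if cur is None or ts >= cur[0]:
            self.entries[queue_id] = (ts, seq)

    def scan(self, lo, hi):
        """Return (queue id, timestamp, seq) for queues in [lo, hi], in id order."""
        return [(q, *self.entries[q]) for q in sorted(self.entries) if lo <= q <= hi]

    def delete(self, queue_id, ts):
        """Delete as of `ts`; has no effect if newer activity was recorded."""
        cur = self.entries.get(queue_id)
        if cur is not None and cur[0] <= ts:
            del self.entries[queue_id]
            return True
        return False
```

A worker scans its range, runs the job, then deletes with the timestamp it
read. If a producer marked the queue again while the job ran, the entry
survives and the queue gets picked up on the next scan.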

There is no priority in this scheme.

Original messages are left undeleted for now. But if you wanted to delete
them, I would use some of the coming bulk deletion/truncate features, like
dropping the entire CF.

-JD




On Sat, Jun 26, 2010 at 3:31 PM, Jonathan Shook <jsh...@gmail.com> wrote:

> Ideas:
>
> Use a checkpoint that moves forward in time for each logical partition
> of the workload.
>
> Establish a way of dividing up jobs between clients that doesn't
> require synchronization. One way of doing this would be to modulo the
> key by the number of logical workers, allowing them to graze directly
> on the job data. Doing it this way means that you have to make the
> workers smart enough to checkpoint properly, handle exceptions, etc.
> Jobs may be dispatched out-of-order in this scheme, so you would have
> to decide how to handle explicit sequencing requirements. Some jobs
> have idempotent results only when executed in the same order, and
> keeping operations idempotent allows for simpler failure recovery. If
> your workers are capable of absorbing the workload, then backlogging
> won't hurt too much. Otherwise, you'll see strange ordering of things
> in your application when they would otherwise need to look more
> consistent.
>
> You might find it easier to just take the hit of having a synchronized
> dispatcher, but make it as lean as possible.
>
> Another way to break workload up is to have logical groupings of jobs
> according to a natural boundary in your domain model, and to run a
> synchronized dispatcher for each of those.
>
> Using the "job" columns to keep track of who owns a job may not be the
> best approach. You may have to do row scans on column data, which is a
> Cassandra anti-pattern. Without an atomic check and modify operation,
> there is no way to do it that avoids possible race conditions or extra
> state management. This may be one of the strongest arguments for
> putting such an operation into Cassandra.
>
> You can set up your job name/keying such that every job result is
> logically ordered to come immediately after the job definition. Row
> key range scans would still be close to optimal, but would carry a
> marker for jobs which had been completed. This would allow clients to
> self-checkpoint, as long as result insertions are atomic row-wise (I
> think they are). Another worker could clean up rows which were
> subsequently consumed (results no longer needed) after some gap in
> time.  The client can avoid lots of tombstones by only looking where
> there should be additional work (checkpoint time). Pick a character
> that is not natural for your keys and make it a delimiter. Require
> that all keys in the job CF be aggregate and fully-qualified.
>
> Clients might be able to remove job rows that allow for it after
> completion, but jobs which were dispatched to multiple workers may end
> up with orphaned result rows to be cleaned up.
>
> .. just some drive-by ramblings ..
>
> Jonathan
>
>
>
> On Sat, Jun 26, 2010 at 3:56 PM, Andrew Miklas <and...@pagerduty.com> wrote:
> > Hi all,
> >
> > Has anyone written a work-queue implementation using Cassandra?
> >
> > There's a section in the UseCase wiki page for "A distributed Priority Job
> > Queue" which looks perfect, but unfortunately it hasn't been filled in yet.
> > http://wiki.apache.org/cassandra/UseCases#A_distributed_Priority_Job_Queue
> >
> > I've been thinking about how best to do this, but every solution I've
> > thought of seems to have some serious drawback.  The "range ghost" problem
> > in particular creates some issues.  I'm assuming each job has a row within
> > some column family, where the row's key is the time at which the job should
> > be run.  To find the next job, you'd do a range query with a start a few
> > hours in the past, and an end at the current time.  Once a job is completed,
> > you delete the row.
> >
> > The problem here is that you have to scan through deleted-but-not-yet-GCed
> > rows each time you run the query.  Is there a better way?
> >
> > Preventing more than one worker from starting the same job seems like it
> > would be a problem too.  You'd either need an external locking manager, or
> > have to use some other protocol where workers write their ID into the row
> > and then immediately read it back to confirm that they are the owner of the
> > job.
> >
> > Any ideas here?  Has anyone come up with a nice implementation?  Is
> > Cassandra not well suited for queue-like tasks?
> >
> >
> >
> > Thanks,
> >
> >
> > Andrew
> >
>
