from __future__ import with_statement ''' 3) upon_acquiring( lockA, lockB )( function, *ar, **kwar )
upon_acquiring spawns new thread upon acquiring locks A and B. Locks may be specified in any order, as none is acquired until all are free. The options to spawn a new thread upon call, lock, and not release until "it's its turn"; just block until then; and vary honoring order are open. 6) @with_self Prepends the function object to itself to the parameter list ''' ''' a lockA lockB b lockA lockB c lockA lockB d lockA lockC e lockB lockD f lockE lockF assert a< b assert b< c assert c< d assert c< e assert f in. ''' ''' A (A,B)1 (A,C)2 (A,B)3 B (A,B)1 (A,B)3 C (A,C)2 (C,D,E)4 D (C,D,E)4 E (C,D,E)4 a.lock: A, a.state: Free, a.waiters: [ X1(a,b), X2(a,c), X3(a,b) ] b.lock: B, b.state: Free, b.waiters: [ X1(a,b), X3(a,b) ] c.lock: C, c.state: Free, X3.waiters: [ X2(a,c), X4(c,d,e) ] d.lock: D, d.state: Free, X3.waiters: [ X4(c,d,e) ] e.lock: E, e.state: Free, X3.waiters: [ X4(c,d,e) ] acq a,b x1= a,b a.waiters+= x1 b.waiters+= x1 #same as if a.state is free and b.state is free: a.state= taken b.state= taken a.waiters-= x1 b.waiters-= x1 a.lock.release() #acq? b.lock.release() #acq? x1.lock.release() acq a,c x2= a,c a.waiters+= x2 c.waiters+= x2 if a.state is free and c.state is free: a.state= taken c.state= taken a.waiters-= x2 c.waiters-= x2 a.lock.release() #acq? c.lock.release() #acq? x2.lock.release() acq a,b x3= a,b a.waiters+= x3 b.waiters+= x3 acq c,d,e x4= c,d,e c.waiters+= x4 d.waiters+= x4 e.waiters+= x4 ''' from thread import start_new_thread from threading import Lock, Thread, Event import time from functools import partial class LockDeps: def __init__( self, lock, state, waiters ): self.lock, self.state, self.waiters= \ lock, state, waiters class LockSet: #ok to use them elsewhere, just gets in line. def __init__( self, *locks ): self._locks= locks self._lock= Lock() self._lock.acquire() self._remains= set( locks ) self._doneevt= Event() self.th= None self.retval= None def release( self, *locks ): for lock in locks: lock.release() self._remains.remove( lock ) def releaseall( self ): for lock in self._remains: lock.release() self._remains.clear() class UponAcquiring: def __init__( self ): self._deps= {} self._oplock= Lock() def acq( self, *locks ): lckset= LockSet( *locks ) return partial( self._enqueue, lckset ) def _enqueue( self, lckset, fun, *ar, **kwar ): with self._oplock: for lock in lckset._locks: dep= self._deps.get( lock ) if None is dep: dep= LockDeps( lock, False, [] ) self._deps[ lock ]= dep dep.waiters.append( lckset ) th= Thread( target= self._functhd, args= ( lckset, fun )+ ar, kwargs= kwar ) lckset.th= th th.start() self._analyze( lckset ) return lckset def _functhd( self, lckset, fun, *ar, **kwar ): try: with lckset._lock: lckset.retval=\ fun( lckset, *ar, **kwar ) lckset._doneevt.set() finally: with self._oplock: lckset.releaseall() for lock in lckset._locks: self._deps[ lock ].state= False self._analyze( lckset ) def _analyze( self, lckset ): with self._oplock: for lock in lckset._locks: dep= self._deps[ lock ] if dep.state: continue for lckset in dep.waiters: assert lock in lckset._locks for lock2 in lckset._locks: if self._deps[ lock2 ].state: break else: for lock2 in lckset._locks: dep2= self._deps[ lock2 ] dep2.state= True assert dep2.waiters.count( lckset )== 1 dep2.waiters.remove( lckset ) lock2.acquire() lckset._lock.release() break results= [] ver= results.index lcksets= set() import random from sys import stdout def callback( locks, i ): stdout.write( 'cb%i '% i ) time.sleep( random.uniform( 0, .01 ) ) results.append( i ) if random.choice( [ False, True ] ): locks.releaseall() #if random.random()< 0.1: # raise Exception() while 1: class Case1: lockA, lockB, lockC= Lock(), Lock(), Lock() lockD, lockE, lockF= Lock(), Lock(), Lock() a= ( lockA, lockB ) b= ( lockA, lockB ) c= ( lockA, lockB ) d= ( lockA, lockC ) e= ( lockB, lockD ) f= ( lockE, lockF ) ua= UponAcquiring() for i, x in enumerate( [ a, b, c, d, e, f ] ): lcksets.add( ua.acq( *x )( callback, i ) ) for lckset in lcksets: lckset.th.join() stdout.write( repr( results ) ) stdout.write( '\n' ) stdout.flush() assert ver( 0 )< ver( 1 ) assert ver( 1 )< ver( 2 ) assert ver( 2 )< ver( 3 ) assert ver( 2 )< ver( 4 ) assert len( set( results ) )== len( results ) '''permissible orders e.g.: [0, 5, 1, 2, 3, 4] [5, 0, 1, 2, 3, 4] [0, 5, 1, 2, 4, 3] [5, 0, 1, 2, 4, 3] ''' del results[:] lcksets.clear() -- http://mail.python.org/mailman/listinfo/python-list