如下 Python threading 标准库_Condition 类的的代码。在 Wait 条件发生前,需要先 acquire _Condition 的锁。
而在 wait 的时候,会先判断 当前线程 是否拥有该锁。
def _is_owned(self):
if self.__lock.acquire(0):
self.__lock.release()
return False
else:
return True
class _Condition(_Verbose):
def __init__(self, lock=None, verbose=None):
_Verbose.__init__(self, verbose)
if lock is None:
lock = RLock()
self.__lock = lock
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self.__waiters = []
def __enter__(self):
return self.__lock.__enter__()
def __exit__(self, *args):
return self.__lock.__exit__(*args)
def __repr__(self):
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
def _release_save(self):
self.__lock.release() # No state to save
def _acquire_restore(self, x):
self.__lock.acquire() # Ignore saved state
def _is_owned(self):
if self.__lock.acquire(0):
self.__lock.release()
return False
else:
return True
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self.__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
if __debug__:
self._note("%s.wait(): got it", self)
else:
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2, remaining, .05)
_sleep(delay)
if not gotit:
if __debug__:
self._note("%s.wait(%s): timed out", self, timeout)
try:
self.__waiters.remove(waiter)
except ValueError:
pass
else:
if __debug__:
self._note("%s.wait(%s): got it", self, timeout)
finally:
self._acquire_restore(saved_state)
1
wwqgtxx 2017-01-05 20:32:34 +08:00
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call # release() and acquire() on the lock). Ditto for _is_owned(). 这是 Python 源代码中的注释,具体取决于 lock 本身的实现 |
2
wwqgtxx 2017-01-05 20:36:21 +08:00
至于具体怎么实现,可以参考我用 Redis 实现的分布式 Lock
https://github.com/wwqgtxx/RedisTools/blob/master/redistools/redistools.py#L179 |
3
merpyzf 2019-03-09 11:21:04 +08:00
由于 Condition 内部的 lock/RLock 对象是在线程间共享的,因此_is_owned 方法并不能判断当前的线程是否拥有该锁。
举一个例子,现在有两个线程 T1 T2,(T2 线程的执行时间晚于 T1): T1 线程调用了 cond._lock.acquire(),获取锁,不释放锁 T2 线程中调用了 cond._lock.acquire(False),此时 acquire()返回结果为 False。 如果在 T2 线程中调用 cond._is_owned 方法那么会返回 True。显然 T2 线程并没有持有锁。 此时如果直接在 T2 线程中调用 cond.wait()是可以成功阻塞 T2 线程的运行的,尽管在 T2 线程中并没有持有锁。 因此在线程中调用 cond.wait()方法时,只要满足 Condition 实例对象的内部的 lock 已经持有某个线程的锁就可以了。 |