# threading 模块 API 文档

## API

``` python
def get_ident()->int64:...
```

### class Lock():

``` python
def __init__(self):...
```

``` python
def acquire(self,block:bool,timeout:any)->bool:...
```

``` python
def locked(self)->bool:...
```

``` python
def release(self):...
```

``` python
def __del__(self):...
```

### class RLock():

``` python
def __init__(self):...
```

``` python
def acquire(self,block:bool,timeout:any)->bool:...
```

``` python
def locked(self)->bool:...
```

``` python
def release(self):...
```

``` python
def __del__(self):...
```

### class Condition:

``` python
def __init__(self):...
```

``` python
def wait(self,timeout:any)->bool:...
```

``` python
def notify(self,n:int):...
```

``` python
def notify_all(self):...
```

``` python
def _is_owned(self)->bool:...
```

``` python
def __del__(self):...
```

``` python
def acquire(self,block:bool,timeout:any)->bool:...
```

``` python
def release(self):...
```

## Examples

### lock_rlock.py

```python
import _thread
import time
import threading
import PikaStdLib
# 共享资源
shared_resource = 0

# 互斥锁
mutex = threading.Lock()

# 线程函数
finished = 0


def thread_function(name, delay):
    global shared_resource
    global mutex, finished
    print("delay : %s" % str(delay))
    k = 0
    i = 0
    mem = PikaStdLib.MemChecker()
    for i in range(5):
        # while 1:
        try:
            # 获取互斥锁
            print("%s try to acquire lock. #1" % name)
            res = mutex.acquire(True, None)
            print("res: %s" % str(res))
            if 1:  # 测试RLock或者Lock的超时加上
                print("%s try to acquire lock. #2" % name)
                res = mutex.acquire(True, 0.5)
                print("res: %s" % str(res))

            if res:
                print("%s acquire lock SUCC." % name)
            else:
                print("%s acquire lock FAIL." % name)
            # 打印当前线程名称和共享资源的值
            print("Thread %s: Iteration %d, Shared Resource: %d" % (name, i, shared_resource))
            # 更新共享资源
            shared_resource += 1
            # 模拟工作时间
            time.sleep(delay)
            print("wake")
            # 释放互斥锁
            mutex.release()
            mutex.release()
            k += 1
            print("%s i = %d." % (name, i))
            # print('mem used now:')
            # mem.now()

        except:
            print("------------- error ---------------")

    print("%s exit , at last, i = %d." % (name, k))
    finished += 1

# 主函数


def main():
    # 创建第一个线程
    _thread.start_new_thread(thread_function, ("Thread-1", 0.1))
    time.sleep(0.5)
    # 创建第二个线程
    _thread.start_new_thread(thread_function, ("Thread-2", 0.2))

    # 主线程等待子线程结束
    # 由于 _thread 没有 join 方法,我们通过 sleep 来模拟等待
    # time.sleep(60)
    while finished < 2:
        time.sleep(1)


main()
```