8.1. _thread 模块 API 文档
8.1.1. API
def start_new_thread(function: any, args_: any):
    """Start a new thread that runs ``function``.

    Args:
        function: Callable executed in the new thread.
        args_: Positional arguments for ``function``; the examples in this
            document pass a tuple (``()`` when there are no arguments).
    """
    ...
def stack_size(*size) -> int:
    """Thread stack-size accessor; declared to return an ``int``.

    NOTE(review): presumably mirrors CPython's ``_thread.stack_size`` (return
    the current stack size, optionally setting it from ``size``) -- confirm
    against the implementation.
    """
    ...
8.1.2. Examples
8.1.2.1. thread_issue1.py
import time
import _thread
def task2():
    """Worker thread body: print "123" forever, pausing 201 ms per iteration."""
    while True:
        print("123")
        # NOTE(review): 201 ms is one more than the main loop's 200 ms sleep;
        # presumably deliberate to reproduce the scheduling issue -- confirm.
        time.sleep_ms(201)
# Run task2 in a new thread (it takes no arguments, hence the empty tuple).
_thread.start_new_thread(task2, ())
# The main thread never exits: it just sleeps 200 ms per iteration.
while True:
    time.sleep_ms(200)
8.1.2.2. test2.py
import time
import _thread
# Completion flag: set to True by test_thread, polled by the main loop.
finished = False
def test_thread(arg):
global finished
for i in range(3):
print(i)
time.sleep(0.1)
finished = True
print('test_thread arg:', arg)
assert arg == 'test'
# Start the worker thread and wait for its result.
# NOTE(review): ('test') is a parenthesized string, not a one-element tuple;
# CPython's _thread requires a tuple such as ('test',) -- confirm that the
# target runtime accepts a bare value here.
_thread.start_new_thread(test_thread, ('test'))
# Poll every 0.1 s until the worker raises the flag.
while not finished:
    time.sleep(0.1)
# Brief extra pause after the flag is seen, before the script ends.
time.sleep(0.1)
8.1.2.3. thread_self.py
import _thread
import time
class Test:
    """Example showing a bound method used as a thread entry point.

    ``_val`` moves through three values: 1 (class attribute), 2 (set by
    ``__init__``) and 3 (set by ``init`` once it runs in the new thread).
    """

    # Class-level default; shadowed per instance in __init__.
    _val = 1

    def __init__(self):
        """Set ``_val`` to 2 and start ``self.init`` in a new thread."""
        self._val = 2
        _thread.start_new_thread(self.init, ())

    def init(self):
        """Thread body: print the current ``_val``, then set it to 3."""
        print('self._val:', self._val)
        self._val = 3
# Constructing Test starts its worker thread.
test = Test()
# Poll every 0.1 s until the worker has set _val to 3.
while test._val != 3:
    time.sleep(0.1)
# Extra pause before the script ends.
time.sleep(0.5)
8.1.2.4. test1.py
import _thread
import time
# Completion flags: each is set to True by its task function and polled by
# the main loop.
task1_finished = False
task2_finished = False
def task1():
    """Thread body: print "task1", then ten more times at 0.05 s intervals.

    Sets the module-level ``task1_finished`` flag when the loop is done.
    """
    global task1_finished
    print("task1")
    for i in range(10):
        time.sleep(0.05)
        print("task1")
    task1_finished = True
def task2(sleep_time, loop_count):
    """Thread body: print "task2" ``loop_count`` times, pausing before each.

    Sets the module-level ``task2_finished`` flag when the loop is done.

    Args:
        sleep_time: Value passed to ``time.sleep`` before each print.
        loop_count: Number of loop iterations.
    """
    global task2_finished
    print("task2:", sleep_time, loop_count)
    for i in range(loop_count):
        time.sleep(sleep_time)
        print("task2")
    task2_finished = True
# Start both workers; task2 receives (sleep_time, loop_count) = (0.05, 10).
_thread.start_new_thread(task1, ())
_thread.start_new_thread(task2, (0.05, 10))
# Poll every 0.1 s until both workers have raised their flags.
while not task1_finished or not task2_finished:
    time.sleep(0.1)
time.sleep(0.5)  # wait for threads to exit
8.1.2.5. timer1.py
import time
from timer import Timer
# Timer instance used to schedule the delayed calls in this example.
tim = Timer()
def task1(a, b):
    """Timer callback: print both arguments with a "task1:" prefix."""
    print('task1:', a, b)
def task2(a, b):
    """Timer callback: print both arguments with a "task2:" prefix."""
    print('task2:', a, b)
# Schedule task1(1, 2) after 1000 ms and task2(3, 4) after 2000 ms.
tim.call_after_ms(1000, task1, (1, 2))
tim.call_after_ms(2000, task2, (3, 4))
# Poll once per second until the timer's task list is empty.
# NOTE(review): assumes the timer removes a task from `tasks` once it has
# been called -- confirm against the timer module.
while len(tim.tasks) > 0:
    time.sleep(1)
tim.stop()
# Extra pause after stopping the timer, before the script ends.
time.sleep(1)