8.1. _thread 模块 API 文档

8.1.1. API

def start_new_thread(function:any,args_:any):...
def stack_size(*size)->int:...

8.1.2. Examples

8.1.2.1. thread_issue1.py

import time
import _thread


def task2():
    while True:
        print("123")
        time.sleep_ms(201)


_thread.start_new_thread(task2, ())

while True:
    time.sleep_ms(200)

8.1.2.2. test2.py

import time
import _thread

finished = False

def test_thread(arg):
    global finished
    for i in range(3):
        print(i)
        time.sleep(0.1)
    finished = True
    print('test_thread arg:', arg)
    assert arg == 'test'

# 开启线程 获取数据
_thread.start_new_thread(test_thread, ('test'))
while not finished:
    time.sleep(0.1)
time.sleep(0.1)

8.1.2.3. thread_self.py

import _thread
import time


class Test:
    _val = 1

    def __init__(self):
        self._val = 2
        _thread.start_new_thread(self.init, ())

    def init(self):
        print('self._val:', self._val)
        self._val = 3

test = Test()
while test._val != 3:
    time.sleep(0.1)

time.sleep(0.5)

8.1.2.4. test1.py

import _thread
import time

task1_finished = False
task2_finished = False


def task1():
    global task1_finished
    print("task1")
    for i in range(10):
        time.sleep(0.05)
        print("task1")
    task1_finished = True


def task2(sleep_time, loop_count):
    global task2_finished
    print("task2:", sleep_time, loop_count)
    for i in range(loop_count):
        time.sleep(sleep_time)
        print("task2")
    task2_finished = True


_thread.start_new_thread(task1, ())
_thread.start_new_thread(task2, (0.05, 10))

while not task1_finished or not task2_finished:
    time.sleep(0.1)

time.sleep(0.5)  # wait for threads to exit

8.1.2.5. timer1.py

import time
from timer import Timer


tim = Timer()


def task1(a, b):
    print('task1:', a, b)


def task2(a, b):
    print('task2:', a, b)


tim.call_after_ms(1000, task1, (1, 2))
tim.call_after_ms(2000, task2, (3, 4))

while len(tim.tasks) > 0:
    time.sleep(1)

tim.stop()
time.sleep(1)