8.1. _thread 模块 API 文档

8.1.1. API

def start_new_thread(function:any,args_:any):...
def stack_size(*size)->int:...

8.1.2. Examples

8.1.2.1. test2.py

import time
import _thread

finished = False

def test_thread(arg):
    global finished
    for i in range(3):
        print(i)
        time.sleep(0.1)
    finished = True
    print('test_thread arg:', arg)
    assert arg == 'test'

# 开启线程 获取数据
_thread.start_new_thread(test_thread, ('test'))
while not finished:
    time.sleep(0.1)
time.sleep(0.1)

8.1.2.2. thread_self.py

import _thread
import time


class Test:
    _val = 1

    def __init__(self):
        self._val = 2
        _thread.start_new_thread(self.init, ())

    def init(self):
        print('self._val:', self._val)
        self._val = 3

test = Test()
while test._val != 3:
    time.sleep(0.1)

time.sleep(0.5)

8.1.2.3. test1.py

import _thread
import time

task1_finished = False
task2_finished = False


def task1():
    global task1_finished
    print("task1")
    for i in range(10):
        time.sleep(0.05)
        print("task1")
    task1_finished = True


def task2(sleep_time, loop_count):
    global task2_finished
    print("task2:", sleep_time, loop_count)
    for i in range(loop_count):
        time.sleep(sleep_time)
        print("task2")
    task2_finished = True


_thread.start_new_thread(task1, ())
_thread.start_new_thread(task2, (0.05, 10))

while not task1_finished or not task2_finished:
    time.sleep(0.1)

time.sleep(0.5)  # wait for threads to exit