8.49. socket 模块 API 文档

8.49.1. API

8.49.1.1. class socket(_socket.socket):

def __init__(self,*vars):...
def bind(self,host_port):...
def listen(self,num):...
def accept(self):...
def send(self,data):...
def close(self):...
def connect(self,host_port):...
def recv(self,num):...
def setblocking(self,sta):...
def gethostname():...
def gethostbyname(host):...

8.49.2. Examples

8.49.2.1. socket_thread.py

import socket
import _thread
import random
import time

test_finished = False
server_started = False


def socket_server_task(host, port):
    """
    socket 服务器任务
    :return:
    """
    print("socket server start:", host, port)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(5)
    print("socket server waiting accept")
    global server_started
    server_started = True
    accept, addr = s.accept()
    print("socket server accepted at", addr)
    while True:
        try:
            data = accept.recv(1024)
            print('socket server recv:', data.decode())
            accept.send(data)
        except Exception:
            print('socket server closing accept')
            accept.close()
            break
    print("socket server closing")
    s.close()
    global test_finished
    test_finished = True


def socket_server_init(host='0.0.0.0', port=36500):
    _thread.start_new_thread(socket_server_task, (host, port))


def socket_client_task(host, port):
    print("socket client start:", host, port)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    client.send("hello".encode())
    recv = client.recv(1024).decode()
    print("client recv:", recv)
    client.close()


def socket_server_test(host='0.0.0.0', port=36500):
    _thread.start_new_thread(socket_client_task, (host, port))


test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
    time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
    time.sleep(0.1)

8.49.2.2. socket_DNS.py

import socket

# 创建一个 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 使用域名和端口来进行连接
s.connect(("dns.google", 53))

# 关闭连接
s.close()

print("PASS")

8.49.2.3. socket_GET.py

import socket

def test_socket_GET():
    # 创建一个socket对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 获取服务器的IP地址
    # server_ip = socket.gethostbyname('baidu.com')
    server_port = 80

    # 连接到服务器
    s.connect(('pikapython.com', server_port))
    # 创建HTTP GET请求
    request = 'GET / HTTP/1.1\r\nHost: pikascript.com\r\n\r\n'
    # print('request:', request)
    s.send(request.encode())

    # 接收服务器的响应
    response = ''
    while True:
        try:
            recv = s.recv(1024)
        except:
            break
        if not recv:
            break
        response += recv.decode()
    s.close()
    return response

for i in range(10):
    response = test_socket_GET()
    res = 'HTTP/1.1' in response
    if res == True:
        break
    print('test_socket_GET() failed, retrying...')
    print('response', response)

8.49.2.4. server_client.py

import socket
import random

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = "127.0.0.1"
port = 9999 + random.randint(0, 1000)
print("port:", port)
server.bind((host, port))
server.listen(5)

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))

accept, addr = server.accept()

print("recv from client: %s" % str(addr))

client.send("send test from client".encode())
print("server recv:", accept.recv(1024).decode())

accept.send("send test from server".encode())
print("client recv:", client.recv(1024).decode())

accept.close()
client.close()
server.close()

8.49.2.5. gethostname.py

import socket

hostname = socket.gethostname()
print(hostname)

8.49.2.6. socket_json.py

import socket
import _thread
import random
import time
import json

test_finished = False
server_started = False
test_data = {
    'result': {
        'a_a': {
            'value': 0.290000, 'desc': 'A 相电流'
        }
    }, 'code': 0
}


def socket_server_task(host, port):
    """
    socket 服务器任务
    :return:
    """
    print("socket server start:", host, port)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(5)
    global server_started
    server_started = True
    while True:
        try:
            print("socket server waiting accept")
            accept, addr = s.accept()
            print("socket server accepted at", addr)
            while True:
                data = accept.recv(1024)
                print('socket server recv:', data.decode())
                # accept.send(data)
                accept.send(json.dumps(test_data))
        except Exception:
            print('socket server closing accept')
            accept.close()
            break
    print("socket server closing")
    s.close()
    global test_finished
    test_finished = True


def socket_server_init(host='0.0.0.0', port=36500):
    _thread.start_new_thread(socket_server_task, (host, port))


def socket_client_task(host, port):
    print("socket client start:", host, port)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    for i in range(2):
        client.send("hello".encode())
        recv = client.recv(1024).decode()
        print("client recv:", recv)
    client.close()


def socket_server_test(host='0.0.0.0', port=36500):
    _thread.start_new_thread(socket_client_task, (host, port))


test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
    time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
    time.sleep(0.1)