8.49. socket 模块 API 文档

8.49.1. API

8.49.1.1. class socket(_socket.socket):

def __init__(self,*vars):...
def bind(self,host_port):...
def listen(self,num):...
def accept(self):...
def send(self,data):...
def close(self):...
def connect(self,host_port):...
def recv(self,num):...
def setblocking(self,sta):...
def gethostname():...
def gethostbyname(host):...

8.49.2. Examples

8.49.2.1. socket_thread.py

import socket
import _thread
import random
import time

test_finished = False
server_started = False


def socket_server_task(host, port):
    """
    socket 服务器任务
    :return:
    """
    print("socket server start:", host, port)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(5)
    print("socket server waiting accept")
    global server_started
    server_started = True
    accept, addr = s.accept()
    print("socket server accepted at", addr)
    while True:
        try:
            data = accept.recv(1024)
            print('socket server recv:', data.decode())
            accept.send(data)
        except Exception:
            print('socket server closing accept')
            accept.close()
            break
    print("socket server closing")
    s.close()
    global test_finished
    test_finished = True


def socket_server_init(host='0.0.0.0', port=36500):
    _thread.start_new_thread(socket_server_task, (host, port))


def socket_client_task(host, port):
    print("socket client start:", host, port)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    client.send("hello".encode())
    recv = client.recv(1024).decode()
    print("client recv:", recv)
    client.close()


def socket_server_test(host='0.0.0.0', port=36500):
    _thread.start_new_thread(socket_client_task, (host, port))


test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
    time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
    time.sleep(0.1)

8.49.2.2. socket_DNS.py

import socket

# 创建一个 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 使用域名和端口来进行连接
s.connect(("dns.google", 53))

# 关闭连接
s.close()

print("PASS")

8.49.2.3. socket_GET.py

import socket

def test_socket_GET():
    # 创建一个socket对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 获取服务器的IP地址
    # server_ip = socket.gethostbyname('baidu.com')
    server_port = 80

    # 连接到服务器
    s.connect(('pikapython.com', server_port))
    # 创建HTTP GET请求
    request = 'GET / HTTP/1.1\r\nHost: pikascript.com\r\n\r\n'
    # print('request:', request)
    s.send(request.encode())

    # 接收服务器的响应
    response = ''
    while True:
        try:
            recv = s.recv(1024)
        except:
            break
        if not recv:
            break
        response += recv.decode()
    s.close()
    return response

for i in range(10):
    response = test_socket_GET()
    res = 'HTTP/1.1' in response
    if res == True:
        break
    print('test_socket_GET() failed, retrying...')
    print('response', response)

8.49.2.4. server_client.py

import socket
import random

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = "127.0.0.1"
port = 9999 + random.randint(0, 1000)
print("port:", port)
server.bind((host, port))
server.listen(5)

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))

accept, addr = server.accept()

print("recv from client: %s" % str(addr))

client.send("send test from client".encode())
print("server recv:", accept.recv(1024).decode())

accept.send("send test from server".encode())
print("client recv:", client.recv(1024).decode())

accept.close()
client.close()
server.close()

8.49.2.5. gethostname.py

import socket

hostname = socket.gethostname()
print(hostname)

8.49.2.6. socket_json.py

import socket
import _thread
import random
import time
import json

test_finished = False
server_started = False
test_data = {
    'result': {
        'a_a': {
            'value': 0.290000, 'desc': 'A 相电流'
        }
    }, 'code': 0
}


def socket_server_task(host, port):
    """
    socket 服务器任务
    :return:
    """
    print("socket server start:", host, port)
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(5)
    global server_started
    server_started = True
    while True:
        try:
            print("socket server waiting accept")
            accept, addr = s.accept()
            print("socket server accepted at", addr)
            while True:
                data = accept.recv(1024)
                print('socket server recv:', data.decode())
                # accept.send(data)
                accept.send(json.dumps(test_data))
        except Exception:
            print('socket server closing accept')
            accept.close()
            break
    print("socket server closing")
    s.close()
    global test_finished
    test_finished = True


def socket_server_init(host='0.0.0.0', port=36500):
    _thread.start_new_thread(socket_server_task, (host, port))


def socket_client_task(host, port):
    print("socket client start:", host, port)
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    for i in range(2):
        client.send("hello".encode())
        recv = client.recv(1024).decode()
        print("client recv:", recv)
    client.close()


def socket_server_test(host='0.0.0.0', port=36500):
    _thread.start_new_thread(socket_client_task, (host, port))


test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
    time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
    time.sleep(0.1)

8.49.2.7. socket_download.py

import socket
import time


def http_download_file(url: str, file_path: str, buff_size=1024):
    # Parse the URL to extract the host and path
    if not url.startswith('http://'):
        print("Only HTTP protocol is supported")
        return -1
    host = url.split("//")[1].split('/')[0]
    path = url[url.find(host) + len(host):]
    if len(path) == 0:
        path = "/"

    print("Host:", host)
    print("Path:", path)
    # Establish a socket connection
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, 80))
        print("Connected to:", host)
    except Exception as e:
        print("Connection error:", e)
        return -1

    # Send HTTP GET request
    get_request = "GET " + path + " HTTP/1.1\r\nHost: " + \
        host + "\r\nConnection: close\r\n\r\n"
    sock.send(get_request.encode())

    # Open file to write
    f = open(file_path, 'wb')  # Manually open the file
    data_received = False
    head_received = False
    while True:
        try:
            data = sock.recv(buff_size)
        except:
            print('End of data')
            break
        print("[Data received:", len(data), ']')
        if head_received:
            sz = f.write(data)
            # print("Data written:", sz)
        # print(data.decode())
        if len(data) == 0:
            print("Length of data:", len(data))
            if not data_received:
                print("No data received.")
                f.close()
                return -1
            print("No more data to receive")
            break
        data_received = True

        # Handle the end of the HTTP header if it's still present
        if head_received == False:
            if b'\r\n\r\n' in data:
                # print("Header received", data)
                head_received = True
                aplited = data.split(b'\r\n\r\n', 1)
                if len(aplited) == 2:
                    sz = f.write(aplited[1])

    # Close file and socket manually
    f.close()
    sock.close()
    print("Download completed")
    return 0


assert http_download_file("http://pikapython.com", "pikapython.html") == 0
print("PASS")