8.49. socket 模块 API 文档
8.49.1. API
8.49.1.1. class socket(_socket.socket):
def __init__(self,*vars):...
def bind(self,host_port):...
def listen(self,num):...
def accept(self):...
def send(self,data):...
def close(self):...
def connect(self,host_port):...
def recv(self,num):...
def setblocking(self,sta):...
def gethostname():...
def gethostbyname(host):...
8.49.2. Examples
8.49.2.1. socket_thread.py
import socket
import _thread
import random
import time
test_finished = False
server_started = False
def socket_server_task(host, port):
"""
socket 服务器任务
:return:
"""
print("socket server start:", host, port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(5)
print("socket server waiting accept")
global server_started
server_started = True
accept, addr = s.accept()
print("socket server accepted at", addr)
while True:
try:
data = accept.recv(1024)
print('socket server recv:', data.decode())
accept.send(data)
except Exception:
print('socket server closing accept')
accept.close()
break
print("socket server closing")
s.close()
global test_finished
test_finished = True
def socket_server_init(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_server_task, (host, port))
def socket_client_task(host, port):
print("socket client start:", host, port)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
client.send("hello".encode())
recv = client.recv(1024).decode()
print("client recv:", recv)
client.close()
def socket_server_test(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_client_task, (host, port))
test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
time.sleep(0.1)
8.49.2.2. socket_DNS.py
import socket
# 创建一个 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 使用域名和端口来进行连接
s.connect(("dns.google", 53))
# 关闭连接
s.close()
print("PASS")
8.49.2.3. socket_GET.py
import socket
def test_socket_GET():
# 创建一个socket对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 获取服务器的IP地址
# server_ip = socket.gethostbyname('baidu.com')
server_port = 80
# 连接到服务器
s.connect(('pikapython.com', server_port))
# 创建HTTP GET请求
request = 'GET / HTTP/1.1\r\nHost: pikascript.com\r\n\r\n'
# print('request:', request)
s.send(request.encode())
# 接收服务器的响应
response = ''
while True:
try:
recv = s.recv(1024)
except:
break
if not recv:
break
response += recv.decode()
s.close()
return response
for i in range(10):
response = test_socket_GET()
res = 'HTTP/1.1' in response
if res == True:
break
print('test_socket_GET() failed, retrying...')
print('response', response)
8.49.2.4. server_client.py
import socket
import random
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = "127.0.0.1"
port = 9999 + random.randint(0, 1000)
print("port:", port)
server.bind((host, port))
server.listen(5)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
accept, addr = server.accept()
print("recv from client: %s" % str(addr))
client.send("send test from client".encode())
print("server recv:", accept.recv(1024).decode())
accept.send("send test from server".encode())
print("client recv:", client.recv(1024).decode())
accept.close()
client.close()
server.close()
8.49.2.5. gethostname.py
import socket
hostname = socket.gethostname()
print(hostname)
8.49.2.6. socket_json.py
import socket
import _thread
import random
import time
import json
test_finished = False
server_started = False
test_data = {
'result': {
'a_a': {
'value': 0.290000, 'desc': 'A 相电流'
}
}, 'code': 0
}
def socket_server_task(host, port):
"""
socket 服务器任务
:return:
"""
print("socket server start:", host, port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(5)
global server_started
server_started = True
while True:
try:
print("socket server waiting accept")
accept, addr = s.accept()
print("socket server accepted at", addr)
while True:
data = accept.recv(1024)
print('socket server recv:', data.decode())
# accept.send(data)
accept.send(json.dumps(test_data))
except Exception:
print('socket server closing accept')
accept.close()
break
print("socket server closing")
s.close()
global test_finished
test_finished = True
def socket_server_init(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_server_task, (host, port))
def socket_client_task(host, port):
print("socket client start:", host, port)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
for i in range(2):
client.send("hello".encode())
recv = client.recv(1024).decode()
print("client recv:", recv)
client.close()
def socket_server_test(host='0.0.0.0', port=36500):
_thread.start_new_thread(socket_client_task, (host, port))
test_port = random.randint(10000, 65535)
socket_server_init(port=test_port)
while not server_started:
time.sleep(0.1)
socket_server_test(port=test_port)
while not test_finished:
time.sleep(0.1)
8.49.2.7. socket_download.py
import socket
import time
def http_download_file(url: str, file_path: str, buff_size=1024):
# Parse the URL to extract the host and path
if not url.startswith('http://'):
print("Only HTTP protocol is supported")
return -1
host = url.split("//")[1].split('/')[0]
path = url[url.find(host) + len(host):]
if len(path) == 0:
path = "/"
print("Host:", host)
print("Path:", path)
# Establish a socket connection
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, 80))
print("Connected to:", host)
except Exception as e:
print("Connection error:", e)
return -1
# Send HTTP GET request
get_request = "GET " + path + " HTTP/1.1\r\nHost: " + \
host + "\r\nConnection: close\r\n\r\n"
sock.send(get_request.encode())
# Open file to write
f = open(file_path, 'wb') # Manually open the file
data_received = False
head_received = False
while True:
try:
data = sock.recv(buff_size)
except:
print('End of data')
break
print("[Data received:", len(data), ']')
if head_received:
sz = f.write(data)
# print("Data written:", sz)
# print(data.decode())
if len(data) == 0:
print("Length of data:", len(data))
if not data_received:
print("No data received.")
f.close()
return -1
print("No more data to receive")
break
data_received = True
# Handle the end of the HTTP header if it's still present
if head_received == False:
if b'\r\n\r\n' in data:
# print("Header received", data)
head_received = True
aplited = data.split(b'\r\n\r\n', 1)
if len(aplited) == 2:
sz = f.write(aplited[1])
# Close file and socket manually
f.close()
sock.close()
print("Download completed")
return 0
assert http_download_file("http://pikapython.com", "pikapython.html") == 0
print("PASS")