socket server 、TCP、UDP
1、socket 套接字
Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口,我们无需深入理解tcp/udp协议,socket已经为我们封装好了,我们只需要遵循socket的规定去编程,写出的程序自然就是遵循tcp/udp标准的。
在TCP/IP五层协议里,除了应用层的行为需要自定义,其余四层(传输层、网络层、数据链路层、物理层)都已经封装成socket,只需要进行调用接口就好了。
2、使用
import socket
# 创建socket
ms = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.socket()
的第一个参数是套接字家族的类型,有两种
- 基于文件类型的套接字家族:AF_UNIX
- 基于网络类型的套接字家族:AF_INET
另外,还有AF_INET6被用于ipv6。
socket.socket()
的第二个参数是使用什么协议,主要就两种
- 流式,TCP协议: socket.SOCK_STREAM
- 数据报文,UDP协议:socket.SOCK_DGRAM
2.1 服务端套接字函数
ms.bind()
绑定(主机,端口号)到套接字
ms.listen()
开始TCP监听
ms.accept()
被动接受TCP客户的连接,(阻塞式)等待连接的到来
2.2 客户端套接字函数
ms.connect()
主动初始化TCP服务器连接
ms.connect_ex()
connect()函数的扩展版本,出错时返回出错码,而不是抛出异常
2.3 公共的函数
ms.recv()
接收TCP数据
ms.send()
发送TCP数据(send在待发送数据量大于己端缓存区剩余空间时,数据丢失,不会发完)
ms.sendall()
发送完整的TCP数据(本质就是循环调用send,sendall在待发送数据量大于己端缓存区剩余空间时,数据不丢失,循环调用send直到发完)
ms.recvfrom()
接收UDP数据
ms.sendto()
发送UDP数据
ms.getpeername()
连接到当前套接字的远端的地址
ms.getsockname()
当前套接字的地址
ms.getsockopt()
返回指定套接字的参数
ms.setsockopt()
设置指定套接字的参数
ms.close()
关闭套接字
2.4 面向锁的函数
ms.setblocking()
设置套接字的阻塞与非阻塞模式
ms.settimeout()
设置阻塞套接字操作的超时时间
ms.gettimeout()
得到阻塞套接字操作的超时时间
2.5 面向文件的函数
ms.fileno()
套接字的文件描述符
ms.makefile()
创建一个与该套接字相关的文件
3、基于TCP的套接字
一个demo
服务端代码
import socket
# 创建socket
ms = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定ip和端口
ms.bind(('127.0.0.1', 8080))
# 监听 backlog=5:半连接池的大小
ms.listen(5)
# 等待,获取一个连接
conn, client_addr=ms.accept()
print('conn', conn)
print('客户端的ip和端口:', client_addr)
# 收消息
data = conn.recv(1024) # 最大接收量 1024字节
print(data.decode('utf-8'))
# 发消息
conn.send('服务端发的消息'.encode('utf-8'))
# 关闭连接
conn.close()
ms.close()
客户端代码
import socket
# 创建socket
ms = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 客户端发请求给服务端
ms.connect(('127.0.0.1', 8080))
ms.send('客户端发送的消息'.encode('utf-8'))
data = ms.recv(1024)
print(data.decode('utf-8'))
ms.close()
在没有并发的情况下的修复版,可以一直让服务器保持运行
############################################ 服务端代码
import socket
# 创建socket
ms = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定ip和端口
ms.bind(('127.0.0.1', 8080))
ms.listen(5)
# 链接循环
while True:
# 等待,获取一个连接
conn, client_addr = ms.accept()
print('客户端的ip和端口:', client_addr)
# 收消息
while True:
try:
data = conn.recv(1024) # 最大接收量 1024字节
# 在unix系统下如果收到空代表 客户端非法断开链接
if len(data) == 0:
break
print(data.decode('utf-8'))
# 发消息
conn.send(data.upper())
except Exception:
# windows
break
# 关闭连接
conn.close()
ms.close()
############################################ 客户端代码
import socket
# 创建socket
ms = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 客户端发请求给服务端
ms.connect(('127.0.0.1', 8080))
while True:
txt = input('请输入内容').strip()
if len(txt) == 0: continue
ms.send(txt.encode('utf-8'))
data = ms.recv(1024)
print(data.decode('utf-8'))
ms.close()
4、基于UDP
# 服务端
import socket
# 创建socket
ms = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定ip和端口
ms.bind(('127.0.0.1', 8080))
while True:
# 收消息
data, client_addr = ms.recvfrom(1024)
# 发消息
ms.sendto(data.upper(), client_addr)
# 客户端
import socket
# 创建socket
ms = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
#发消息
msg = input('>>>').strip()
if msg == 'quit':
break
ms.sendto(msg.encode('utf-8'), ('127.0.0.1', 8080))
# 收消息
data, serverAddr = ms.recvfrom(1024)
print(data.decode('utf-8'))
ms.close()
5、TCP黏包问题
当客户端接收服务端返回的内容的时候,返回的内容太多,客户端一次性读取不完全,遗留部分数据在缓存内,又因为TCP是流失协议,所以会在下次执行命令的时候会读取到上次遗留的数据。
- TCP协议黏包的原因:因为TCP为了更高效的传输,使用了
Nagle
算法,将多次间隔较小且数据量小的数据合并成一个大的数据块进行封包,这样接收端就很难分辨出来了,需要有科学的拆包机制。也就是说面向流的通信是无消息保护边界的。
# 服务端代码
import subprocess
from socket import *
server = socket(AF_INET, SOCK_STREAM)
# 重用端口,避免重启服务的时候,由于原服务有等待状态的tcp链接导致端口被占用
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
while True:
conn, client = server.accept()
while True:
try:
res = conn.recv(1024)
if len(res) == 0: break
# 命令执行的结果
obj = subprocess.Popen(res.decode('utf-8'),
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 成功的结果
stdout_res = obj.stdout.read()
# 失败的结果
stuerr_res = obj.stderr.read()
conn.send(stdout_res + stuerr_res)
except Exception:
break
conn.close()
# 客户端
# 引入socket模块下所有的方法,避免socket.xxx
from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))
while True:
msg = input('请输入》》》').strip()
if len(msg) == 0: continue
client.send(msg.encode('utf-8'))
# 在这里接收,如果返回的内容太大,会发生`粘包`问题,即:上次返回结果太多,只读取了前1024字节的内容,余下内容会随着下次命令读出,因为返回来的数据是数据流,都在内存中,只能从前往后的读出来
# 这个地方设置的大小不能无限大,不可超过缓存区
data = client.recv(1024)
# windows 系统使用 gbk
print(data.decode('utf-8'))
当用户输入一些查询命令,返回结果比较大的时候ps aux / ifconfig
之类的就会触发
解决方案思路
- 因为需要一次性把数据接受干净,下次执行命令才不会出现粘包问题,所以我们需要知道数据总的大小 total_size
- 因为一次最好只取1024字节,所以需要循环的去取数据,每次取出来的数据的大小加起来 recv_size
- 当recv_size == total_size的时候,说明数据就取完了,不会发生粘包问题了。
根据思路我们可以自定义一个协议,规定头信息是多少个字节,然后服务器发送给客户端,客户端首先先解析规定个数的数据,然后根据接收到的数据的信息来确定数据是有多长,从而得到total_size,然后累加1024,直到等于total_size
# 服务端
# 其他代码。。。。
import struct
total_size = len(stdout_res) + len(stuerr_res)
# struct模块可以将一种数据打包成固定长度, i 是int
header = struct.pack('i', total_size)
conn.send(header)
conn.send(stdout_res + stuerr_res)
# 客户端
from socket import *
import struct
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))
while True:
msg = input('请输入》》》').strip()
if len(msg) == 0: continue
client.send(msg.encode('utf-8'))
# 在这里接收,如果返回的内容太大,会发生`粘包`问题,即:上次返回结果太多,只读取了前1024字节的内容,余下内容会随着下次命令读出,因为返回来的数据是数据流,都在内存中,只能从前往后的读出来
# 自定义了协议头,前4个是对数据的描述,内容是数据的大小,就是total_size
header = client.recv(4)
total_size = struct.unpack('i', header)[0]
recv_size = 0
cmd_res = b''
while recv_size < total_size:
recv_data = client.recv(1024)
recv_size = len(recv_data)
cmd_res += recv_data
# windows 系统使用 gbk
print(cmd_res.decode('utf-8'))
这样就通过自定义协议头来解决了黏包问题。
完整代码
# 服务端代码
import subprocess
import struct
import json
from socket import *
server = socket(AF_INET, SOCK_STREAM)
# 重用端口,避免重启服务的时候,由于原服务有等待状态的tcp链接导致端口被占用
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
print('服务启动,等待连接...')
while True:
conn, client = server.accept()
while True:
try:
res = conn.recv(1024)
if len(res) == 0: break
print('收到指令:', res.decode('utf-8'))
# 命令执行的结果
obj = subprocess.Popen(res.decode('utf-8'),
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 成功的结果
stdout_res = obj.stdout.read()
# 失败的结果
stuerr_res = obj.stderr.read()
total_size = len(stdout_res) + len(stuerr_res)
# 制作头信息
header_dict = {
"filename": 'a.txt',
"total_size": total_size,
"md5": "h21h3213yu12y321h3h"
}
json_str = json.dumps(header_dict)
json_byt = json_str.encode('utf-8')
# 获取头信息的长度发过去
header_json_size = struct.pack('i', len(json_byt))
conn.send(header_json_size)
#发头信息
conn.send(json_byt)
# 发数据
conn.send(stdout_res + stuerr_res)
print('执行完毕,返回结果。')
except Exception:
break
conn.close()
这样修改完之后,客户端接收需要先接收4个字节的数据,获取到服务端的头信息的长度,然后再根据长度获取头信息,然后再根据头信息的内容获取数据。
修改一下客户端的代码
# 客户端代码
from socket import *
import struct
import json
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))
while True:
msg = input('请输入》》》').strip()
if len(msg) == 0: continue
if msg == 'edit': break
client.send(msg.encode('utf-8'))
# 获取4位长度的header头信息
header = client.recv(4)
# 获取真实长度数据
json_len = struct.unpack('i', header)[0]
# 获取真实的头信息
json_byt = client.recv(json_len)
# 转成字符串
json_str = json_byt.decode('utf-8')
# json转成对象
json_dic = json.loads(json_str)
total_size = json_dic['total_size']
recv_size = 0
cmd_res = b''
while recv_size < total_size:
recv_data = client.recv(1024)
recv_size = len(recv_data)
cmd_res += recv_data
# windows 系统使用 gbk
print(cmd_res.decode('utf-8'))
6、并发
使用socketserver实现并发
import socketserver
class MySocketServerHandler(socketserver.BaseRequestHandler):
def handle(self):
# 如果是tcp协议 self.request 就是连接对象
print(self.request)
# 客户ip
print(self.client_address)
s = socketserver.ThreadingTCPServer(('127.0.0.1', 8080), MySocketServerHandler)
s.serve_forever()
这样就搭建好了基础的服务,建立起了链接。
s.serve_forever()相当于一个死循环取连接。
# s.serve_forever()
# 相当于下面的代码,循环取出连接对象
while True:
conn, client = server.accept()
每次建立一个连接都会启动一个线程,把链接对象给线程进行服务
6.1 基于TCP的实现
服务端
import socketserver
class MySocketServerHandler(socketserver.BaseRequestHandler):
def handle(self):
# 如果是tcp协议 self.request 就是连接对象
print(self.request)
# 客户ip
print(self.client_address)
while True:
try:
msg = self.request.recv(1024)
if len(msg) == 0:
break
self.request.send(msg.upper())
except Exception:
break
self.request.close()
s = socketserver.ThreadingTCPServer(('127.0.0.1', 8080), MySocketServerHandler)
s.serve_forever()
客户端,客户端文件可以多复制几个,代码一样。
from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))
while True:
msg = input('请输入》》》').strip()
if len(msg) == 0:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
6.2 基于UDP的实现
客户端都是一样的,服务端代码改变一些
import socketserver
class MySocketServerHandlerUdp(socketserver.BaseRequestHandler):
def handle(self):
# 如果是udp协议 self.request是一个元组,第一个数据是客户端发过来的数据,第二个是客户端的信息
client_data = self.request[0]
server = self.request[1]
client_address = self.client_address
# 发送
server.sendto(client_data.upper(), client_address)
s = socketserver.ThreadingTCPServer(('127.0.0.1', 8080), MySocketServerHandlerUdp)
# 在udp协议下,这里只负责接收消息
s.serve_forever()