Python-TCP编程-UDP编程-SocketServer-IO各种概念及多路复用-asyncio-学习笔记
序
欠4前年的一份笔记 ,献给今后的自己。
网络编程
Socket介绍
Socket套接字
Python中提供socket.py标准库,非常底层的接口库。
Socket是一种通用的网络编程接口,和网络层次没有一一对应的关系。
协议族
AF表示Address Family,用于socket()第一个参数
TCP编程
Socket编程,需要两端,一般来说需要一个服务端、一个客户端,服务端称为Server,客户端称为Client
TCP服务端编程
服务器端编程步骤
- 创建Socket对象
- 绑定IP地址Address和端口Port。bind()方法
IPv4地址为一个二元组(IP地址字符串,Port) - 开始监听,将在指定的IP的端口上监听。listen()方法
- 获取用于传送数据的Socket对象
socket.accept() -> (socket object, address info)
accept方法阻塞等待客户端建立连接,返回一个新的Socket对象和客户端地址的二元组
地址是远程客户端的地址,IPv4中它是一个二元组(clientaddr, port)
1、接收数据 : recv(bufsize[, flags]) 使用缓冲区接收数据
2、发送数据 : send(bytes) 发送数据
问题
两次绑定同一个监听端口会怎么样?
import sockets = socket.socket() # 创建socket对象
s.bind(('127.0.0.1', 9999)) # 一个二元组s.listen() # 开始监听
# 开启一个连接s1, info = s.accept() # 阻塞直到和客户端成功建立连接,返回一个socket对象和客户端地址# 使用缓冲区获取数据data = s1.recv(1024)print(data, info)s1.send(b'magedu.com')# 开启另外一个连接
s2, _ = s.accept()data = s2.recv(1024)s2.send(b'hello python')s.close()
上例accept和recv是阻塞的,主线程经常被阻塞住而不能工作。怎么办?
练习一一写一个群聊程序
需求分析
聊天工具是CS程序,C是每一个客户端,S是服务器端。
服务器应该具有的功能:
启动服务,包括绑定地址和端口,监听
建立连接,能和多个客户端建立连接
接收不同用户的信息
分发,将接收的某个用户的信息转发到已连接的所有客户端
停止服务
记录连接的客户端
代码实现
服务端应该对应一个类
class ChatServer:def __init__(self, ip, port): # 启动服务self.sock = socket.socket()self.addr = (ip, port)def start(self): # 启动监听passdef accept(self): # 多人连接passdef recv(self): # 接收客户端数据passdef stop(self): # 停止服务pass
在此基础上,扩展完成
import logging
import socket
import threading
import datetimelogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999): # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {} # 客户端def start(self): # 启动监听self.sock.bind(self.addr) # $Bself.sock.listen() # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self): # 多人连接while True:sock, client = self.sock.accept() # 阻塞self.clients[client] = sock # 添加到客户端字典# 准备接收数据,recv是阻塞的,开启新的线程threading.Thread(target=self.recv, args=(sock, client)).start()def recv(self, sock: socket.socket, client): # 接收客户端数据while True:data = sock.recv(1024) # 阻塞到数据到来msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())logging.info(msg)msg = msg.encode()for s in self.clients.values():s.send(msg)def stop(self): # 停止服务for s in self.clients.values():s.close()self.sock.close()cs = ChatServer()
cs.start
在此基础上,扩展完成
import threading
import datetime
import logging
import socketlogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999): # 启动服务logging.info('Connecting to Chat Server')self.sock = socket.socket()self.addr = (ip, port)self.clients = {} # 客户端self.event = threading.Event()def start(self): # 启动监听self.sock.bind(self.addr) # 378self.sock.listen() # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self): # 多人连接while not self.event.is_set():sock, client = self.sock.accept() # BE%self.clients[client] = sock # 添加到客户端字典# 准备接收数据,recv是阻塞的,开启新的线程threading.Thread(target=self.recv, args=(sock, client)).start()def recv(self, sock: socket.socket, client): # 接收客户端数据while not self.event.is_set():data = sock.recv(1024) # 阻塞到数据到来msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())logging.info(msg)msg = msg.encode()for s in self.clients.values():s.send(msg)def stop(self): # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()cs = ChatServer()
cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)break
基本功能完成,但是有问题。使用Event改进。
import logging
import socketimport threading
import datetimelogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999): # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {} # 客户端self.event = threading.Event()def start(self): # 启动监听self.sock.bind(self.addr) #绑定 self.sock.listen() # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self): # 多人连接while not self.event.is_set():sock, client = self.sock.accept() # BEself.clients[client] = sock # 添加到客户端字典# 准备接收数据,recv是阻塞的,开启新的线程threading.Thread(target=self.recv, args=(sock, client)).start()def recv(self, sock: socket.socket, client): # 接收客户端数据while not self.event.is_set():data = sock.recv(1024) # 阻塞到数据到来msg = "{:%Y/%m/%d %H:%M:%S} {}: {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())logging.info(msg)msg = msg.encode()for s in self.clients.values():s.send(msg)def stop(self): # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()cs = ChatServer()
cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)break
这一版基本能用了,测试通过。但是还有要完善的地方。
例如各种异常的判断,客户端断开连接后字典中的移除客户端数据等。
客户端主动断开带来的问题
服务端知道自己何时断开,如果客户端断开,服务器不知道。
所以,好的做法是,客户端断开发出特殊消息通知服务器端断开连接。但是,如果客户端主动断开,服务端主动发
送一个空消息,超时返回异常,捕获异常并清理连接。
即使 客户端提供了断开命令,也不能保证客户端会使用它断开连接。但是还是要增加这个退出功能。
增加客户端退出命令
import logging
import socketimport threading
import datetimelogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999): # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {} # 客户端self.event = threading.Event()def start(self): # 启动监听self.sock.bind(self.addr) # 绑定self.sock.listen() # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self): # 多人连接while not self.event.is_set():sock, client = self.sock.accept() # BEself.clients[client] = sock # 添加到客户端字典# 准备接收数据,recv是阻塞的,开启新的线程threading.Thread(target=self.recv, args=(sock, client)).start()def recv(self, sock: socket.socket, client): # 接收客户端数据while not self.event.is_set():data = sock.recv(1024) # 阻塞到数据到来msg = data.decode().strip()# 客户端退出命令if msg == 'quit':self.clients.pop(client)sock.close()logging.info('{} quits'.format(client))breakmsg = "{:%Y/%m/%d %H:%M:%S} {}: {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())logging.info(msg)msg = msg.encode()for s in self.clients.values():s.send(msg)def stop(self): # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()cs = ChatServer()
cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate()) # 用来观察断开后线程的变化
程序还有瑕疵,但是业务功能基本完成了
socket常用方法
MakeFile
socket-makefile(mode=‘r’, buffering=None, *, encoding=None, errors=None, newline=None)
创建一个与该套接字相关连的文件对象,将recv方法看做读方法,将send方法看做写方法。
# 使用makefile简单例子
import socketsockserver = socket.socket()ip = '127.0.0.1'port = 9999addr = (ip, port)sockserver.bind(addr)sockserver.listen()print(' - ' * 30)
s, _ = sockserver.accept()print(' - ' * 30)f = s.makefile(mode='rw')line = f.read(10) # 阻塞等
print(' - ' * 30)
print(line)
f.write('Return your msg: {}'.format(line))
f.flush()
上例不能循环接收消息,修改一下
# 使用makefile简单例子
import socket
import threadingsockserver = socket.socket()ip = '127.0.0.1'port = 9999addr = (ip, port)sockserver.bind(addr)sockserver.listen()print(' - ' * 30)event = threading.Event()def accept(sock: socket.socket, e: threading.Event):s, _ = sock.accept()f = s.makefile(mode='rw')while True:line = f.readline()print(line)if line.strip() == "quit": # 注意要发quit\nbreakf.write('Return your msg: (}'.format(line))f.flush()f.close()sock.close()e.wait(3)t = threading.Thread(target=accept, args=(sockserver, event))
t.start()
t.join()
print(sockserver)
makefile练习
使用makefile改写群聊类
import logging
import socket
import threading
import datetime
import socketlogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999): # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {} # 客户端self.event = threading.Event()def start(self): # 启动监听self.sock.bind(self.addr) # 绑定self.sock.listen() # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self): # 多人连接while not self.event.is_set():sock, client = self.sock.accept() # 阻塞# 准备接收数据,recv是阻塞的,开启新的线程f = sock.makefile('rw') # 支持读写self.clients[client] = f # 添加到客户端字典threading.Thread(target=self.recv, args=(f, client), name='recv').start()def recv(self, f, client): # 接收客户端数据while not self.event.is_set():data = f.readline() # 阻塞到换行符msg = data.strip()# 客户端退出命令if msg == 'quit':self.clients.pop(client)f.close()logging.info('{} quits'.format(client))breakmsg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)logging.info(msg)for s in self.clients.values():s.write(msg)s.flush()def stop(self): # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()cs = ChatServer()
cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate()) # 用来观察断开后线程的变化
上例完成了基本功能,但是,如果客户端主动断开,或者readline出现异常,就不会从clients中移除作废的
socket。可以使用异常处理解决这个问题。
ChatServer实验用完整代码
注意,这个代码为实验用,代码中瑕疵还有很多。Socket太底层了,实际开发中很少使用这么底层的接口。
增加一些异常处理。
import logging
import socket
import threading
import datetime
import socketlogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999): # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {} # 客户端self.event = threading.Event()def start(self): # 启动监听self.sock.bind(self.addr) # 绑定self.sock.listen() # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self): # 多人连接while not self.event.is_set():sock, client = self.sock.accept() # 阻塞# 准备接收数据,recv是阻塞的,开启新的线程f = sock.makefile('rw') # 支持读写self.clients[client] = f # 添加到客户端字典threading.Thread(target=self.recv, args=(f, client), name='recv').start()def recv(self, f, client): # 接收客户端数据while not self.event.is_set():try:data = f.readline() # 阻塞到换行符except Exception as e:logging.error(e)data = 'quit'msg = data.strip()# 客户端退出命令if msg == 'quit':self.clients.pop(client)f.close()logging.info('{} quits'.format(client))breakmsg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)logging.info(msg)for s in self.clients.values():s.write(msg)s.flush()def stop(self): # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()def main():cs = ChatServer()cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate()) # 用来观察断开后线程的变化if __name__ == '__main__':main()
TCP客户端编程
客户端编程步骤
- 创建Socket对象
- 连接到远端服务端的ip和port,connect()方法
- 传输数据
使用send、recv方法发送、接收数据 - 关闭连接,释放资源
import socketclient = socket.socket()
ipaddr = ('127.0.0.1', 9999)
client.connect(ipaddr) # 直接连接服务器
client.send(b'abcd\n')
data = client.recv(1024) # 阻塞等待
print(data)
client.close()
开始编写客户端类
import logging
import socket
import threading
import datetime
import socketlogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999): # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {} # 客户端self.event = threading.Event()def start(self): # 启动监听self.sock.bind(self.addr) # 绑定self.sock.listen() # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self): # 多人连接while not self.event.is_set():sock, client = self.sock.accept() # 阻塞# 准备接收数据,recv是阻塞的,开启新的线程f = sock.makefile('rw') # 支持读写self.clients[client] = f # 添加到客户端字典threading.Thread(target=self.recv, args=(f, client), name='recv').start()def recv(self, f, client): # 接收客户端数据while not self.event.is_set():try:data = f.readline() # 阻塞到换行符except Exception as e:logging.error(e)data = 'quit'msg = data.strip()# 客户端退出命令if msg == 'quit':self.clients.pop(client)f.close()logging.info('{} quits'.format(client))breakmsg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)logging.info(msg)for s in self.clients.values():s.write(msg)s.flush()def stop(self): # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()def main():cs = ChatServer()cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate()) # 用来观察断开后线程的变化if __name__ == '__main__':main()
同样,这样的客户端还是有些问题的,仅用于测试。
UDP编程
测试命令
> netstat -anp udp | find “9988” # windows查找udp是否启动端口
$ echo “123abc” | nc -u 127.0.0.1 9988 # linux下发给服务端数据\
UDP服务端编程
UDP服务端编程流程
- 创建socket对象。socket.SOCK_DGRAM
- 绑定IP和Port,bind0方法
- 传输数据
接收数据,socket.recvfrom(bufsize[, flagsl),获得一个二元组(string, address)
发送数据,socket.sendto(string, address)发给某地址某信息 - 释放资源
import socketserver = socket.socket(type=socket.SOCK_DGRAM)
server.bind(('0.0.0.0', 9999)) # 立即绑定一个udp端口
data = server.recv(1024) # 阻塞等待数据
data = server.recvfrom(1024) # 阻塞等待数据(value,(ip,port))
server.sendto(b'7', ('192.168.142.1', 10000))
server.close()
UDP客户端编程流程
- 创建socket对象。 socket.SOCK_DGRAM
- 发送数据,socket. sendto(string, address)发给某地址某信息
- 接收数据,socket.recvfrom(bufsize[, flags]),获得一个二元组(string, address)
- 释放资源
import socketclient = socket.socket(type=socket.SOCK_DGRAM)
raddr = ('192.168.142.1', 10000)
client.connect(raddr)
client.sendto(b'8', raddr)
client.send(b'9')
data = client.recvfrom(1024) # 阻塞等待数据(value,(ip,port))
data = client.recv(1024) # 阻塞等待数据
client.close()
注意:UDP是无连接协议,所以可以只有任何一端,例如客户端数据发往服务端,服务端存在与否无所谓。
UDP编程中bind、connect、send、sendto、recv、recvfrom方法使用
UDP的socket对象创建后,是没有占用本地地址和端口的。
练习——UDP版群聊
UDP版群聊服务端代码
import socket# 服务端类的基本架构
class ChatUDPServer:def __init__(self, ip='127.0.0.1', port=9999):self.addr = (ip, port)self.sock = socket.socket(type=socket.SOCK_DGRAM)def start(self):self.sock.bind(self.addr) # 立即绑定self.sock.recvfrom(1024) # 阻塞接收数据def stop(self):self.sock.close()
在上面代码的基础之上扩充
import socket
import threading
import datetime
import logging
from tkinter.font import namesFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatUDPServer:def __init__(self, ip='127.0.0.1', port=9999):self.addr = (ip, port)self.sock = socket.socket(type=socket.SOCK_DGRAM)self.clients = set() # 记录客户端self.event = threading.Event()def start(self):self.sock.bind(self.addr) # 立即绑定# 启动线程threading.Thread(target=self.recv, name='recv').start()def recv(self):while not self.event.is_set():data, raddr = self.sock.recvfrom(1024) # 阻塞接收数据if data.strip() == b'quit':# 有可能发来数据的不在clients中if raddr in self.clients:self.clients.remove(raddr)logging.info('{} leaving'.format(raddr))continueself.clients.add(raddr)msg = '{}. from {}: {}'.format(data.decode(), *raddr)logging.info(msg)msg = msg.encode()for c in self.clients:self.sock.sendto(msg, c) # 不保证对方能够收到def stop(self):for c in self.clients:self.sock.sendto(b'bye', c)self.sock.close()self.event.set()def main():cs = ChatUDPServer()cs.start()while True:cmd = input(">>>")if cmd.strip() == 'quit':cs.stop()breaklogging.info(threading.enumerate())logging.info(cs.clients)if __name__ == '__main__':main()
UDP群聊客户端代码
import threading
import socket
import loggingFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message) s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatUdpClient:def __init__(self, rip='127.0.0.1', rport=9999):self.sock = socket.socket(type=socket.SOCK_DGRAM)self.raddr = (rip, rport)self.event = threading.Event()def start(self):self.sock.connect(self.raddr) # 占用本地地址和端口,设置远端地址和端口threading.Thread(target=self.recv, name='recv').start()def recv(self):while not self.event.is_set():data, raddr = self.sock.recvfrom(1024)msg = '{}. from {}:{}'.format(data.decode(), *raddr)logging.info(msg)def send(self, msg: str):self.sock.sendto(msg.encode(), self.raddr)def stop(self):self.sock.close()self.event.set()def main():cc1 = ChatUdpClient()cc2 = ChatUdpClient()cc1.start()cc2.start()print(cc1.sock)print(cc2.sock)while True:cmd = input('Input your words >>')if cmd.strip() == 'quit':cc1.stop()cc2.stop()breakcc1.send(cmd)cc2.send(cmd)if __name__ == '__main__':main()
上面的例子并不完善,如果客户端断开了,服务端不知道。每一个服务端还需要对所有客户端发送数据,包括已经断开的客户端。
代码改进
服务端代码改进
加一个ack机制和心跳heartbeat。心跳,就是一端定时发往另一端的信息,一般每次数据越少越好。心跳时间间
隔约定好就行。ack即响应,一端收到另一端的消息后返回的信息。
心跳机制
1、一般来说是客户端定时发往服务端的,服务端并不需要ack回复客户端,只需要记录该客户端还活着就行了。
2、如果是服务端定时发往客户端的,一般需要客户端ack响应来表示活着,如果没有收到ack的客户端,服务端
移除其信息。这种实现较为复杂,用的较少。
3、也可以双向都发心跳的,用的更少。
在服务器端代码中使用第一种机制改进
import socket
import threading
import datetime
import logging
from tkinter.font import namesFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatUDPServer:def __init__(self, ip='127.0.0.1', port=9999,interval=10):self.addr = (ip, port)self.sock = socket.socket(type=socket.SOCK_DGRAM)self.clients = set() # 记录客户端self.event = threading.Event()self.interval = interval # 默认10秒,超时就要移除对应的客户端def start(self):self.sock.bind(self.addr) # 立即绑定# 启动线程threading.Thread(target=self.recv, name='recv').start()def recv(self):while not self.event.is_set():localset = set() # 清理超时data, raddr = self.sock.recvfrom(1024) # 阻塞接收数据current = datetime.datetime.now().timestamp() # floatif data.strip() == b'^hb^': # 心跳信息 ifself.clients[raddr] = currentcontinueelif data.strip() == b'quit':# 有可能发来数据的不在clients中self.clients.pop(raddr, None)logging.info('{} leaving'.format(raddr))continue# 有信息来就更新时间# 什么时候比较心跳时间呢?发送信息的时候,反正要遍历一遍self.clients[raddr] = currentmsg = '{}. from {}:{}'.format(data.decode(), *raddr)logging.info(msg)msg = msg.encode()for c, stamp in self.clients.items():if current - stamp > self.interval:localset.add(c)else:self.sock.sendto(msg, c) # 不保证对方能够收到for c in localset:self.clients.pop(c)def stop(self):for c in self.clients:self.sock.sendto(b'bye', c)self.sock.close()self.event.set()def main():cs = ChatUDPServer()cs.start()while True:cmd = input(">>>")if cmd.strip() == 'quit':cs.stop()breaklogging.info(threading.enumerate())logging.info(cs.clients)if __name__ == '__main__':main()
客户端代码改进
增加定时发送心跳代码
import threading
import socket
import loggingfrom pydantic.v1 import parse_file_asFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatUdpClient:def __init__(self, rip='127.0.0.1', rport=9999):self.sock = socket.socket(type=socket.SOCK_DGRAM)self.raddr = (rip, rport)self.event = threading.Event()def start(self):self.sock.connect(self.raddr) # 占用本地地址和端口,设置远端地址和端口threading.Thread(target=self._sendhb, name='heartbeat', daemon=True).start()threading.Thread(target=self.recv, name='recv').start()def _sendhb(self):# 心跳while not self.event.wait(5):self.send(' ^hb^')def recv(self):while not self.event.is_set():data, raddr = self.sock.recvfrom(1024)msg = '{}. from {}:{}'.format(data.decode(), *raddr)logging.info(msg)def send(self, msg: str):print(msg)self.sock.sendto(msg.encode(), self.raddr)def stop(self):self.send('quit') # 通知服务端退出self.sock.close()self.event.set()def main():cc1 = ChatUdpClient()cc2 = ChatUdpClient()cc1.start()cc2.start()print(cc1.sock)print(cc2.sock)while True:cmd = input('Input your words >>')if cmd.strip() == 'quit':cc1.stop()cc2.stop()breakcc1.send(cmd)cc2.send(cmd)if __name__ == '__main__':main()
UDP协议应用
UDP是无连接协议,它基于以下假设:网络足够好 消息不会丢包 包不会乱序
但是,即使是在局域网,也不能保证不丢包,而且包的到达不一定有序。
应用场景 视频、音频传输,一般来说,丢些包,问题不大,最多丢些图像、听不清话语,可以重新发话语来解决。
海量采集数据,例如传感器发来的数据,丢几十、几百条数据也没有关系。DNS协议,数据内容小,一个包就能
查询到结果,不存在乱序,丟包,重新请求解析。
一般来说,UDP性能优于TCP,但是可靠性要求高的场合的还是要选择TCP协议。
SocketServer
socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层
AP进行封装,Python的封装就是——socketserver模块。它是网络服务编程框架,便于企业级快速开发。
类的继承关系
SocketServer简化了网络服务器的编写。
它有4个同步类:TCPServer ,UDPServer , UnixStreamServer,UnixDatagramServer。
2个Mixin类:ForkingMixIn 和 ThreadingMixIn类,用来支持异步。
class ForkingUDPServer(ForkingMixin, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServerThreadingMixIn, UDPServer): pass
class ThreadingTCPServerThreadingMixIn, TCPServer): pass
fork是创建多进程,thread是创建多线程
编程接口
socketserver.BaseServer(server_address, RequestHandlerClass)
需要提供服务器绑定的地址信息,和用于处理请求的RequestHandlerClass类。
RequestHandlerClass类必须是BaseRequestHandler类的子类,在BaseServer中代码如下:
import threading# BaseServer代码
class BaseServer:def __init__(self, server_address, RequestHandlerClass):"""Constructor. May be extended, do not override."""self.server_address = server_addressself.RequestHandlerClass = RequestHandlerClassself.__is_shut_down = threading.Event()self.__shutdown_request = Falsedef finish_request(self, request, client_address): # 处理请求的方法"""Finish one request by instantiating RequestHandlerClass."""self.RequestHandlerClass(request, client_address, self) # RequestHandlerClass*i*
BaseRequestHandler类
它是和用户连接的用户请求处理类的基类,定义为BaseRequestHandler(request, client_address, server)
服务端Server实例接收用户请求后,最后会实例化这个类。
它被初始化时,送入3个构造参数:request, client_address, server自身
以后就可以在BaseRequestHandler类的实例上使用以下属性:
self.request是和客户端的连接的socket对象
self.server是TCPServer本身
self.client_address是客户端地址
这个类在初始化的时候,它会依次调用3个方法。子类可以覆盖这些方法。
# BaseRequestHandler要子类覆盖的方法
class BaseRequestHandler:def __init__(self, request, client_address, server):self.request = requestself.client_address = client_addressself.server = serverself.setup()try:self.handle()finally:self.finish()def setup(self): # 每一个连接初始化passdef handle(self): # 每一次请求处理passdef finish(self): # 每一个连接清理pass
测试代码
import threading
import socketserverclass MyHandler(socketserver.BaseRequestHandler):def handle(self):# super().handle()# 可以不调用,父类handle什么都没有做print('-' * 30)print(self.server) # 服务print(self.request) # 服务端负责客户端连接请求的socket对象print(self.client_address) # 客户端地址print(self.__dict__)print(self.server.__dict__) # 能看到负责accept的socketprint(threading.enumerate())print(threading.current_thread())print('-' * 30)addr = ('127.0.0.1', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever() # 永久
测试结果说明,handle方法相当于socket的recv方法。每个不同的连接上的请求过来后,生成这个连接的socket对象即self.request,客户端地址是self.dlient_address。
问题
测试过程中,上面代码,连接后立即断开了,为什么?
怎样才能客户端和服务器端长时间连接?
import threading
import socketserver
import loggingFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)class MyHandler(socketserver.BaseRequestHandler):def handle(self):# super().handle()# 可以不调用,父类handle什么都没有做print('-' * 30)print(self.server) # 服务print(self.request) # 服务端负责客户端连接请求的socket对象print(self.Client_address) # 客户端地址print(self.__dict__)print(self.server.__dict__) # 能看到负责accept的print(threading.enumerate())print(threading.current_thread())print('-' * 30)for i in range(3):data = self.request.recv(1024)logging.info(data)logging.info('====end====')addr = ('127.0.0.1', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever() # 永久
将ThreadingTCPServer换成TCPServer,同时连接2个客户端观察效果。
ThreadingTCPServer是异步的,可以同时处理多个连接。
TCPServer是同步的,一个连接处理完了,即一个连接的handle方法执行完了,才能处理另一个连接,且只有主线程。
总结
创建服务器需要几个步骤:
1.从BaseRequestHandler类派生出子类,并覆盖其handle(方法来创建请求处理程序类,此方法将处理
传入请求
2.实例化一个服务器类,传参服务器的地址和请求处理类
3.调用服务器实例的handle_request()或serve_forever()方法
4.调用server_close()关闭套接字
实现EchoServer
顾名思义,Echo,来什么消息回显什么消息
客户端发来什么信息,返回什么信息
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sysclass EchoHandler(BaseRequestHandler):def setup(self):super().setup()self.event = threading.Event() # 初始工作def finish(self):super().finish()self.event.set()def handle(self):super().handle()while not self.event.is_set():data = self.request.recv(1024).decode()msg = "{} {}".format(self.client_address, data).encode()self.request.send(msg)print('End')addr = ('127.0.0.1', 9999)
server = ThreadingTCPServer(addr, EchoHandler)server_thread = threading.Thread(target=server.serve_forever, name='EchoServer', daemon=True)
server_thread.start()try:while True:cmd = input('>>>')if cmd.strip() == 'quit':breakprint(threading.enumerate())
except Exception as e:print(e)
except KeyboardInterrupt:pass
finally:print('Exit')sys.exit(0)
练习一一改写ChatServer
使用ThreadingTCPServer改写ChatServer
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
import loggingFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatHandler(BaseRequestHandler):clients = {}def setup(self):super().setup()self.event = threading.Event() # 初始工作self.clients[self.client_address] = self.requestdef finish(self):super().finish() # 清理工作self.clients.pop(self.client_address) # 能执行到吗?self.event.set()def handle(self):super().handle()while not self.event.is_set():data = self.request.recv(1024).decode()if data == 'quit':breakmsg = "{} {}".format(self.client_address, data).encode()logging.info(msg)for c in self.clients.values():self.request.send(msg)print('End')addr = ('127.0.0.1', 9999)server = ThreadingTCPServer(addr, ChatHandler)server_thread = threading.Thread(target=server.serve_forever, name='ChatServer', daemon=True)server_thread.start()try:while True:cmd = input('>>>')if cmd.strip() == 'quit':breakprint(threading.enumerate())
except Exception as e:print(e)
except KeyboardInterrupt:pass
finally:print('Exit')sys.exit(0)
问题
上例 self.clients.pop(self.client_address) 能执行到吗?
如果连接的线程中handle方法中抛出异常,例如客户端主动断开导致的异常,线程崩溃,self.clients的pop方法还能执行吗?
当然能执行,基类源码保证了即使异常,也能执行finish方法。但不代表不应该不捕获客户端各种异常。
解决客户端主动连接断开问题
如果客户端主动断开,总是抛出一个异常。看看到底发生了什么,在handle方法中增加一些语句。
import threading
import logging
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sysclass ChatHandler(BaseRequestHandler):clients = {}def setup(self):super().setup()self.event = threading.Event() # 初始工作self.clients[self.client_address] = self.requestdef finish(self):super().finish() # 清理工作self.clients.pop(self.client_address) # 能执行到吗?self.event.set()def handle(self):super().handle()while not self.event.is_set():data = self.request.recv(1024).decode()print(data, '~' * 30) # 增加if data == 'quit':breakmsg = "{} {}".format(self.client_address, data).encode()logging.info(msg)for c in self.clients.values:print('+++++++++++++') # 增加self.request.send(msg)print('End')
通过打印可以看到,客户端主动断开,会导致recv方法立即返回一个空bytes,并没有同时抛出异常。当循环回到recv这一句的时候就会抛出异常。所以,可以通过判断data数据是否为空来客户端是否断开。
import threading
import logging
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sysclass ChatHandler(BaseRequestHandler):clients = {}def setup(self):super().setup()self.event = threading.Event() # 初始工作self.clients[self.client_address] = self.requestdef finish(self):super().finish() # 清理工作self.Clients.pop(self.client_address) # 能执行到吗?self.event.set()def handle(self):super().handle()while not self.event.is_set():data = self.request.recv(1024).decode()print(data, '~' * 30)if not data or data == 'quit':print('Broken pipe')breakmsg = "{} {}".format(self.client_address, data).encode()logging.info(msg)for c in self.clients.values:self.request.send(msg)print('End')
总结
为每一个连接提供RequestHandlerClass类实例,依次调用setup、handle、finish方法,且使用了try...finally结构保证finish方法一定能被调用。这些方法依次执行完成,如果想维持这个连接和客户端通信,就需要在handle函数中使用循环。socketserver模块提供的不同的类,但是编程接口是一样的,即使是多进程、多线程的类也是一样,大大减少了编程的难度。
异步编程
重要概念
同步、异步
函数或方法被调用的时候,调用者是否得到最终结果的。
直接得到最终结果结果的,就是同步调用;
不直接得到最终结果的,就是异步调用。
同步就是我让你打饭,你不打好给我不走开,直到你打饭给了我。
异步就是我让你打饭,你打着,我不等你,但是我会盯着你,你打完,我会过来拿走的。异步并不保证多长时间最终打完饭。
阻塞、非阻塞
函数或方法调用的时候,是否立刻返回。
立即返回就是非阻塞调用;
不立即返回就是阻塞调用。
区别
同步、异步,与阻塞、非阻塞不相关。
同步、异步强调的是,是否得到(最终的)结果;
阻塞、非阻塞强调是时间,是否等待。
同步与异步区别在于:调用者是否得到了想要的最终结果。
同步就是一直要执行到返回最终结果;
异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,还要通过被调用者,使用其它方式通知调用者,来取回最终结果。
阻塞与非阻塞的区别在于,调用者是否还能干其他事。
阻塞,调用者就只能干等;
非阻塞,调用者可以先去忙会别的,不用一直等。
联系
同步阻塞,我啥事不干,就等你打饭打给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。
同步非阻塞,我等着你打饭给我,但我可以玩会手机、看看电视。打饭是结果,但是我不一直等
异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。例如,叫号
异步非阻塞,我要打饭,你说等叫号,并没有返回饭给我,我在旁边看电视、玩手机,饭打好了叫我。
同步IO、异步IO、IO 多路复用
IO 两个阶段
IO过程分两阶段:
1.数据准备阶段
2.内核空间复制回用户进程缓冲区阶段
发生I0的时候: 人的高薪职业学
1、内核从输入设备读、写数据(淘米,把米放饭锅里煮饭)
2、进程从内核复制数据(盛饭,从内核这个饭锅里面把饭装到碗里来)
系统调用一—read函数
IO模型
同步IO
同步IO 模型包括 阻塞IO 、非阻塞IO 、IO 多路复用
阻塞IO
进程等待(阻塞),直到读写完成。(全程等待)
read/write函数
非阻塞1O
进程调用read操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。
第一阶段数据没有准备好,就先忙别的,等会再来看看。检 数据是否准备好了的过程是非阻塞的。
第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。
淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。
read/write
IO多路复用
所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力。
select几乎所有操作系统平台都支持,poll是对的select的升级。
epoll, Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有
kqueue , Windows有iocp.
以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核"“监视"select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。
select举例,食堂供应很多菜(众多的I0),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好等待。其中一样菜好了,大师傅叫你过来说你点的菜有好的了,你得自己找找看哪一样才好了,请服务员把做好的菜打给你。
epoll是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。
一般情况下,select最多能监听1024个fd(可以修改,但不建议改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。
epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。
异步IO
进程发起异步IO请求,立即返回。内核完成1O的两个阶段,内核给进程发一个信号。
举例,来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。
在整个过程中,进程都可以忙别的,等好了才过来。
举例,今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。
Linux的aio的系统调用,内核从版本2.6开始支持
Python 中IO多路复用
- IO多路复用
1、 大多数操作系统都支持select和poll
2、 Linux 2.5+ 支持epoll
3、 BSD、Mac支持kqueue
4、 Windows JOCP
Python的select库
实现了select、poll系统调用,这个基本上操作系统都支持。部分实现了epoll
底层的I0多路复用模块。
开发中的选择
1、完全跨平台,使用select、poll。但是性能较差
2、针对不同操作系统自行选择支持的技术,这样做会提高10处理的性能
selectors库
3.4 版本提供这个库,高级 IO 复用库。
类层次结构:
BaseSelector
+-- SelectSelector 实现select
+-- PollSelector 实现po11
+-- EpollSelector 实现epol1
+-- DevpollSelector 实现devpo11
+-- KqueueSelector 实现kqueue
selectors.DefaultSelector返回当前平台最有效、性能最高的实现。
但是,由于没有实现Windows下的IOCP,所以,只能退化为select。
# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():DefaultSelector = PollSelector
else:DefaultSelector = SelectSelector
abstractmethod register(fileobj, events, data=None)
为selector注册一个文件对象,监视它的IO事件。
fileobj被监视文件对象,例如socket对象
events 事件,该文件对象必须等待的事件
data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联方法。通过这个
参数在关注的事件产生后让selector干什么事。
# 使用举例
import selectors
import threading
import socket
import loggingFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)# 回调函数,自己定义形参def accept(sock: socket.socket, mask):"""mask:事件掩码的或值"""conn, raddr = sock.accept()conn.setblocking(False) # 不阻塞# 监视conn这个文件对象key = selector.register(conn, selectors.EVENT_READ, read)logging.info(key)# 回调函数
def read(conn: socket.socket, mask):data = conn.recv(1024)msg = "Your msg is {}.", format(data.decode())conn.send(msg.encode())# 构造缺省性能最优selector
selector = selectors.DefaultSelector()
# 创建Tcp Server
sock = socket.socket()
sock.bind(('127.0.0.1', 9999))sock.listen()
logging.info(sock)sock.setblocking(False) # 非阻塞# 注册文件对象sock关注读事件,返回SelectorKey
# 将sock、关注事件、data都绑定到key实例属性上
key = selector.register(sock, selectors.EVENT_READ, accept)logging.info(key)
e = threading.Event()def select(e):while not e.is_set():# 开始监视,等到有文件对象监控事件产生,返回(key,mask)元组events = selector.select()print('-' * 30)for key, mask in events:logging.info(key)logging.info(mask)callback = key.data # 回调函数callback(key.fileobj, mask)threading.Thread(target=select, args=(e,), name='select').start()def main():while not e.is_set():cmd = input('>>')if cmd.strip() == 'quit':e.set()fobjs = []logging.info('{}'.format(list(selector.get_map().items())))for fd, key in selector.get_map().items(): # 返回注册的项print(fd, key)print(key.fileobj)fobjs.append(key.fileobj)for fobj in fobjs:selector.unregister(fobj)fobj.close() # 关闭socketselector.close()if __name__ == '__main__':main()
练习
将ChatServer改写成IO多路复用的方式
不需要启动多线程来执行socket的accept、recv方法了
import socket
import threading
import datetime
import logging
import selectorsFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatServer:def __init__(self, ip='127.0.0.1', port=9999): # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.event = threading.Event()self.selector = selectors.DefaultSelector() # 创建selectordef start(self): # 启动监听self.sock.bind(self.addr) # 绑定self.sock.listen() # 监听self.sock.setblocking(False) # 不阻塞# 注册self.selector.register(self.sock, selectors.EVENT_READ, self.accept)threading.Thread(target=self.select, name='selector', daemon=True).start()def select(self): # 阻基while not self.event.is_set():# 开始监视,等到有文件对象监控事件产生,返回(key,mask)元组events = self.selector.select()print('-' * 30)for key, mask in events:logging.info(key)logging.info(mask)callback = key.data # 回调函数callback(key.fileobj)def accept(self, sock: socket.socket): # 多人连接conn, addr = sock.accept() # BaZEconn.setblocking(False) # 非阻塞# 注册,监视每一个连接的socket对象self.selector.register(conn, selectors.EVENT_READ, self.recv)def recv(self, sock: socket.socket): # 接收客户端数据data = sock.recv(1024) # 阻塞到数据到来if data == b'': # 客户端主动断开,注销并关闭socketself.selector.unregister(sock)sock.close()returnmsg = "{:%Y/%m/%d %H:%M:%S} {}: {}\n{}\n".format(datetime.datetime.now(), *sock.getpeername(), data.decode())logging.info(msg)msg = msg.encode()# 群发for key in self.selector.get_map().values():if key.data == self.recv: # #ißself.acceptkey.fileobj.send(msg)def stop(self): # 停止服务self.event.set()fobjs = []for fd, key in self.selector.get_map().items():fobjs.append(key.fileobj)for fobj in fobjs:self.selector.unregister(fobj)fobj.close()self.selector.close()def main():cs = ChatServer()cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate())if __name__ == '__main__':main()
基本完成功能,但是退出机制、异常处理没有加,这个和以前的处理方式一样,请自行完成。
进阶
send是写操作,也可以让selector监听,如何监听?
self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)
注册语句,要监听selectors.EVENT_READ | selectors.EVENT_WRITE 读与写事件。
回调的时候,需要mask来判断究竟是读触发还是写触发了。所以,需要修改方法声明,增加mask。
def recv(self, sock, mask)但是由于recv 方法处理读和写事件,所以叫recv不太合适,改名为
def handle(self, sock, mask)
注意读和写是分离的,那么handle函数应该写成下面这样
def handle(self, sock:socket.socket,mask) # 接收客户端数据if mask & selectors. EVENT_READ:pass# 注意,这里是某一个socket的写操作if mask & selectors.EVENT_WRITE:# 写缓冲区准备好了,可以写入数据了pass
handle方法里面处理读、写,mask有可能是0b01、0b10、0b11。
问题是,假没读取到了客户端发来的数据后,如何写出去?
为每一个与客户端连接的socket对象增加对应的队列。
与每一个客户端连接的socket对象,自己维护一个队列,某一个客户端收到信息后,会遍历发给所有客户端的队
列。这里完成一对多,即一份数据放到了所有队列中。
与每一个客户端连接的socket对象,发现自己队列有数据,就发送给客户端。
import socket
import threading
import datetime
import logging
import selectors
from queue import QueueFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message) s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):self.sock = socket.socket()self.addr = (ip, port)self.clients = {}self.event = threading.Event()self.selector = selectors.DefaultSelector() # 创建 selectordef start(self): # 启动监听self.sock.bind(self.addr) # 绑定self.sock.listen() # 监听self.sock.setblocking(False) # 不阻塞# 注册self.selector.register(self.sock, selectors.EVENT_READ, self.accept)threading.Thread(target=self.select, name='selector', daemon=True).start()def select(self): # 阻塞while not self.event.is_set():# 开始监视,等到某文件对象被监控的事件产生,返回(key,mask)元组events = self.selector.select() # 阻塞,直到eventsfor key, mask in events:if callable(key.data):callback = key.data # key对象的data属性,回调callback(key.fileobj, mask)else:callback = key.data[0]callback(key, mask)def accept(self, sock: socket.socket, mask): # 接收客户端连接conn, raddr = sock.accept()conn.setblocking(False) # 非阻塞self.clients[raddr] = (self.handle, Queue())# 注册,监视每一个与客户端的连接的socket对象self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.clients[raddr])def handle(self, key: selectors.SelectorKey, mask): # 接收客户端数据if mask & selectors.EVENT_READ:sock = key.fileobjraddr = sock.getpeername()data = sock.recv(1024)if not data or data == b'quit':self.selector.unregister(sock)sock.close()self.clients.pop(raddr)returnmsg = "{:%Y/%m/%d %H:%M:%S} {}: {}\n{}\n".format(datetime.datetime.now(), *raddr, data.decode())logging.info(msg)msg = msg.encode()for k in self.selector.get_map().values():logging.info(k)if isinstance(k.data, tuple):k.data[1].put(data)if mask & selectors.EVENT_WRITE:# 因为写一直就绪,mask为2,所以一直可以写,从而导致select()不断循环,如同不阻塞一样if not key.data[1].empty():key.fileobj.send(key.data[1].get())def stop(self): # 停止服务self.event.set()fobjs = []for fd, key in self.selector.get_map().items():fobjs.append(key.fileobj)for fobj in fobjs:self.selector.unregister(fobj)fobj.close()self.selector.close()def main():cs = ChatServer()cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate())logging.info('-' * 30)logging.info("{} {}".format(len(cs.clients), cs.clients))logging.info(list(map(lambda x: (x.fileobj.fileno(), x.data), cs.selector.get_map().values())))logging.info('-' * 30)if __name__ == '__main__':main()
这个程序最大的问题,在select0一直判断可写,几乎一直循环不停。所以对于写不频繁的情况下,就不要监听EVENT_WRITE。
对于Server来说,一般来说,更多的是等待对方发来数据后响应时才发出数据,而不是积极的等着发送数据。所以监听EVENT_READ,收到数据之后再发送就可以了。
本例只完成基本功能,其他功能如有需要,请自行完成。
asyncio
3.4版本加入标准库。
asyncio底层基于selectors实现,看似库,其实就是个框架,包含异步IO、事件循环、协程、任务等内容。
问题的引出
def a():for x in range(3):print(x)def b():for x in "abc":print(x)a()
b()输出:
0
1
2
a
b
c
这是一个串行的程序,单线程中根本没有做任何能否并行?
import threading
import timedef a():for x in range(3):time.sleep(0.001) #print(x)def b():for x in "abc":time.sleep(0.001)print(x)threading.Thread(target=a, name='a').start()
threading.Thread(target=b, name='b').start()
# 运行结果
0
a
1
b
2
c
import multiprocessing
import timedef a():for x in range(3):time.sleep(0.001)print(x)def b():for x in "abc":time.sleep(0.001)print(x)if __name__ == "__main__":multiprocessing.Process(target=a, name='a').start()multiprocessing.Process(target=b, name='b').start()
输出:
a
0
b
1
c
2
生成器版本
def a():for x in range(3):print(x)yielddef b():for x in "abc":print(x)yieldx = a()
y = b()for i in range(3):next(x)next(y)
输出:
0
a
1
b
2
c
上例在一个线程内通过生成器完成了调度,让两个函数都有机会执行,这样的调度不是操作系统的进程、线程完成的,而是用户自己设计的。
这个程序编写:
需要使用yield来让出控制权
需要循环帮助交替执行
事件循环
事件循环是asyncio提供的核心运行机制。
协程
- 协程不是进程、也不是线程,它是用户空间调度的完成并发处理的方式。
- 进程、线程由操作系统完成调度,而协程是线程内完成调度。它不需要更多的线程,自然也没有多线程切换
带来的开销。 - 协程是非抢占式调度,只有一个协程主动让出控制权,另一个协程才会被调度。
- 协程也不需要使用锁机制,因为是在同一个线程中执行。
- 多CPU下,可以使用多进程和协程配合,既能进程并发又能发挥协程在单线程中的优势。
- Python中协程是基于生成器的。
协程的使用
3.4引|入asyncio,使用装饰器
import asyncio@asyncio.coroutine
def sleep(x): # 协程函数for i in range(3):print('sleep {}'.format(i))yield from asyncio.sleep(x)loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()
将生成器函数转换成协程函数,就可以在事件循环中执行了。
3.5版本开始,Python提供关键字async、await,在语言上原生支持协程。
import asyncioasync def sleep(x):for i in range(3):print('sleep (}'.format(i))await asyncio.sleep(x)loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()
async def用来定义协程函数,iscoroutinefunction() 返回True。协程函数中可以不包含await、 async关键字,但不能使用yield关键字。
如同生成器函数调用返回生成器对象一样,协程函数调用也会返回一个对象称为协程对象,iscoroutine()返回True.
再来做个例子
import asyncio
import threadingasync def sleep(x):for i in range(3):print('sleep {}'.format(i))await asyncio.sleep(x)async def showthread(x):for i in range(3):print(threading.enumerate())await asyncio.sleep(2)loop = asyncio.get_event_loop()tasks = [sleep(3), showthread(3)]loop.run_until_complete(asyncio.wait(tasks))loop.close()
# 协程版本
import asyncio
import threading@asyncio.coroutine
def a():for x in range(3):print('a.x', x)yield@asyncio.coroutine
def b():for x in 'abc':print('b.x', x)yieldprint(asyncio.iscoroutinefunction(a))
print(asyncio.iscoroutinefunction(b))# 大循环
loop = asyncio.get_event_loop()
tasks = [a(), b()]loop.run_until_complete(asyncio.wait(tasks))loop.close()
TCP Echo Server 举例
import asyncio# TCP Echo Server 举例
async def handle(reader, writer):while True:data = await reader.read(1024)print(dir(reader))print(dir(writer))client = writer.get_extra_info('peername')message = "{} Your msg (}".format(client, data.decode()).encode()writer.write(message)await writer.drain()loop = asyncio.get_event_loop()
ip = '127.0.0.1'
port = 9999
crt = asyncio.start_server(handle, ip, port, loop=loop)server = loop.run_until_complete(crt)print(server) # server是监听的socket对象try:loop.run_forever()
except KeyboardInterrupt:pass
finally:server.close()loop.close()
aiohttp库
安装
$ pip install aiohttp
开发
HTTP Server
from aiohttp import webasync def indexhandle(request: web.Request):return web.Response(text=request.path, status=201)async def handle(request: web.Request):print(request.match_info)print(request.query_string) # http://127.0.0.1:8080/1?name=12301return web.Response(text=request.match_info.get('id', '0000'), status=200)app = web.Application()app.router.add_get("/", indexhandle) # http://127.0.0.1:8080/app.router.add_get("/{id}", handle) # http://127.0.0.1:8080/12301web.run_app(app, host='0.0.0.0', port=9977)
HTTP Client
import asyncio
from aiohttp import ClientSessionasync def get_html(url: str):async with ClientSession() as session:async with session.get(url) as res:print(res.status)print(await res.text())url = 'http://127.0.0.1/ziroom-web/'loop = asyncio.get_event_loop()
loop.run_until_complete(get_html(url))loop.close()