且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

用select模块实现的socket server

更新时间:2022-09-30 18:53:00

socket服务端

之前笔记里面记录的比较乱,最后我写了一个类,试着封装成一个模块的样子。
使用的时候通过继承生成一个子类,然后调用run执行。
你应该需要重构其中的部分方法,另外可能还需要在子类中创建新的方法。
至少需要重构onrecv方法,接收到数据后的处理。
另外要发数据,调用send_data接口,把conn连接和bytes类型的data传入。

import logging
import queue
import select
import socket

class Server(object):
    def __init__(self, host, port, data_size=1024):
        self.host = host
        self.port = port
        self.data_size = data_size

        self.server = socket.socket()
        self.server.setblocking(False)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((self.host, self.port))
        self.server.listen()
        logging.critical("监听已经开启")

        self.inputs = [self.server, ]
        self.outputs = []
        self.data_queue = {}

    def run(self):
        while True:
            self.loop()

    def loop(self):
        """调用一次select并处理
        run方法中就是循环调用执行此方法
        可以重构此方法:
        调用执行父类中的这个方法后,加入一次循环中需要执行的其他步骤
        """
        readable, writeable, exceptional = select.select(self.inputs, self.outputs, self.inputs)
        logging.debug("select返回: {readable:%s, writeable:%s, exceptional:%s}"
                      % (readable, writeable, exceptional))
        for r in readable:
            if r is self.server:
                conn, addr = r.accept()
                logging.info("接收到新的客户端连接: {conn: %s, addr: %s}" % (conn, addr))
                conn.setblocking(False)
                self.inputs.append(conn)
                self.data_queue[conn] = queue.Queue()
            else:
                try:
                    data = r.recv(self.data_size)
                except ConnectionResetError as e:
                    logging.error("recv时捕获到异常: %s" % e)
                    self.clean(r)
                    if r in writeable: writeable.remove(r)
                except Exception as e:
                    logging.critical("recv时捕获未知异常: %s" % e)
                    self.clean(r)
                else:
                    if data:  # 如果有收到数据
                        logging.debug(data)
                        self.onrecv(r, data)
                    else:  # 可能是收到了空,就是客户端断开了
                        logging.info("客户端已断开: %s" % r)
                        self.clean(r)
                        # 只在writeable之前才从writeable列表中清除
                        if r in writeable: writeable.remove(r)

        for w in writeable:
            try:
                data = self.data_queue[w].get_nowait()
            except KeyError as e:  # 客户端突然断开,这个连接可能会同时出现在读和写列表中,而读中已经将它删掉了
                logging.error("获取消息队列时捕获到异常: %s" % e)
                # self.clean(w)  # 这里就是被删了才出的异常
            except queue.Empty:  # 如果队列空了才说明消息都发完了,从读列表中remove
                # if w in self.outputs: self.outputs.remove(w)
                self.outputs.remove(w)  # 这里应该不用判断,一定在列表里。因为如果不在会包KeyError
            except Exception as e:
                logging.critical("获取消息队列时捕获未知异常: %s" % e)
                self.clean(w)
            else:
                if data:
                    try:
                        w.sendall(data)  # 可能一次发不完,所以send之后不从读列表中remove。下次会发现队列为空
                    except ConnectionResetError as e:
                        logging.error("send时捕获到异常: %s" % e)
                    except Exception as e:
                        logging.critical("send时捕获未知异常: %s" % e)

        for e in exceptional:
            logging.critical("异常列表有返回: %s" % e)
            self.clean(e)

    def clean(self, conn):
        """清理客户端连接信息
        连接断开时处理以下4步
        1. 从读列表中去掉,不再去监听这个连接
        2. 从写列表中去掉,如果还有没发出的消息,那么也不再发了
        3. 关闭这个连接
        4. 从消息队列字典中中删除这个队列,可能还有未发送的消息
        可以重构此方法:
        如果还有其他在子类中定义的列表或字典需要清理,
        那么重构此方法,调用执行父类的次方法后,添加自定义的内容
        """
        if conn in self.inputs: self.inputs.remove(conn)
        if conn in self.outputs: self.outputs.remove(conn)
        conn.close()
        if conn in self.data_queue: del self.data_queue[conn]

    def onrecv(self, conn, data):
        """收到消息后调用执行此方法
        需要重构此方法
        否则是调用send_data方法,原样发回
        """
        self.send_data(conn, data)

    def send_data(self, conn, data):
        """发送数据"""
        if conn not in self.outputs: self.outputs.append(conn)
        self.data_queue[conn].put(data)

if __name__ == '__main__':
    Server('localhost', 9999).run()

测试客户端

下面是测试并发的客户端程序,通过多线程实现并发。

import socket
import threading

HOST = 'localhost'
PORT = 9999
def client(i):
    client = socket.socket()
    client.connect((HOST, PORT))
    for j in range(500):
        msg = "hello %s %s" % (i, j)
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print('Received:', data.decode('utf-8'))
    client.close()

if __name__ == '__main__':
    for i in range(50):
        t = threading.Thread(target=client, args=(i,))
        t.start()












本文转自骑士救兵51CTO博客,原文链接:http://blog.51cto.com/steed/2056177,如需转载请自行联系原作者