且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Python: flask-socketio使用Websocket协议进行通讯

更新时间:2022-08-19 17:52:04

文档:


PyPI: https://pypi.org/project/Flask-SocketIO/

Github: https://github.com/miguelgrinberg/Flask-SocketIO

doc: https://flask-socketio.readthedocs.io

socket.io: https://socket.io/


安装

pip install flask-socketio gevent-websocket

代码实例

from flask import Flask, render_template, request
from flask_socketio import SocketIO

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
app.jinja_env.auto_reload = True

socketio = SocketIO(app)


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/send_all')
def send_all():
    """
    广播
    :return: 
    """
    message = request.args.get('message')

    socketio.send(message)

    return {'status': 'ok'}


@app.route('/send')
def send_message():
    """
    单独发送
    :return:
    """
    sid = request.args.get('sid')
    message = request.args.get('message')

    socketio.send(message, to=sid)

    return {'status': 'ok'}


@socketio.on('connect')
def connect():
    print('connect')
    socketio.send({'sid': request.sid})


@socketio.on('disconnect')
def disconnect():
    print('disconnect')


@socketio.on('message')
def handle_message(data):
    print(data)
    socketio.send({'data': data})


if __name__ == '__main__':
    socketio.run(app, debug=True)

Http 发送测试请求

import requests

params = {
    'sid': 'm4AymrH2TIFHCcQNAAAF',
    'message': 'send'
}

res = requests.get('http://localhost:5000/send', params=params)
print(res.text)


params = {
    'message': 'send_all'
}

res = requests.get('http://localhost:5000/send_all', params=params)
print(res.text)