ws 是什么?
https://www.npmjs.com/package/ws
ws 是一个 nodejs 的 webscoket 库, 用来创建 websocket 服务。
下面用简单的思路来实现一个聊天室应用的 nodejs 端服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import WebSocket, { WebSocketServer } from 'ws'; import express from 'express'; import chalk from 'chalk'; const pageConfig = { ping: 5000, // 心跳检测 msgPort: 3000, //内部推送端口 socketPort: 8020, // socket端口 } const app = express(); const wss = new WebSocketServer({ port: pageConfig.socketPort }); const groups = new Map(); wss.on('connection', function connection(ws) { ws.on('message', function incoming(message) { console.log('received: %s', message); const data = JSON.parse(message); ws.uid = data?.uid ?? ''; const action = data?.action ?? ''; const room = data?.room ?? ''; sameUser(ws); // 同一个用户在不同的客户端打开; 则把这些用户加入到一个房间内 if (action === 'login') { // 登陆系统; 返回令牌 replyData({}, action, ws); } // 加入房间 if (action === 'join') { // 加入到房间; room && addGroup(room, ws); } // 数据转发到其他room if (action === 'transmit') { // 将本次数据转发到此分组所有用户 sendMessageToGroup(room, data, action); } // 其他操作 if (action === 'other') {} }); ws.on('close', function close() { console.log(ws.userId, '离开了'); groups.forEach(function (value, key) { if (value.has(ws)) { value.delete(ws); if (value.size === 0) { groups.delete(key); } } }); }); }); /** * 回复消息给客户端 * @param data * @param action * @param ws */ const replyData = (data, action = '', ws) => { const config = JSON.stringify({ txt: data, action, time: new Date().getTime(), uid: ws.uid }); console.log(chalk.green(`🚀 回复消息`), action, config); ws.send(config); } /** * 加入到房间 * @param group * @param ws */ const addGroup = (group, ws) => { if (group) { if (!groups.has(group)) { groups.set(group, new Set()); } groups.get(group).add(ws); console.log(chalk.red(ws.uid), chalk.blue('加入了房间'), group); } } /** * 多个客户端相同 UID 用户处理 * @param ws */ const sameUser = (ws) => { // 查找所有在线用户里 uid 等于自己的, 加一个分组里; // Set 会自动过滤重复的 wss.clients.forEach(function each(client) { // 把所有的自己都加入到 room Id 是 uid 的组里; if (client.readyState === WebSocket.OPEN && client.uid === ws.uid) { addGroup(ws.uid, client); } }); } /** * 推送消息给房间(分组) * @param group * @param message * @param action */ const sendMessageToGroup = (group, message, action = '') => { if (groups.has(group)) { groups.get(group).forEach(function each(client) { if (client.readyState === WebSocket.OPEN) { client.send(message); replyData(message, action, client); } }); } } /** * 给所有人发消息 * @param txt */ const sendAll = (txt) => { wss.clients.forEach(function each(client) { if (client.readyState === WebSocket.OPEN) { client.send(txt); } }); } app.get('/', (req, res) => { const params = req.query; sendAll(params?.txt ?? '你好'); res.send('Hello, this is a GET request!'); }); // 给房间发消息 app.get('/send', (req, res) => { const params = req?.query ?? {}; const action = params?.action ?? ''; sendMessageToGroup(params?.room, params?.txt ?? '你好', action); res.send('Hello, Send Success'); }); // 获取房间个数 app.get('/count', (req, res) => { let size = 0; groups.forEach(function (value, key) { const room = req?.query?.room ?? ''; if (key === room) size = value?.size; }); res.send(size.toString()); }) // 心跳检测; setInterval(() => { wss.clients.forEach(function each(client) { if (client.readyState === WebSocket.OPEN) { replyData(`Hello ~ 你还在吗?`, 'ping', client); } else { // 删除已关闭的客户端 groups.forEach(function (value, key) { console.log(`名称`, key, '在线个数', value.size); if (value.has(client)) { value.delete(client); if (value.size === 0) { groups.delete(key); } } }); } }); }, pageConfig.ping); app.listen(pageConfig.msgPort, () => { console.log(chalk.blue(`内部推送服务已启动: ${pageConfig.msgPort}`)); }); |
服务包含了心跳检测, 多分组, 内部推送服务等!