ws 是什么?
https://www.npmjs.com/package/ws
ws 是一个 nodejs 的 webscoket 库, 用来创建 websocket 服务。
下面用简单的思路来实现一个聊天室应用的 nodejs 端服务。
|
import WebSocket, { WebSocketServer } from 'ws'; import express from 'express'; import chalk from 'chalk'; const pageConfig = { ping: 5000, // 心跳检测 msgPort: 3000, //内部推送端口 socketPort: 8020, // socket端口 } const app = express(); const wss = new WebSocketServer({ port: pageConfig.socketPort }); const groups = new Map(); wss.on('connection', function connection(ws) { ws.on('message', function incoming(message) { console.log('received: %s', message); const data = JSON.parse(message); ws.uid = data?.uid ?? ''; const action = data?.action ?? ''; const room = data?.room ?? ''; sameUser(ws); // 同一个用户在不同的客户端打开; 则把这些用户加入到一个房间内 if (action === 'login') { // 登陆系统; 返回令牌 replyData({}, action, ws); } // 加入房间 if (action === 'join') { // 加入到房间; room && addGroup(room, ws); } // 数据转发到其他room if (action === 'transmit') { // 将本次数据转发到此分组所有用户 sendMessageToGroup(room, data, action); } // 其他操作 if (action === 'other') {} }); ws.on('close', function close() { console.log(ws.userId, '离开了'); groups.forEach(function (value, key) { if (value.has(ws)) { value.delete(ws); if (value.size === 0) { groups.delete(key); } } }); }); }); /** * 回复消息给客户端 * @param data * @param action * @param ws */ const replyData = (data, action = '', ws) => { const config = JSON.stringify({ txt: data, action, time: new Date().getTime(), uid: ws.uid }); console.log(chalk.green(`🚀 回复消息`), action, config); ws.send(config); } /** * 加入到房间 * @param group * @param ws */ const addGroup = (group, ws) => { if (group) { if (!groups.has(group)) { groups.set(group, new Set()); } groups.get(group).add(ws); console.log(chalk.red(ws.uid), chalk.blue('加入了房间'), group); } } /** * 多个客户端相同 UID 用户处理 * @param ws */ const sameUser = (ws) => { // 查找所有在线用户里 uid 等于自己的, 加一个分组里; // Set 会自动过滤重复的 wss.clients.forEach(function each(client) { // 把所有的自己都加入到 room Id 是 uid 的组里; if (client.readyState === WebSocket.OPEN && client.uid === ws.uid) { addGroup(ws.uid, client); } }); } /** * 推送消息给房间(分组) * @param group * @param message * @param action */ const sendMessageToGroup = (group, message, action = '') => { if (groups.has(group)) { groups.get(group).forEach(function each(client) { if (client.readyState === WebSocket.OPEN) { client.send(message); replyData(message, action, client); } }); } } /** * 给所有人发消息 * @param txt */ const sendAll = (txt) => { wss.clients.forEach(function each(client) { if (client.readyState === WebSocket.OPEN) { client.send(txt); } }); } app.get('/', (req, res) => { const params = req.query; sendAll(params?.txt ?? '你好'); res.send('Hello, this is a GET request!'); }); // 给房间发消息 app.get('/send', (req, res) => { const params = req?.query ?? {}; const action = params?.action ?? ''; sendMessageToGroup(params?.room, params?.txt ?? '你好', action); res.send('Hello, Send Success'); }); // 获取房间个数 app.get('/count', (req, res) => { let size = 0; groups.forEach(function (value, key) { const room = req?.query?.room ?? ''; if (key === room) size = value?.size; }); res.send(size.toString()); }) // 心跳检测; setInterval(() => { wss.clients.forEach(function each(client) { if (client.readyState === WebSocket.OPEN) { replyData(`Hello ~ 你还在吗?`, 'ping', client); } else { // 删除已关闭的客户端 groups.forEach(function (value, key) { console.log(`名称`, key, '在线个数', value.size); if (value.has(client)) { value.delete(client); if (value.size === 0) { groups.delete(key); } } }); } }); }, pageConfig.ping); app.listen(pageConfig.msgPort, () => { console.log(chalk.blue(`内部推送服务已启动: ${pageConfig.msgPort}`)); }); |
服务包含了心跳检测, 多分组, 内部推送服务等!