import WebSocket, { WebSocketServer } from 'ws';
import express from 'express';
import chalk from 'chalk';
const pageConfig = {
ping: 5000, // 心跳检测
msgPort: 3000, //内部推送端口
socketPort: 8020, // socket端口
}
const app = express();
const wss = new WebSocketServer({ port: pageConfig.socketPort });
const groups = new Map();
wss.on('connection', function connection(ws) {
ws.on('message', function incoming(message) {
console.log('received: %s', message);
const data = JSON.parse(message);
ws.uid = data?.uid ?? '';
const action = data?.action ?? '';
const room = data?.room ?? '';
sameUser(ws); // 同一个用户在不同的客户端打开; 则把这些用户加入到一个房间内
if (action === 'login') {
// 登陆系统; 返回令牌
replyData({}, action, ws);
}
// 加入房间
if (action === 'join') {
// 加入到房间;
room && addGroup(room, ws);
}
// 数据转发到其他room
if (action === 'transmit') {
// 将本次数据转发到此分组所有用户
sendMessageToGroup(room, data, action);
}
// 其他操作
if (action === 'other') {}
});
ws.on('close', function close() {
console.log(ws.userId, '离开了');
groups.forEach(function (value, key) {
if (value.has(ws)) {
value.delete(ws);
if (value.size === 0) {
groups.delete(key);
}
}
});
});
});
/**
* 回复消息给客户端
* @param data
* @param action
* @param ws
*/
const replyData = (data, action = '', ws) => {
const config = JSON.stringify({
txt: data,
action,
time: new Date().getTime(),
uid: ws.uid
});
console.log(chalk.green(`🚀 回复消息`), action, config);
ws.send(config);
}
/**
* 加入到房间
* @param group
* @param ws
*/
const addGroup = (group, ws) => {
if (group) {
if (!groups.has(group)) {
groups.set(group, new Set());
}
groups.get(group).add(ws);
console.log(chalk.red(ws.uid), chalk.blue('加入了房间'), group);
}
}
/**
* 多个客户端相同 UID 用户处理
* @param ws
*/
const sameUser = (ws) => {
// 查找所有在线用户里 uid 等于自己的, 加一个分组里;
// Set 会自动过滤重复的
wss.clients.forEach(function each(client) {
// 把所有的自己都加入到 room Id 是 uid 的组里;
if (client.readyState === WebSocket.OPEN && client.uid === ws.uid) {
addGroup(ws.uid, client);
}
});
}
/**
* 推送消息给房间(分组)
* @param group
* @param message
* @param action
*/
const sendMessageToGroup = (group, message, action = '') => {
if (groups.has(group)) {
groups.get(group).forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
replyData(message, action, client);
}
});
}
}
/**
* 给所有人发消息
* @param txt
*/
const sendAll = (txt) => {
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(txt);
}
});
}
app.get('/', (req, res) => {
const params = req.query;
sendAll(params?.txt ?? '你好');
res.send('Hello, this is a GET request!');
});
// 给房间发消息
app.get('/send', (req, res) => {
const params = req?.query ?? {};
const action = params?.action ?? '';
sendMessageToGroup(params?.room, params?.txt ?? '你好', action);
res.send('Hello, Send Success');
});
// 获取房间个数
app.get('/count', (req, res) => {
let size = 0;
groups.forEach(function (value, key) {
const room = req?.query?.room ?? '';
if (key === room) size = value?.size;
});
res.send(size.toString());
})
// 心跳检测;
setInterval(() => {
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
replyData(`Hello ~ 你还在吗?`, 'ping', client);
} else {
// 删除已关闭的客户端
groups.forEach(function (value, key) {
console.log(`名称`, key, '在线个数', value.size);
if (value.has(client)) {
value.delete(client);
if (value.size === 0) {
groups.delete(key);
}
}
});
}
});
}, pageConfig.ping);
app.listen(pageConfig.msgPort, () => {
console.log(chalk.blue(`内部推送服务已启动: ${pageConfig.msgPort}`));
});