基于ThinkJS的WebSocket通信详解

本文介绍了如何在ThinkJS的项目中利用 WebSocket 实现多端的实时通信。ThinkJS是360前端团队奇舞团基于Koa 2 开发的企业级 Node.js 服务端框架,文章中会从零开始实现一个简单的聊天室,希望读者们能有所收获。

基于 ThinkJS 的 WebSocket 通信详解

前 言

我们的项目是基于 ThinkJS Vue 开发的,最近实现了一个多端实时同步数据的功能,所以想写一篇文章来介绍下如何在 ThinkJS 的项目中利用 WebSocket 实现多端的实时通信。ThinkJS 是基于 Koa 2 开发的企业级 Node.js 服务端框架,文章中会从零开始实现一个简单的聊天室,希望读者们能有所收获。

WebSocket

WebSocket 是 HTML5 中提出的一种协议。它的出现是为了解决客户端和服务端的实时通信问题。在 WebSocket 出现之前,如果想实现实时消息传递一般有两种方式:

  1. 客户端通过轮询不停的向服务端发送请求,如果有新消息客户端进行更新。这种方式的缺点很明显,客户端需要不停向服务器发送请求,然而 HTTP 请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多带宽资源
  2. HTTP 长连接,客户端通过 HTTP 请求连接到服务端后, 底层的 TCP 连接不会马上断开,后续的信息还是可以通过同一个连接来传输。这种方式有一个问题是每个连接会占用服务端资源,在收到消息后连接断开,就需要重新发送请求。如此循环往复。

可以看到,这两种实现方式的本质还是客户端向服务端“Pull”的过程,并没有一个服务端主动“Push”到客户端的方式,所有的方式都是依赖客户端先发起请求。为了满足两方的实时通信, WebSocket 应运而生。

WebSocket 协议

首先,WebSocket 是基于 HTTP 协议的,或者说借用了 HTTP 协议来完成连接的握手部分。其次,WebSocket 是一个持久化协议,相对于 HTTP 这种非持久的协议来说,一个 HTTP 请求在收到服务端回复后会直接断开连接,下次获取消息需要重新发送 HTTP 请求,而 WebSocket 在连接成功后可以保持连接状态。下图应该能体现两者的关系:

基于 ThinkJS 的 WebSocket 通信详解

在发起 WebSocket 请求时需要先通过 HTTP 请求告诉服务端需要将协议升级为 WebSocket。

浏览器先发送请求:

GET / HTTP/1.1
Host: localhost:8080
Origin: [url=http://127.0.0.1:3000]http://127.0.0.1:3000[/url]
Connection: Upgrade
Upgrade: WebSocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==

服务端回应请求:

HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: WebSocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=

在请求头中核心的部分是 Connection 和 Upgrade ,通过这两个字段服务端会将 HTTP 升级为 WebSocket 协议。服务端返回对应信息后连接成功,客户端和服务端就可以正常通信了。

随着新标准的推进,WebSocket 已经比较成熟了,并且各个主流浏览器对 WebSocket 的支持情况比较好(不兼容低版本 IE,IE 10 以下)

基于 ThinkJS 的 WebSocket 通信详解

Socket.io

Socket.io (https://socket.io/) 是一个完全由 JavaScript 实现、基于 Node.js、支持 WebSocket 协议的用于实时通信、跨平台的开源框架。它包括了客户端的 JavaScript 和服务器端的 Node.js,并且有着很好的兼容性,会根据浏览器的支持情况选择不同的方式进行通讯,如上面介绍的轮询和 HTTP 长连接。

简易聊天室

对于 WebSocket 目前 ThinkJS 支持了 Socket.io 并对其进行了一些简单的包装,只需要进行一些简单的配置就可

以使用 WebSocket 了。

服务端配置

stickyCluster

ThinkJS 默认采用了多进程模型,每次请求会根据策略输送到不同的进程中执行,关于其多进程模型可以参考《细谈 ThinkJS 多进程模型》(https://zhuanlan.zhihu.com/p/37921649)。 而 WebSocket 连接前需要使用 HTTP 请求来完成握手升级,多个请求需要保证命中相同的进程,才能保证握手成功。这个时候就需要开启 StickyCluster 功能,使客户端所有的请求命中同一进程。修改配置文件 src/config/config.js 即可。

module.exports = {
 stickyCluster: true,
 // ...
}

添加 WebSocket 配置

在 src/config/extend.js 引入 WebSocket:

const websocket = require(\'think-websocket\');
module.exports = [
 // ...
 websocket(think.app),
];

在 src/config/adapter.js 文件中配置 WebSocket

const socketio = require(\'think-websocket-socket.io\');
exports.websocket = {
 type: \'socketio\',
 common: {
 // common config
 },
 socketio: {
 handle: socketio,
 messages: {
 open: \'/websocket/open\', //建立连接时处理对应到 websocket Controller 下的 open Action
 close: \'/websocket/close\', // 关闭连接时处理的 Action
 room: \'/websocket/room\' // room 事件处理的 Action
 }
 }
}

配置中的 messages 对应着事件的映射关系。比如上述的例子,客户端触发 room 事件,服务端需要在 websocket controller 下的 roomAction 中处理消息。

添加 WebSocket 实现

创建处理消息的 controller 文件。上面的配置是 /websocket/xxx ,所以直接在项目根目录 src/controller 下创建 websocket.js 文件。

module.exports = class extends think.Controller {
// this.socket 为发送消息的客户端对应的 socket 实例, this.io 为Socket.io 的一个实例
 constructor(...arg) {
 super(...arg);
 this.io = this.ctx.req.io;
 this.socket = this.ctx.req.websocket;
 }
 async openAction() {
 this.socket.emit(\'open\', \'websocket success\')
 }
 closeAction() {
 this.socket.disconnect(true);
 }
};

这时候服务端代码就已经配置完了。

客户端配置

客户端代码使用比较简单,只需要引入 socket.io.js 就可以直接使用了。

引入后在初始化代码创建 WebSocket 连接:

this.socket = io();
this.socket.on(\'open\', data => {
 console.log(\'open\', data)
})

这样一个最简单的 WebSocket 的 demo 就完成了,打开页面的时候会自动创建一个 WebSocket 连接,创建成功后服务端会触发 open 事件,客户端在监听的 open 事件中会接收到服务端返回的 websocket success 字符串。

接下来我们开始实现一个简单的聊天室。

简易聊天室的实现

从刚才的内容中我们知道每个 WebSocket 连接的创建会有一个 Socket 句柄创建,对应到代码中的 this.socket 变量。所以本质上聊天室人与人的通信可以转换成每个人对应的 Socket 句柄的通信。我只需要找到这个人对应的 Socket 句柄,就能实现给对方发送消息了。

简单来实现我们可以设置一个全局变量来存储连接到服务端的 WebSocket 的一些信息。在 src/bootstrap/global.js 中设置全局变量:

global.$socketChat = {};

然后在 src/bootstrap/worker.js 中引入global.js,使全局变量生效。

require(\'./global\');

然后在服务端 controller 增加 roomAction 和 messageAction , messageAction 用来接收客户端用户的聊天信息,并将信息发送给所有的客户端成员。 roomAction 用来接收客户端进入/离开聊天室的信息。这两个的区别是聊天消息是需要同步到所有的成员所以使用 this.io.emit,聊天室消息是同步到所有除当前客户端外的所有成员所以使用this.socket.broadcast.emit

module.exports = class extends think.Controller {
 constructor(...arg) {
 super(...arg);
 this.io = this.ctx.req.io;
 this.socket = this.ctx.req.websocket;
 global.$socketChat.io = this.io;
 }
 async messageAction() {
 this.io.emit(\'message\', {
 nickname: this.wsData.nickname,
 type: \'message\',
 message: this.wsData.message,
 id: this.socket.id
 })
 }
 async roomAction() {
 global.$socketChat[this.socket.id] = {
 nickname: this.wsData.nickname,
 socket: this.socket
 }
 this.socket.broadcast.emit(\'room\', {
 nickname: this.wsData.nickname,
 type: \'in\',
 id: this.socket.id
 })
 }
 async closeAction() {
 const closeSocket = global.$socketChat[this.socket.id];
 const nickname = closeSocket && closeSocket.nickname;
 this.socket.disconnect(true);
 this.socket.removeAllListeners();
 this.socket.broadcast.emit(\'room\', {
 nickname,
 type: \'out\',
 id: this.socket.id
 })
 delete global.$socketChat[this.socket.id]
 }
}

客户端通过监听服务端 emit 的事件来处理信息

this.socket.on(\'message\', data => {
 // 通过socket的id的对比,判断消息的发送方
 data.isMe = (data.id === this.socket.id);
 this.chatData.push(data);
})
this.socket.on(\'room\', (data) => {
 this.chatData.push(data);
})

通过 emit 服务端对应的 action 来发送消息

this.socket.emit(\'room\', {
 nickname: this.nickname
})
this.socket.emit(\'message\', {
 message: this.chatMsg,
 nickname: this.nickname
})

根据发送/接收消息的type判断消息类型

{{item.nickname}}进入聊天室

{{item.nickname}}离开聊天室

{{item.nickname}}:{{item.message}}

至此一个简单的聊天室就完成了。

基于 ThinkJS 的 WebSocket 通信详解

多节点通信问题

刚才我们说了通信的本质其实是 Socket 句柄查询使用的过程,本质上我们是利用全局变量存储所有的 WebSocket 句柄的方式解决了 WebSocket 连接查找的问题。但是当我们的服务端扩容后,会出现多个服务器都有 WebSocket 连接,这个时候跨节点的 WebSocket 连接查找使用全局变量的方式就无效了。此时我们就就需要换一种方式来实现跨服务器的通信同步,一般有以下几种方式:

消息队列

发送消息不直接执行 emit 事件,而是将消息发送到消息队列中,然后所有的节点对这条消息进行消费。拿到数据后查看接收方的 WebSocket 连接是否在当前节点上,不在的话就忽略这条数据,在的话则执行发送的动作。

节点通信

通过外部存储服务例如 Redis 充当之前的“全局变量”的角色,所有的节点创建 WebSocket 连接后都向 Redis 中注册一下,告诉大家有个叫 “A” 家伙的连接在 “192.168.1.1” 这。当 B 要向 A 发送消息的时候它去 Redis 中查找到 A 的连接所处的节点后,通知 192.168.1.1 这个节点 B 要向 A 发送消息,然后节点会执行发送的动作。

基于 Redis 的节点通信实现

Redis 的 pub/sub 是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。WebSocket 的一个节点接收到消息后,通过 Redis 发布(pub),其他节点作为订阅者(sub)接收消息再进行后续处理。

这次我们将在聊天室的 demo 上实现节点通信的功能。

首先,在 websocket controller 文件中增加接口调用

const ip = require(\'ip\');
const host = ip.address();
module.exports = class extends think.Controller {
 async openAction() {
 // 记录当前 WebSocket 连接到的服务器ip
 await global.rediser.hset(\'-socket-chat\', host, 1);
 }
 emit(action, data) {
 if (action === \'message\') {
 this.io.emit(action, data)
 } else {
 this.socket.broadcast.emit(action, data);
 }
 this.crossSync(action, data)
 }
 async messageAction() {
 const data = {
 nickname: this.wsData.nickname,
 type: \'message\',
 message: this.wsData.message,
 id: this.socket.id
 };
 this.emit(\'message\', data);
 }
 async closeAction() {
 const connectSocketCount = Object.keys(this.io.sockets.connected).length;
 this.crossSync(action, data);
 if (connectSocketCount <= 0) {
 await global.rediser.hdel(\'-socket-chat\', host);
 }
 }
 async crossSync(action, params) {
 const ips = await global.rediser.hkeys(\'-socket-chat\').filter(ip => ip !== host);
 ips.forEach(ip => request({
 method: \'POST\',
 uri: `http://${ip}/api/websocket/sync`,
 form: {
 action,
 data: JSON.stringify(params)
 },
 json: true
 });
 );
 }
}

然后在 src/controller/api/websocket 实现通信接口

const Base = require(\'../base\');
module.exports = class extends Base {
 async syncAction() {
 const {action, data} = this.post();
 const blackApi = [\'room\', \'message\', \'close\', \'open\'];
 if (!blackApi.includes(action)) return this.fail();
 // 由于是跨服务器接口,所以直接使用io.emit发送给当前所有客户端
 const io = global.$socketChat.io;
 io && io.emit(action, JSON.parse(data));
 }
};

这样就实现了跨服务的通信功能,当然这只是一个简单的 demo ,但是基本原理是相同的。

基于 ThinkJS 的 WebSocket 通信详解

socket.io-redis

第二种 Redis (sub/pub) 的方式,socket.io 提供了一种官方的库 socket.io-redis (https://github.com/socketio/Socket.io-redis) 来实现。它在 Redis 的 pub/sub 功能上进行了封装,让开发者可以忽略 Redis 相关的部分,方便了开发者使用。使用时只需要传入 Redis 的配置即可。

// Thinkjs socket.io-redis 配置
const redis = require(\'socket.io-redis\');
exports.websocket = {
 ...
 socketio: {
 adapter: redis({ host: \'localhost\', port: 6379 }),
 message: {
 ...
 }
 }
}
// then controller websocket.js
this.io.emit(\'hi\', \'all sockets\');

HTTP 与 WebSocket 通信

如果想通过非 socket.io 进程向 socket.io 服务通信,例如:HTTP,可以使用官方的 socket.io-emitter 库。使用方式如下:

var io = require(\'socket.io-emitter\')({ host: \'127.0.0.1\', port: 6379 });
setInterval(function(){
 io.emit(\'time\', new Date);
}, 5000);

后 记

整个聊天室的代码已经上传到github,大家可以直接下载体验聊天室示例 (https://github.com/bingomvm/thinkjs-chat)

内容出处:,

声明:本网站所收集的部分公开资料来源于互联网,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。文章链接:http://www.yixao.com/procedure/15628.html

发表评论

登录后才能评论