+
80
-

workman gateway worker结构怎么打造大型分布式消息系统?

php

请问workman gateway worker结构怎么打造大型分布式消息系统?

网友回复

+
0
-

GatewayWorker介绍

GatewayWorker是基于Workerman开发的一套TCP长连接的应用框架,实现了单发、群发、广播等接口,内置了mysql类库,GatewayWorker分为Gateway进程和Worker进程,天然支持分布式部署,能够支持庞大的连接数(百万甚至千万连接级别的应用)。可用于开发IM聊天应用、移动通讯、游戏后台、物联网、智能家居后台等等。 GatewayWorker工作原理

1、可以方便的实现客户端之间的通讯 2、Gateway与Worker之间是基于socket长连接通讯,也就是说Gateway、Worker可以部署在不同的服务器上,非常容易实现分布式部署,扩容服务器 3、Gateway进程只负责网络IO,业务实现都在Worker进程上,可以reload Worker进程,实现在不影响用户的情况下完成代码热更新。 工作流程

1、Register、Gateway、BusinessWorker进程启动 2、Gateway、BusinessWorker进程启动后向Register服务进程发起长连接注册自己 3、Register服务收到Gateway的注册后,把所有Gateway的通讯地址保存在内存中 4、Register服务收到BusinessWorker的注册后,把内存中所有的Gateway的通讯地址发给BusinessWorker 5、BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway 6、如果运行过程中有新的Gateway服务注册到Register(一般是分布式部署加机器),则将新的Gateway内部通讯地址列表将广播给所有BusinessWorker,BusinessWorker收到后建立连接 7、如果有Gateway下线,则Register服务会收到通知,会将对应的内部通讯地址删除,然后广播新的内部通讯地址列表给所有BusinessWorker,BusinessWorker不再连接下线的Gateway 8、至此Gateway与BusinessWorker通过Register已经建立起长连接 9、客户端的事件及数据全部由Gateway转发给BusinessWorker处理,BusinessWorker默认调用Events.php中的onConnect onMessage onClose处理业务逻辑。 10、BusinessWorker的业务逻辑入口全部在Events.php中,包括onWorkerStart进程启动事件(进程事件)、onConnect连接事件(客户端事件)、onMessage消息事件(客户端事件)、onClose连接关闭事件(客户端事件)、onWorkerStop进程退出事件(进程事件) GatewayWorker  OR  Workerman 如果你的项目是长连接并且需要客户端与客户端之间通讯,建议使用GatewayWorker。 短连接或者不需要客户端与客户端之间通讯的项目建议使用Workerman。 GatewayWorker不支持UDP监听,所以UDP服务请选择Workerman。 如果你是一个有多进程socket编程经验的人,喜欢定制自己的进程模型,可以选择Workerman。 GatewayWorker安装

composer require workerman/gateway-worker 目录结构 ├── BusinessWorker.php ├── Gateway.php ├── Lib │   ├── Context.php │   ├── DbConnection.php │   ├── Db.php │   └── Gateway.php ├── Protocols │   └── GatewayProtocol.php └── Register.php Gateway使用

Gateway类用于初始化Gateway进程。Gateway进程是暴露给客户端的让其连接的进程。所有客户端的请求都是由Gateway接收然后分发给BusinessWorker处理,同样BusinessWorker也会将要发给客户端的响应通过Gateway转发出去。 require_once 'vendor/autoload.php'; use Workerman\Worker; use GatewayWorker\Gateway; 初始化: $gateway = new Gateway('protocol://ip:port’); 支持协议 为应用层协议,目前支持的协议有 1、websocket协议 2、text协议 3、Frame协议 4、自定义通讯协议 5、tcp,直接裸tcp,不推荐 属性 name 和Worker一样,可以设置Gateway进程的名称,方便status命令中查看统计 count 和Worker一样,可以设置Gateway进程的数量,以便充分利用多cpu资源 lanIp lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可。多服务器分布式部署的时候需要填写真实的内网ip,不能填写127.0.0.1。注意:lanIp只能填写真实ip,不能填写域名或者其它字符串,无论如何都不能写0.0.0.0 . startPort Gateway进程启动后会监听一个本机端口,用来给BusinessWorker提供链接服务,然后Gateway与BusinessWorker之间就通过这个连接通讯。这里设置的是Gateway监听本机端口的起始端口。比如启动了4个Gateway进程,startPort为2000,则每个Gateway进程分别启动的本地端口一般为2000、2001、2002、2003。 当本机有多个Gateway/BusinessWorker项目时,需要把每个项目的startPort设置成不同的段 registerAddress,注册服务地址,只写格式类似于 '127.0.0.1:1236’ 回调属性 onWorkerStart 和Worker一样,可以设置Gateway进程启动后的回调函数,一般在这个回调里面初始化一些全局数据 onWorkerStop 和Worker一样,可以设置Gateway进程关闭的回调函数,一般在这个回调里面做数据清理或者保存数据工作 onConnect(比较少用到,开发者一般不用关注) 和Worker一样,可以设置onConnect回调,当有客户端连接上来时触发。与Events::onConnect的区别是Events::onConnect运行在BusinessWorker进程上。Gateway::onConnect是运行在Gateway进程上,无法使用\GatewayWorker\Lib\Gateway类提供的接口 onClose(比较少用到,开发者一般不用关注) 和Worker一样,可以设置onClose回调,当有客户端连接关闭时触发。同样与Events::onClose的区别是Gateway::onClose是运行在Gateway进程上,无法使用\GatewayWorker\Lib\Gateway类提供的接口 BusinessWorker使用 BusinessWorker类其实也是基于基础的Worker开发的。BusinessWorker是运行业务逻辑的进程,BusinessWorker收到Gateway转发来的事件及请求时会默认调用Events.php中的onConnect onMessage onClose方法处理事件及数据,开发者正是通过实现这些回调控制业务及流程。 name 和Worker一样,可以设置BusinessWorker进程的名称,方便status命令中查看统计 count 和Worker一样,可以设置BusinessWorker进程的数量,以便充分利用多cpu资源 registerAddress,注册服务地址,只写格式类似于 '127.0.0.1:1236' eventHandler 设置使用哪个类来处理业务,默认值是Events,即默认使用Events.php中的Events类来处理业务。业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现。 onWorkerStart 和Worker一样,可以设置BusinessWorker启动后的回调函数,一般在这个回调里面初始化一些全局数据 onWorkerStop 和Worker一样,可以设置BusinessWorker关闭的回调函数,一般在这个回调里面做数据清理或者保存数据工作 Register Register类其实也是基于基础的Worker开发的。Gateway进程和BusinessWorker进程启动后分别向Register进程注册自己的通讯地址,Gateway进程和BusinessWorker通过Register进程得到通讯地址后,就可以建立起连接并通讯了。 Register类只能定制监听的ip和端口,并且目前只能使用text协议。 use Workerman\Worker; use GatewayWorker\Register; $register = new Register('text://0.0.0.0:1236'); Events eventHandler 设置使用哪个类来处理业务,默认值是Events,即默认使用Events.php中的Events类来处理业务。业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现。 onWorkerStart(BusinessWorker $businessWorker); 当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次。$businessworker->onWorkerStart和Event::onWorkerStart不会互相覆盖,如果两个回调都设置则都会运行。 onConnect(string $client_id); 当客户端连接上gateway进程时(TCP三次握手完毕时)触发的回调函数。  $client_id client_id固定为20个字符的字符串,用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id。 onWebSocketConnect(string $client_id, array $data); 当客户端连接上gateway完成websocket握手时触发的回调函数。  $client_id client_id固定为20个字符的字符串,用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id。 $data websocket握手时的http头数据,包含get、server等变量 onWebSocketConnect(string $client_id, array $data); 当客户端连接上gateway完成websocket握手时触发的回调函数。  $client_id client_id固定为20个字符的字符串,用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id。 $data websocket握手时的http头数据,包含get、server等变量 onMessage(string $client_id, mixed $recv_data); 当客户端发来数据(Gateway进程收到数据)后触发的回调函数 $client_id 全局唯一的客户端socket连接标识 $recv_data 完整的客户端请求数据,数据类型取决于Gateway所使用协议的decode方法返的回值类型 onClose(string $client_id); 客户端与Gateway进程的连接断开时触发。不管是客户端主动断开还是服务端主动断开,都会触发这个回调。 onWorkerStop(BusinessWorker $businessWorker); 当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次。 Lib\Gateway Lib\Gateway类是Gateway/BusinessWorker模型中给客户端发送数据的类。 提供了单发、群发以及关闭客户端连接的接口 sendToClient(string $client_id, string $send_data); 向客户端client_id发送$send_data数据 $client_id 客户端连接的client_id $send_data 要发送的数据(字符串类型),此数据会被Gateway所使用协议的encode方法打包后再发送给客户端 closeClient(string $client_id); 断开与client_id对应的客户端的连接 $client_id 全局唯一标识客户端连接的id sendToAll(string $send_data [, array $client_id_array = null [, array $exclude_client_id = null [, bool $raw = false]]]); 向所有客户端或者client_id_array指定的客户端发送$send_data数据 $send_data 要发送的数据(字符串类型),此数据会被Gateway所使用协议的encode方法打包后发送给客户端 $client_id_array 指定向哪些client_id发送,如果不传递该参数,则是向所有在线客户端发送 $send_data 数据 $exclude_client_id client_id组成的数组。$exclude_client_id数组中指定的client_id将被排除在外,不会收到本次发的消息 $raw 是否发送原始数据,一般用不到 getAllClientIdList(void); 获取全局所有在线client_id列表。 getAllClientIdCount(void); 获取当前在线连接总数(多少client_id在线)。 isOnline(string $client_id); 判断$client_id是否还在线,是否在线取决于对应client_id是否触发过onClose回调。 $client_id 客户端的client_id 返回值 在线返回1,不在线返回0 bindUid(string $client_id, mixed $uid); 将client_id与uid绑定,uid泛指用户id或者设备id,用来唯一确定一个客户端用户或者设备 $client_id 客户端的client_id $uid uid,可以是数字或者字符串。 uid与client_id是一对多的关系,一个uid下有多个client_id,client_id下线(连接断开)时会自动执行解绑 sendToUid(mixed $uid, string $message); 向uid绑定的所有在线client_id发送数据。 $uid uid可以是字符串、数字、或者包含uid的数组。如果为数组,则是给数组内所有uid发送数据 $message 要发送的数据(字符串类型),此数据会被Gateway所使用协议的encode方法打包后再发送给客户端 getClientIdByUid(mixed $uid); 返回一个数组,数组元素为与uid绑定的所有在线的client_id getUidByClientId(string $client_id); 返回client_id绑定的uid,如果client_id没有绑定uid,则返回null。 unbindUid(string $client_id, mixed $uid); 将client_id与uid解绑。当client_id下线(连接断开)时会自动与uid解绑 $client_id 客户端的client_id $uid 数字或者字符串 isUidOnline(mixed $uid); 判断$uid是否在线,如果某uid没有通过进行任何绑定,那么对该uid调用将返回0 返回值 uid在线返回1,不在线返回0 getAllUidList(void); 获取全局所有在线uid列表。 getAllUidCount(void); 获取全局所有在线uid数量。 joinGroup(string $client_id, mixed $group); 将client_id加入某个组,以便通过Gateway::sendToGroup发送数据。 $client_id 客户端的client_id $group 只能是数字或者字符串 leaveGroup(string $client_id, mixed $group); 将client_id从某个组中删除 $client_id 客户端的client_id $group 只能是数字或者字符串。 ungroup(mixed $group); 取消分组,或者说解散分组 sendToGroup(mixed $group, string $message [, array $exclude_client_id = null [, bool $raw = false]]) 向某个分组的所有在线client_id发送数据。 $group group可以是字符串、数字、或者数组。如果为数组,则是给数组内所有group发送数据 $message 要发送的数据(字符串类型),此数据会被Gateway所使用协议的encode方法打包后再发送给客户端 $exclude_client_id client_id组成的数组。$exclude_client_id数组中指定的client_id将被排除在外,不会收到本次发的消息 $raw 是否发送原始数据 getAllGroupIdList(void); 获取全局所有在线group id列表。 getClientIdCountByGroup(mixed $group); 获取某分组当前在线成连接数(多少client_id在线)。 getClientIdListByGroup(mixed $group); 获取某个分组所有在线client_id列表。 getUidCountByGroup(mixed $group); 获取某个分组下的在线uid数量。 getUidListByGroup(mixed $group); 获取某个分组所有在线uid列表。 getClientSessionsByGroup(mixed $group); 获取某个分组所有在线client_id信息。 getAllClientSessions(void); 获取当前所有在线client_id信息。 setSession(string $client_id, array $session); 设置某个client_id对应的session updateSession(string $client_id, array $session); 更新某个client_id对应的session getSession(string $client_id); 获取某个client_id对应的session。 配置wss服务 $context = array(     'ssl' => array(         // 请使用绝对路径         'local_cert'                 => '磁盘路径/server.pem', // 也可以是crt文件         'local_pk'                   => '磁盘路径/server.key',         'verify_peer'               => false,             ) ); $gateway = new Gateway("websocket://0.0.0.0:443", $context); $gateway->transport = 'ssl'; ws = new WebSocket("wss://域名"); 1、如果无法启动,则一般是443端口被占用,请改成其它端口。如果必须使用443端口请参考worekrman手册创建wss服务方法二部分。 2、wss端口只能通过wss协议访问,ws无法访问wss端口。 3、证书一般是与域名绑定的,所以测试的时候客户端请使用域名连接,不要使用ip去连。 4、如果出现无法访问的情况,请检查服务器防火墙。 5、此方法要求PHP版本>=5.6,因为微信小程序要求tls1.2,而PHP5.6以下版本不支持tls1.2。 开启进程数

Gateway进程使用的非阻塞式IO通讯,属于CPU密集型业务,Gateway进程数设置成与CPU核数一样

BusinessWorker进程中根据业务是否有阻塞式IO设置进程数为CPU核数的1倍-3倍

我知道答案,我要回答