fix:更新已知bug,优化代码
This commit is contained in:
565
vendor/workerman/gateway-worker/src/BusinessWorker.php
vendored
Normal file
565
vendor/workerman/gateway-worker/src/BusinessWorker.php
vendored
Normal file
@@ -0,0 +1,565 @@
|
||||
<?php
|
||||
/**
|
||||
* This file is part of workerman.
|
||||
*
|
||||
* Licensed under The MIT License
|
||||
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||
* Redistributions of files must retain the above copyright notice.
|
||||
*
|
||||
* @author walkor<walkor@workerman.net>
|
||||
* @copyright walkor<walkor@workerman.net>
|
||||
* @link http://www.workerman.net/
|
||||
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||
*/
|
||||
namespace GatewayWorker;
|
||||
|
||||
use Workerman\Connection\TcpConnection;
|
||||
|
||||
use Workerman\Worker;
|
||||
use Workerman\Timer;
|
||||
use Workerman\Connection\AsyncTcpConnection;
|
||||
use GatewayWorker\Protocols\GatewayProtocol;
|
||||
use GatewayWorker\Lib\Context;
|
||||
|
||||
/**
|
||||
*
|
||||
* BusinessWorker 用于处理Gateway转发来的数据
|
||||
*
|
||||
* @author walkor<walkor@workerman.net>
|
||||
*
|
||||
*/
|
||||
class BusinessWorker extends Worker
|
||||
{
|
||||
/**
|
||||
* 保存与 gateway 的连接 connection 对象
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
public $gatewayConnections = array();
|
||||
|
||||
/**
|
||||
* 注册中心地址
|
||||
*
|
||||
* @var string|array
|
||||
*/
|
||||
public $registerAddress = '127.0.0.1:1236';
|
||||
|
||||
/**
|
||||
* 事件处理类,默认是 Event 类
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
public $eventHandler = 'Events';
|
||||
|
||||
/**
|
||||
* 业务超时时间,可用来定位程序卡在哪里
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
public $processTimeout = 30;
|
||||
|
||||
/**
|
||||
* 业务超时时间,可用来定位程序卡在哪里
|
||||
*
|
||||
* @var callable
|
||||
*/
|
||||
public $processTimeoutHandler = '\\Workerman\\Worker::log';
|
||||
|
||||
/**
|
||||
* 秘钥
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
public $secretKey = '';
|
||||
|
||||
/**
|
||||
* businessWorker进程将消息转发给gateway进程的发送缓冲区大小
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
public $sendToGatewayBufferSize = 10240000;
|
||||
|
||||
/**
|
||||
* 保存用户设置的 worker 启动回调
|
||||
*
|
||||
* @var callback
|
||||
*/
|
||||
protected $_onWorkerStart = null;
|
||||
|
||||
/**
|
||||
* 保存用户设置的 workerReload 回调
|
||||
*
|
||||
* @var callback
|
||||
*/
|
||||
protected $_onWorkerReload = null;
|
||||
|
||||
/**
|
||||
* 保存用户设置的 workerStop 回调
|
||||
*
|
||||
* @var callback
|
||||
*/
|
||||
protected $_onWorkerStop= null;
|
||||
|
||||
/**
|
||||
* 到注册中心的连接
|
||||
*
|
||||
* @var AsyncTcpConnection
|
||||
*/
|
||||
protected $_registerConnection = null;
|
||||
|
||||
/**
|
||||
* 处于连接状态的 gateway 通讯地址
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $_connectingGatewayAddresses = array();
|
||||
|
||||
/**
|
||||
* 所有 geteway 内部通讯地址
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $_gatewayAddresses = array();
|
||||
|
||||
/**
|
||||
* 等待连接个 gateway 地址
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $_waitingConnectGatewayAddresses = array();
|
||||
|
||||
/**
|
||||
* Event::onConnect 回调
|
||||
*
|
||||
* @var callback
|
||||
*/
|
||||
protected $_eventOnConnect = null;
|
||||
|
||||
/**
|
||||
* Event::onMessage 回调
|
||||
*
|
||||
* @var callback
|
||||
*/
|
||||
protected $_eventOnMessage = null;
|
||||
|
||||
/**
|
||||
* Event::onClose 回调
|
||||
*
|
||||
* @var callback
|
||||
*/
|
||||
protected $_eventOnClose = null;
|
||||
|
||||
/**
|
||||
* websocket回调
|
||||
*
|
||||
* @var null
|
||||
*/
|
||||
protected $_eventOnWebSocketConnect = null;
|
||||
|
||||
/**
|
||||
* SESSION 版本缓存
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $_sessionVersion = array();
|
||||
|
||||
/**
|
||||
* 用于保持长连接的心跳时间间隔
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
const PERSISTENCE_CONNECTION_PING_INTERVAL = 25;
|
||||
|
||||
/**
|
||||
* 构造函数
|
||||
*
|
||||
* @param string $socket_name
|
||||
* @param array $context_option
|
||||
*/
|
||||
public function __construct($socket_name = '', $context_option = array())
|
||||
{
|
||||
parent::__construct($socket_name, $context_option);
|
||||
$backrace = debug_backtrace();
|
||||
$this->_autoloadRootPath = dirname($backrace[0]['file']);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function run()
|
||||
{
|
||||
$this->_onWorkerStart = $this->onWorkerStart;
|
||||
$this->_onWorkerReload = $this->onWorkerReload;
|
||||
$this->_onWorkerStop = $this->onWorkerStop;
|
||||
$this->onWorkerStop = array($this, 'onWorkerStop');
|
||||
$this->onWorkerStart = array($this, 'onWorkerStart');
|
||||
$this->onWorkerReload = array($this, 'onWorkerReload');
|
||||
parent::run();
|
||||
}
|
||||
|
||||
/**
|
||||
* 当进程启动时一些初始化工作
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
protected function onWorkerStart()
|
||||
{
|
||||
if (function_exists('opcache_reset')) {
|
||||
opcache_reset();
|
||||
}
|
||||
|
||||
if (!class_exists('\Protocols\GatewayProtocol')) {
|
||||
class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
|
||||
}
|
||||
|
||||
if (!is_array($this->registerAddress)) {
|
||||
$this->registerAddress = array($this->registerAddress);
|
||||
}
|
||||
$this->connectToRegister();
|
||||
|
||||
\GatewayWorker\Lib\Gateway::setBusinessWorker($this);
|
||||
\GatewayWorker\Lib\Gateway::$secretKey = $this->secretKey;
|
||||
if ($this->_onWorkerStart) {
|
||||
call_user_func($this->_onWorkerStart, $this);
|
||||
}
|
||||
|
||||
if (is_callable($this->eventHandler . '::onWorkerStart')) {
|
||||
call_user_func($this->eventHandler . '::onWorkerStart', $this);
|
||||
}
|
||||
|
||||
if (function_exists('pcntl_signal')) {
|
||||
// 业务超时信号处理
|
||||
pcntl_signal(SIGALRM, array($this, 'timeoutHandler'), false);
|
||||
} else {
|
||||
$this->processTimeout = 0;
|
||||
}
|
||||
|
||||
// 设置回调
|
||||
if (is_callable($this->eventHandler . '::onConnect')) {
|
||||
$this->_eventOnConnect = $this->eventHandler . '::onConnect';
|
||||
}
|
||||
|
||||
if (is_callable($this->eventHandler . '::onMessage')) {
|
||||
$this->_eventOnMessage = $this->eventHandler . '::onMessage';
|
||||
} else {
|
||||
echo "Waring: {$this->eventHandler}::onMessage is not callable\n";
|
||||
}
|
||||
|
||||
if (is_callable($this->eventHandler . '::onClose')) {
|
||||
$this->_eventOnClose = $this->eventHandler . '::onClose';
|
||||
}
|
||||
|
||||
if (is_callable($this->eventHandler . '::onWebSocketConnect')) {
|
||||
$this->_eventOnWebSocketConnect = $this->eventHandler . '::onWebSocketConnect';
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* onWorkerReload 回调
|
||||
*
|
||||
* @param Worker $worker
|
||||
*/
|
||||
protected function onWorkerReload($worker)
|
||||
{
|
||||
// 防止进程立刻退出
|
||||
$worker->reloadable = false;
|
||||
// 延迟 0.05 秒退出,避免 BusinessWorker 瞬间全部退出导致没有可用的 BusinessWorker 进程
|
||||
Timer::add(0.05, array('Workerman\Worker', 'stopAll'));
|
||||
// 执行用户定义的 onWorkerReload 回调
|
||||
if ($this->_onWorkerReload) {
|
||||
call_user_func($this->_onWorkerReload, $this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 当进程关闭时一些清理工作
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
protected function onWorkerStop()
|
||||
{
|
||||
if ($this->_onWorkerStop) {
|
||||
call_user_func($this->_onWorkerStop, $this);
|
||||
}
|
||||
if (is_callable($this->eventHandler . '::onWorkerStop')) {
|
||||
call_user_func($this->eventHandler . '::onWorkerStop', $this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接服务注册中心
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function connectToRegister()
|
||||
{
|
||||
foreach ($this->registerAddress as $register_address) {
|
||||
$register_connection = new AsyncTcpConnection("text://{$register_address}");
|
||||
$secret_key = $this->secretKey;
|
||||
$register_connection->onConnect = function () use ($register_connection, $secret_key, $register_address) {
|
||||
$register_connection->send('{"event":"worker_connect","secret_key":"' . $secret_key . '"}');
|
||||
// 如果Register服务器不在本地服务器,则需要保持心跳
|
||||
if (strpos($register_address, '127.0.0.1') !== 0) {
|
||||
$register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
|
||||
$register_connection->send('{"event":"ping"}');
|
||||
});
|
||||
}
|
||||
};
|
||||
$register_connection->onClose = function ($register_connection) {
|
||||
if(!empty($register_connection->ping_timer)) {
|
||||
Timer::del($register_connection->ping_timer);
|
||||
}
|
||||
$register_connection->reconnect(1);
|
||||
};
|
||||
$register_connection->onMessage = array($this, 'onRegisterConnectionMessage');
|
||||
$register_connection->connect();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 当注册中心发来消息时
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function onRegisterConnectionMessage($register_connection, $data)
|
||||
{
|
||||
$data = json_decode($data, true);
|
||||
if (!isset($data['event'])) {
|
||||
echo "Received bad data from Register\n";
|
||||
return;
|
||||
}
|
||||
$event = $data['event'];
|
||||
switch ($event) {
|
||||
case 'broadcast_addresses':
|
||||
if (!is_array($data['addresses'])) {
|
||||
echo "Received bad data from Register. Addresses empty\n";
|
||||
return;
|
||||
}
|
||||
$addresses = $data['addresses'];
|
||||
$this->_gatewayAddresses = array();
|
||||
foreach ($addresses as $addr) {
|
||||
$this->_gatewayAddresses[$addr] = $addr;
|
||||
}
|
||||
$this->checkGatewayConnections($addresses);
|
||||
break;
|
||||
default:
|
||||
echo "Receive bad event:$event from Register.\n";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 当 gateway 转发来数据时
|
||||
*
|
||||
* @param TcpConnection $connection
|
||||
* @param mixed $data
|
||||
*/
|
||||
public function onGatewayMessage($connection, $data)
|
||||
{
|
||||
$cmd = $data['cmd'];
|
||||
if ($cmd === GatewayProtocol::CMD_PING) {
|
||||
return;
|
||||
}
|
||||
// 上下文数据
|
||||
Context::$client_ip = $data['client_ip'];
|
||||
Context::$client_port = $data['client_port'];
|
||||
Context::$local_ip = $data['local_ip'];
|
||||
Context::$local_port = $data['local_port'];
|
||||
Context::$connection_id = $data['connection_id'];
|
||||
Context::$client_id = Context::addressToClientId($data['local_ip'], $data['local_port'],
|
||||
$data['connection_id']);
|
||||
// $_SERVER 变量
|
||||
$_SERVER = array(
|
||||
'REMOTE_ADDR' => long2ip($data['client_ip']),
|
||||
'REMOTE_PORT' => $data['client_port'],
|
||||
'GATEWAY_ADDR' => long2ip($data['local_ip']),
|
||||
'GATEWAY_PORT' => $data['gateway_port'],
|
||||
'GATEWAY_CLIENT_ID' => Context::$client_id,
|
||||
);
|
||||
// 检查session版本,如果是过期的session数据则拉取最新的数据
|
||||
if ($cmd !== GatewayProtocol::CMD_ON_CLOSE && isset($this->_sessionVersion[Context::$client_id]) && $this->_sessionVersion[Context::$client_id] !== crc32($data['ext_data'])) {
|
||||
$_SESSION = Context::$old_session = \GatewayWorker\Lib\Gateway::getSession(Context::$client_id);
|
||||
$this->_sessionVersion[Context::$client_id] = crc32($data['ext_data']);
|
||||
} else {
|
||||
if (!isset($this->_sessionVersion[Context::$client_id])) {
|
||||
$this->_sessionVersion[Context::$client_id] = crc32($data['ext_data']);
|
||||
}
|
||||
// 尝试解析 session
|
||||
if ($data['ext_data'] != '') {
|
||||
Context::$old_session = $_SESSION = Context::sessionDecode($data['ext_data']);
|
||||
} else {
|
||||
Context::$old_session = $_SESSION = null;
|
||||
}
|
||||
}
|
||||
|
||||
if ($this->processTimeout) {
|
||||
pcntl_alarm($this->processTimeout);
|
||||
}
|
||||
// 尝试执行 Event::onConnection、Event::onMessage、Event::onClose
|
||||
switch ($cmd) {
|
||||
case GatewayProtocol::CMD_ON_CONNECT:
|
||||
if ($this->_eventOnConnect) {
|
||||
call_user_func($this->_eventOnConnect, Context::$client_id);
|
||||
}
|
||||
break;
|
||||
case GatewayProtocol::CMD_ON_MESSAGE:
|
||||
if ($this->_eventOnMessage) {
|
||||
call_user_func($this->_eventOnMessage, Context::$client_id, $data['body']);
|
||||
}
|
||||
break;
|
||||
case GatewayProtocol::CMD_ON_CLOSE:
|
||||
unset($this->_sessionVersion[Context::$client_id]);
|
||||
if ($this->_eventOnClose) {
|
||||
call_user_func($this->_eventOnClose, Context::$client_id);
|
||||
}
|
||||
break;
|
||||
case GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT:
|
||||
if ($this->_eventOnWebSocketConnect) {
|
||||
call_user_func($this->_eventOnWebSocketConnect, Context::$client_id, $data['body']);
|
||||
}
|
||||
break;
|
||||
}
|
||||
if ($this->processTimeout) {
|
||||
pcntl_alarm(0);
|
||||
}
|
||||
|
||||
// session 必须是数组
|
||||
if ($_SESSION !== null && !is_array($_SESSION)) {
|
||||
throw new \Exception('$_SESSION must be an array. But $_SESSION=' . var_export($_SESSION, true) . ' is not array.');
|
||||
}
|
||||
|
||||
// 判断 session 是否被更改
|
||||
if ($_SESSION !== Context::$old_session && $cmd !== GatewayProtocol::CMD_ON_CLOSE) {
|
||||
$session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
|
||||
\GatewayWorker\Lib\Gateway::setSocketSession(Context::$client_id, $session_str_now);
|
||||
$this->_sessionVersion[Context::$client_id] = crc32($session_str_now);
|
||||
}
|
||||
|
||||
Context::clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* 当与 Gateway 的连接断开时触发
|
||||
*
|
||||
* @param TcpConnection $connection
|
||||
* @return void
|
||||
*/
|
||||
public function onGatewayClose($connection)
|
||||
{
|
||||
$addr = $connection->remoteAddress;
|
||||
unset($this->gatewayConnections[$addr], $this->_connectingGatewayAddresses[$addr]);
|
||||
if (isset($this->_gatewayAddresses[$addr]) && !isset($this->_waitingConnectGatewayAddresses[$addr])) {
|
||||
Timer::add(1, array($this, 'tryToConnectGateway'), array($addr), false);
|
||||
$this->_waitingConnectGatewayAddresses[$addr] = $addr;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 尝试连接 Gateway 内部通讯地址
|
||||
*
|
||||
* @param string $addr
|
||||
*/
|
||||
public function tryToConnectGateway($addr)
|
||||
{
|
||||
if (!isset($this->gatewayConnections[$addr]) && !isset($this->_connectingGatewayAddresses[$addr]) && isset($this->_gatewayAddresses[$addr])) {
|
||||
$gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
|
||||
$gateway_connection->remoteAddress = $addr;
|
||||
$gateway_connection->onConnect = array($this, 'onConnectGateway');
|
||||
$gateway_connection->onMessage = array($this, 'onGatewayMessage');
|
||||
$gateway_connection->onClose = array($this, 'onGatewayClose');
|
||||
$gateway_connection->onError = array($this, 'onGatewayError');
|
||||
$gateway_connection->maxSendBufferSize = $this->sendToGatewayBufferSize;
|
||||
if (TcpConnection::$defaultMaxSendBufferSize == $gateway_connection->maxSendBufferSize) {
|
||||
$gateway_connection->maxSendBufferSize = 50 * 1024 * 1024;
|
||||
}
|
||||
$gateway_data = GatewayProtocol::$empty;
|
||||
$gateway_data['cmd'] = GatewayProtocol::CMD_WORKER_CONNECT;
|
||||
$gateway_data['body'] = json_encode(array(
|
||||
'worker_key' =>"{$this->name}:{$this->id}",
|
||||
'secret_key' => $this->secretKey,
|
||||
));
|
||||
$gateway_connection->send($gateway_data);
|
||||
$gateway_connection->connect();
|
||||
$this->_connectingGatewayAddresses[$addr] = $addr;
|
||||
}
|
||||
unset($this->_waitingConnectGatewayAddresses[$addr]);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查 gateway 的通信端口是否都已经连
|
||||
* 如果有未连接的端口,则尝试连接
|
||||
*
|
||||
* @param array $addresses_list
|
||||
*/
|
||||
public function checkGatewayConnections($addresses_list)
|
||||
{
|
||||
if (empty($addresses_list)) {
|
||||
return;
|
||||
}
|
||||
foreach ($addresses_list as $addr) {
|
||||
if (!isset($this->_waitingConnectGatewayAddresses[$addr])) {
|
||||
$this->tryToConnectGateway($addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 当连接上 gateway 的通讯端口时触发
|
||||
* 将连接 connection 对象保存起来
|
||||
*
|
||||
* @param TcpConnection $connection
|
||||
* @return void
|
||||
*/
|
||||
public function onConnectGateway($connection)
|
||||
{
|
||||
$this->gatewayConnections[$connection->remoteAddress] = $connection;
|
||||
unset($this->_connectingGatewayAddresses[$connection->remoteAddress], $this->_waitingConnectGatewayAddresses[$connection->remoteAddress]);
|
||||
}
|
||||
|
||||
/**
|
||||
* 当与 gateway 的连接出现错误时触发
|
||||
*
|
||||
* @param TcpConnection $connection
|
||||
* @param int $error_no
|
||||
* @param string $error_msg
|
||||
*/
|
||||
public function onGatewayError($connection, $error_no, $error_msg)
|
||||
{
|
||||
echo "GatewayConnection Error : $error_no ,$error_msg\n";
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有 Gateway 内部通讯地址
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getAllGatewayAddresses()
|
||||
{
|
||||
return $this->_gatewayAddresses;
|
||||
}
|
||||
|
||||
/**
|
||||
* 业务超时回调
|
||||
*
|
||||
* @param int $signal
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function timeoutHandler($signal)
|
||||
{
|
||||
switch ($signal) {
|
||||
// 超时时钟
|
||||
case SIGALRM:
|
||||
// 超时异常
|
||||
$e = new \Exception("process_timeout", 506);
|
||||
$trace_str = $e->getTraceAsString();
|
||||
// 去掉第一行timeoutHandler的调用栈
|
||||
$trace_str = $e->getMessage() . ":\n" . substr($trace_str, strpos($trace_str, "\n") + 1) . "\n";
|
||||
// 开发者没有设置超时处理函数,或者超时处理函数返回空则执行退出
|
||||
if (!$this->processTimeoutHandler || !call_user_func($this->processTimeoutHandler, $trace_str, $e)) {
|
||||
Worker::stopAll();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
1063
vendor/workerman/gateway-worker/src/Gateway.php
vendored
Normal file
1063
vendor/workerman/gateway-worker/src/Gateway.php
vendored
Normal file
File diff suppressed because it is too large
Load Diff
136
vendor/workerman/gateway-worker/src/Lib/Context.php
vendored
Normal file
136
vendor/workerman/gateway-worker/src/Lib/Context.php
vendored
Normal file
@@ -0,0 +1,136 @@
|
||||
<?php
|
||||
/**
|
||||
* This file is part of workerman.
|
||||
*
|
||||
* Licensed under The MIT License
|
||||
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||
* Redistributions of files must retain the above copyright notice.
|
||||
*
|
||||
* @author walkor<walkor@workerman.net>
|
||||
* @copyright walkor<walkor@workerman.net>
|
||||
* @link http://www.workerman.net/
|
||||
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||
*/
|
||||
namespace GatewayWorker\Lib;
|
||||
|
||||
use Exception;
|
||||
|
||||
/**
|
||||
* 上下文 包含当前用户 uid, 内部通信 local_ip local_port socket_id,以及客户端 client_ip client_port
|
||||
*/
|
||||
class Context
|
||||
{
|
||||
/**
|
||||
* 内部通讯 id
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
public static $local_ip;
|
||||
|
||||
/**
|
||||
* 内部通讯端口
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
public static $local_port;
|
||||
|
||||
/**
|
||||
* 客户端 ip
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
public static $client_ip;
|
||||
|
||||
/**
|
||||
* 客户端端口
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
public static $client_port;
|
||||
|
||||
/**
|
||||
* client_id
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
public static $client_id;
|
||||
|
||||
/**
|
||||
* 连接 connection->id
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
public static $connection_id;
|
||||
|
||||
/**
|
||||
* 旧的session
|
||||
*
|
||||
* @var string
|
||||
*/
|
||||
public static $old_session;
|
||||
|
||||
/**
|
||||
* 编码 session
|
||||
*
|
||||
* @param mixed $session_data
|
||||
* @return string
|
||||
*/
|
||||
public static function sessionEncode($session_data = '')
|
||||
{
|
||||
if ($session_data !== '') {
|
||||
return serialize($session_data);
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
/**
|
||||
* 解码 session
|
||||
*
|
||||
* @param string $session_buffer
|
||||
* @return mixed
|
||||
*/
|
||||
public static function sessionDecode($session_buffer)
|
||||
{
|
||||
return unserialize($session_buffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清除上下文
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public static function clear()
|
||||
{
|
||||
self::$local_ip = self::$local_port = self::$client_ip = self::$client_port =
|
||||
self::$client_id = self::$connection_id = self::$old_session = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 通讯地址到 client_id 的转换
|
||||
*
|
||||
* @param int $local_ip
|
||||
* @param int $local_port
|
||||
* @param int $connection_id
|
||||
* @return string
|
||||
*/
|
||||
public static function addressToClientId($local_ip, $local_port, $connection_id)
|
||||
{
|
||||
return bin2hex(pack('NnN', $local_ip, $local_port, $connection_id));
|
||||
}
|
||||
|
||||
/**
|
||||
* client_id 到通讯地址的转换
|
||||
*
|
||||
* @param string $client_id
|
||||
* @return array
|
||||
* @throws Exception
|
||||
*/
|
||||
public static function clientIdToAddress($client_id)
|
||||
{
|
||||
if (strlen($client_id) !== 20) {
|
||||
echo new Exception("client_id $client_id is invalid");
|
||||
return false;
|
||||
}
|
||||
return unpack('Nlocal_ip/nlocal_port/Nconnection_id', pack('H*', $client_id));
|
||||
}
|
||||
}
|
||||
76
vendor/workerman/gateway-worker/src/Lib/Db.php
vendored
Normal file
76
vendor/workerman/gateway-worker/src/Lib/Db.php
vendored
Normal file
@@ -0,0 +1,76 @@
|
||||
<?php
|
||||
/**
|
||||
* This file is part of workerman.
|
||||
*
|
||||
* Licensed under The MIT License
|
||||
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||
* Redistributions of files must retain the above copyright notice.
|
||||
*
|
||||
* @author walkor<walkor@workerman.net>
|
||||
* @copyright walkor<walkor@workerman.net>
|
||||
* @link http://www.workerman.net/
|
||||
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||
*/
|
||||
namespace GatewayWorker\Lib;
|
||||
|
||||
use Config\Db as DbConfig;
|
||||
use Exception;
|
||||
|
||||
/**
|
||||
* 数据库类
|
||||
*/
|
||||
class Db
|
||||
{
|
||||
/**
|
||||
* 实例数组
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected static $instance = array();
|
||||
|
||||
/**
|
||||
* 获取实例
|
||||
*
|
||||
* @param string $config_name
|
||||
* @return DbConnection
|
||||
* @throws Exception
|
||||
*/
|
||||
public static function instance($config_name)
|
||||
{
|
||||
if (!isset(DbConfig::$$config_name)) {
|
||||
echo "\\Config\\Db::$config_name not set\n";
|
||||
throw new Exception("\\Config\\Db::$config_name not set\n");
|
||||
}
|
||||
|
||||
if (empty(self::$instance[$config_name])) {
|
||||
$config = DbConfig::$$config_name;
|
||||
self::$instance[$config_name] = new DbConnection($config['host'], $config['port'],
|
||||
$config['user'], $config['password'], $config['dbname'],$config['charset']);
|
||||
}
|
||||
return self::$instance[$config_name];
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭数据库实例
|
||||
*
|
||||
* @param string $config_name
|
||||
*/
|
||||
public static function close($config_name)
|
||||
{
|
||||
if (isset(self::$instance[$config_name])) {
|
||||
self::$instance[$config_name]->closeConnection();
|
||||
self::$instance[$config_name] = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭所有数据库实例
|
||||
*/
|
||||
public static function closeAll()
|
||||
{
|
||||
foreach (self::$instance as $connection) {
|
||||
$connection->closeConnection();
|
||||
}
|
||||
self::$instance = array();
|
||||
}
|
||||
}
|
||||
1979
vendor/workerman/gateway-worker/src/Lib/DbConnection.php
vendored
Normal file
1979
vendor/workerman/gateway-worker/src/Lib/DbConnection.php
vendored
Normal file
File diff suppressed because it is too large
Load Diff
1380
vendor/workerman/gateway-worker/src/Lib/Gateway.php
vendored
Normal file
1380
vendor/workerman/gateway-worker/src/Lib/Gateway.php
vendored
Normal file
File diff suppressed because it is too large
Load Diff
216
vendor/workerman/gateway-worker/src/Protocols/GatewayProtocol.php
vendored
Normal file
216
vendor/workerman/gateway-worker/src/Protocols/GatewayProtocol.php
vendored
Normal file
@@ -0,0 +1,216 @@
|
||||
<?php
|
||||
/**
|
||||
* This file is part of workerman.
|
||||
*
|
||||
* Licensed under The MIT License
|
||||
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||
* Redistributions of files must retain the above copyright notice.
|
||||
*
|
||||
* @author walkor<walkor@workerman.net>
|
||||
* @copyright walkor<walkor@workerman.net>
|
||||
* @link http://www.workerman.net/
|
||||
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||
*/
|
||||
namespace GatewayWorker\Protocols;
|
||||
|
||||
/**
|
||||
* Gateway 与 Worker 间通讯的二进制协议
|
||||
*
|
||||
* struct GatewayProtocol
|
||||
* {
|
||||
* unsigned int pack_len,
|
||||
* unsigned char cmd,//命令字
|
||||
* unsigned int local_ip,
|
||||
* unsigned short local_port,
|
||||
* unsigned int client_ip,
|
||||
* unsigned short client_port,
|
||||
* unsigned int connection_id,
|
||||
* unsigned char flag,
|
||||
* unsigned short gateway_port,
|
||||
* unsigned int ext_len,
|
||||
* char[ext_len] ext_data,
|
||||
* char[pack_length-HEAD_LEN] body//包体
|
||||
* }
|
||||
* NCNnNnNCnN
|
||||
*/
|
||||
class GatewayProtocol
|
||||
{
|
||||
// 发给worker,gateway有一个新的连接
|
||||
const CMD_ON_CONNECT = 1;
|
||||
|
||||
// 发给worker的,客户端有消息
|
||||
const CMD_ON_MESSAGE = 3;
|
||||
|
||||
// 发给worker上的关闭链接事件
|
||||
const CMD_ON_CLOSE = 4;
|
||||
|
||||
// 发给gateway的向单个用户发送数据
|
||||
const CMD_SEND_TO_ONE = 5;
|
||||
|
||||
// 发给gateway的向所有用户发送数据
|
||||
const CMD_SEND_TO_ALL = 6;
|
||||
|
||||
// 发给gateway的踢出用户
|
||||
// 1、如果有待发消息,将在发送完后立即销毁用户连接
|
||||
// 2、如果无待发消息,将立即销毁用户连接
|
||||
const CMD_KICK = 7;
|
||||
|
||||
// 发给gateway的立即销毁用户连接
|
||||
const CMD_DESTROY = 8;
|
||||
|
||||
// 发给gateway,通知用户session更新
|
||||
const CMD_UPDATE_SESSION = 9;
|
||||
|
||||
// 获取在线状态
|
||||
const CMD_GET_ALL_CLIENT_SESSIONS = 10;
|
||||
|
||||
// 判断是否在线
|
||||
const CMD_IS_ONLINE = 11;
|
||||
|
||||
// client_id绑定到uid
|
||||
const CMD_BIND_UID = 12;
|
||||
|
||||
// 解绑
|
||||
const CMD_UNBIND_UID = 13;
|
||||
|
||||
// 向uid发送数据
|
||||
const CMD_SEND_TO_UID = 14;
|
||||
|
||||
// 根据uid获取绑定的clientid
|
||||
const CMD_GET_CLIENT_ID_BY_UID = 15;
|
||||
|
||||
// 加入组
|
||||
const CMD_JOIN_GROUP = 20;
|
||||
|
||||
// 离开组
|
||||
const CMD_LEAVE_GROUP = 21;
|
||||
|
||||
// 向组成员发消息
|
||||
const CMD_SEND_TO_GROUP = 22;
|
||||
|
||||
// 获取组成员
|
||||
const CMD_GET_CLIENT_SESSIONS_BY_GROUP = 23;
|
||||
|
||||
// 获取组在线连接数
|
||||
const CMD_GET_CLIENT_COUNT_BY_GROUP = 24;
|
||||
|
||||
// 按照条件查找
|
||||
const CMD_SELECT = 25;
|
||||
|
||||
// 获取在线的群组ID
|
||||
const CMD_GET_GROUP_ID_LIST = 26;
|
||||
|
||||
// 取消分组
|
||||
const CMD_UNGROUP = 27;
|
||||
|
||||
// worker连接gateway事件
|
||||
const CMD_WORKER_CONNECT = 200;
|
||||
|
||||
// 心跳
|
||||
const CMD_PING = 201;
|
||||
|
||||
// GatewayClient连接gateway事件
|
||||
const CMD_GATEWAY_CLIENT_CONNECT = 202;
|
||||
|
||||
// 根据client_id获取session
|
||||
const CMD_GET_SESSION_BY_CLIENT_ID = 203;
|
||||
|
||||
// 发给gateway,覆盖session
|
||||
const CMD_SET_SESSION = 204;
|
||||
|
||||
// 当websocket握手时触发,只有websocket协议支持此命令字
|
||||
const CMD_ON_WEBSOCKET_CONNECT = 205;
|
||||
|
||||
// 包体是标量
|
||||
const FLAG_BODY_IS_SCALAR = 0x01;
|
||||
|
||||
// 通知gateway在send时不调用协议encode方法,在广播组播时提升性能
|
||||
const FLAG_NOT_CALL_ENCODE = 0x02;
|
||||
|
||||
/**
|
||||
* 包头长度
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
const HEAD_LEN = 28;
|
||||
|
||||
public static $empty = array(
|
||||
'cmd' => 0,
|
||||
'local_ip' => 0,
|
||||
'local_port' => 0,
|
||||
'client_ip' => 0,
|
||||
'client_port' => 0,
|
||||
'connection_id' => 0,
|
||||
'flag' => 0,
|
||||
'gateway_port' => 0,
|
||||
'ext_data' => '',
|
||||
'body' => '',
|
||||
);
|
||||
|
||||
/**
|
||||
* 返回包长度
|
||||
*
|
||||
* @param string $buffer
|
||||
* @return int return current package length
|
||||
*/
|
||||
public static function input($buffer)
|
||||
{
|
||||
if (strlen($buffer) < self::HEAD_LEN) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
$data = unpack("Npack_len", $buffer);
|
||||
return $data['pack_len'];
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取整个包的 buffer
|
||||
*
|
||||
* @param mixed $data
|
||||
* @return string
|
||||
*/
|
||||
public static function encode($data)
|
||||
{
|
||||
$flag = (int)is_scalar($data['body']);
|
||||
if (!$flag) {
|
||||
$data['body'] = serialize($data['body']);
|
||||
}
|
||||
$data['flag'] |= $flag;
|
||||
$ext_len = strlen($data['ext_data']);
|
||||
$package_len = self::HEAD_LEN + $ext_len + strlen($data['body']);
|
||||
return pack("NCNnNnNCnN", $package_len,
|
||||
$data['cmd'], $data['local_ip'],
|
||||
$data['local_port'], $data['client_ip'],
|
||||
$data['client_port'], $data['connection_id'],
|
||||
$data['flag'], $data['gateway_port'],
|
||||
$ext_len) . $data['ext_data'] . $data['body'];
|
||||
}
|
||||
|
||||
/**
|
||||
* 从二进制数据转换为数组
|
||||
*
|
||||
* @param string $buffer
|
||||
* @return array
|
||||
*/
|
||||
public static function decode($buffer)
|
||||
{
|
||||
$data = unpack("Npack_len/Ccmd/Nlocal_ip/nlocal_port/Nclient_ip/nclient_port/Nconnection_id/Cflag/ngateway_port/Next_len",
|
||||
$buffer);
|
||||
if ($data['ext_len'] > 0) {
|
||||
$data['ext_data'] = substr($buffer, self::HEAD_LEN, $data['ext_len']);
|
||||
if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
|
||||
$data['body'] = substr($buffer, self::HEAD_LEN + $data['ext_len']);
|
||||
} else {
|
||||
$data['body'] = unserialize(substr($buffer, self::HEAD_LEN + $data['ext_len']));
|
||||
}
|
||||
} else {
|
||||
$data['ext_data'] = '';
|
||||
if ($data['flag'] & self::FLAG_BODY_IS_SCALAR) {
|
||||
$data['body'] = substr($buffer, self::HEAD_LEN);
|
||||
} else {
|
||||
$data['body'] = unserialize(substr($buffer, self::HEAD_LEN));
|
||||
}
|
||||
}
|
||||
return $data;
|
||||
}
|
||||
}
|
||||
193
vendor/workerman/gateway-worker/src/Register.php
vendored
Normal file
193
vendor/workerman/gateway-worker/src/Register.php
vendored
Normal file
@@ -0,0 +1,193 @@
|
||||
<?php
|
||||
/**
|
||||
* This file is part of workerman.
|
||||
*
|
||||
* Licensed under The MIT License
|
||||
* For full copyright and license information, please see the MIT-LICENSE.txt
|
||||
* Redistributions of files must retain the above copyright notice.
|
||||
*
|
||||
* @author walkor<walkor@workerman.net>
|
||||
* @copyright walkor<walkor@workerman.net>
|
||||
* @link http://www.workerman.net/
|
||||
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
||||
*/
|
||||
namespace GatewayWorker;
|
||||
|
||||
use Workerman\Worker;
|
||||
use Workerman\Timer;
|
||||
|
||||
/**
|
||||
*
|
||||
* 注册中心,用于注册 Gateway 和 BusinessWorker
|
||||
*
|
||||
* @author walkor<walkor@workerman.net>
|
||||
*
|
||||
*/
|
||||
class Register extends Worker
|
||||
{
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public $name = 'Register';
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public $reloadable = false;
|
||||
|
||||
/**
|
||||
* 秘钥
|
||||
* @var string
|
||||
*/
|
||||
public $secretKey = '';
|
||||
|
||||
/**
|
||||
* 所有 gateway 的连接
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $_gatewayConnections = array();
|
||||
|
||||
/**
|
||||
* 所有 worker 的连接
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $_workerConnections = array();
|
||||
|
||||
/**
|
||||
* 进程启动时间
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
protected $_startTime = 0;
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function run()
|
||||
{
|
||||
// 设置 onMessage 连接回调
|
||||
$this->onConnect = array($this, 'onConnect');
|
||||
|
||||
// 设置 onMessage 回调
|
||||
$this->onMessage = array($this, 'onMessage');
|
||||
|
||||
// 设置 onClose 回调
|
||||
$this->onClose = array($this, 'onClose');
|
||||
|
||||
// 记录进程启动的时间
|
||||
$this->_startTime = time();
|
||||
|
||||
// 强制使用text协议
|
||||
$this->protocol = '\Workerman\Protocols\Text';
|
||||
|
||||
// reusePort
|
||||
$this->reusePort = false;
|
||||
|
||||
// 运行父方法
|
||||
parent::run();
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置个定时器,将未及时发送验证的连接关闭
|
||||
*
|
||||
* @param \Workerman\Connection\ConnectionInterface $connection
|
||||
* @return void
|
||||
*/
|
||||
public function onConnect($connection)
|
||||
{
|
||||
$connection->timeout_timerid = Timer::add(10, function () use ($connection) {
|
||||
Worker::log("Register auth timeout (".$connection->getRemoteIp()."). See http://doc2.workerman.net/register-auth-timeout.html");
|
||||
$connection->close();
|
||||
}, null, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置消息回调
|
||||
*
|
||||
* @param \Workerman\Connection\ConnectionInterface $connection
|
||||
* @param string $buffer
|
||||
* @return void
|
||||
*/
|
||||
public function onMessage($connection, $buffer)
|
||||
{
|
||||
// 删除定时器
|
||||
Timer::del($connection->timeout_timerid);
|
||||
$data = @json_decode($buffer, true);
|
||||
if (empty($data['event'])) {
|
||||
$error = "Bad request for Register service. Request info(IP:".$connection->getRemoteIp().", Request Buffer:$buffer). See http://doc2.workerman.net/register-auth-timeout.html";
|
||||
Worker::log($error);
|
||||
return $connection->close($error);
|
||||
}
|
||||
$event = $data['event'];
|
||||
$secret_key = isset($data['secret_key']) ? $data['secret_key'] : '';
|
||||
// 开始验证
|
||||
switch ($event) {
|
||||
// 是 gateway 连接
|
||||
case 'gateway_connect':
|
||||
if (empty($data['address'])) {
|
||||
echo "address not found\n";
|
||||
return $connection->close();
|
||||
}
|
||||
if ($secret_key !== $this->secretKey) {
|
||||
Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));
|
||||
return $connection->close();
|
||||
}
|
||||
$this->_gatewayConnections[$connection->id] = $data['address'];
|
||||
$this->broadcastAddresses();
|
||||
break;
|
||||
// 是 worker 连接
|
||||
case 'worker_connect':
|
||||
if ($secret_key !== $this->secretKey) {
|
||||
Worker::log("Register: Key does not match ".var_export($secret_key, true)." !== ".var_export($this->secretKey, true));
|
||||
return $connection->close();
|
||||
}
|
||||
$this->_workerConnections[$connection->id] = $connection;
|
||||
$this->broadcastAddresses($connection);
|
||||
break;
|
||||
case 'ping':
|
||||
break;
|
||||
default:
|
||||
Worker::log("Register unknown event:$event IP: ".$connection->getRemoteIp()." Buffer:$buffer. See http://doc2.workerman.net/register-auth-timeout.html");
|
||||
$connection->close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接关闭时
|
||||
*
|
||||
* @param \Workerman\Connection\ConnectionInterface $connection
|
||||
*/
|
||||
public function onClose($connection)
|
||||
{
|
||||
if (isset($this->_gatewayConnections[$connection->id])) {
|
||||
unset($this->_gatewayConnections[$connection->id]);
|
||||
$this->broadcastAddresses();
|
||||
}
|
||||
if (isset($this->_workerConnections[$connection->id])) {
|
||||
unset($this->_workerConnections[$connection->id]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 向 BusinessWorker 广播 gateway 内部通讯地址
|
||||
*
|
||||
* @param \Workerman\Connection\ConnectionInterface $connection
|
||||
*/
|
||||
public function broadcastAddresses($connection = null)
|
||||
{
|
||||
$data = array(
|
||||
'event' => 'broadcast_addresses',
|
||||
'addresses' => array_unique(array_values($this->_gatewayConnections)),
|
||||
);
|
||||
$buffer = json_encode($data);
|
||||
if ($connection) {
|
||||
$connection->send($buffer);
|
||||
return;
|
||||
}
|
||||
foreach ($this->_workerConnections as $con) {
|
||||
$con->send($buffer);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user