fix:修复BUG/升级1.1.6版本

This commit is contained in:
Ying
2023-04-25 20:11:49 +08:00
parent 445e5f9662
commit 6a6866bbaf
2357 changed files with 456920 additions and 140567 deletions

View File

@@ -4,6 +4,7 @@
"homepage": "http://www.workerman.net",
"license" : "MIT",
"require": {
"php": ">=7.0",
"workerman/workerman" : "^4.0.30"
},
"autoload": {

View File

@@ -1,4 +1,5 @@
<?php
/**
* This file is part of workerman.
*
@@ -51,20 +52,6 @@ class BusinessWorker extends Worker
*/
public $eventHandler = 'Events';
/**
* 业务超时时间,可用来定位程序卡在哪里
*
* @var int
*/
public $processTimeout = 30;
/**
* 业务超时时间,可用来定位程序卡在哪里
*
* @var callable
*/
public $processTimeoutHandler = '\\Workerman\\Worker::log';
/**
* 秘钥
*
@@ -82,21 +69,21 @@ class BusinessWorker extends Worker
/**
* 保存用户设置的 worker 启动回调
*
* @var callback
* @var callable|null
*/
protected $_onWorkerStart = null;
/**
* 保存用户设置的 workerReload 回调
*
* @var callback
* @var callable|null
*/
protected $_onWorkerReload = null;
/**
* 保存用户设置的 workerStop 回调
*
* @var callback
* @var callable|null
*/
protected $_onWorkerStop= null;
@@ -131,21 +118,21 @@ class BusinessWorker extends Worker
/**
* Event::onConnect 回调
*
* @var callback
* @var callable|null
*/
protected $_eventOnConnect = null;
/**
* Event::onMessage 回调
*
* @var callback
* @var callable|null
*/
protected $_eventOnMessage = null;
/**
* Event::onClose 回调
*
* @var callback
* @var callable|null
*/
protected $_eventOnClose = null;
@@ -227,13 +214,6 @@ class BusinessWorker extends Worker
call_user_func($this->eventHandler . '::onWorkerStart', $this);
}
if (function_exists('pcntl_signal')) {
// 业务超时信号处理
pcntl_signal(SIGALRM, array($this, 'timeoutHandler'), false);
} else {
$this->processTimeout = 0;
}
// 设置回调
if (is_callable($this->eventHandler . '::onConnect')) {
$this->_eventOnConnect = $this->eventHandler . '::onConnect';
@@ -393,9 +373,6 @@ class BusinessWorker extends Worker
}
}
if ($this->processTimeout) {
pcntl_alarm($this->processTimeout);
}
// 尝试执行 Event::onConnection、Event::onMessage、Event::onClose
switch ($cmd) {
case GatewayProtocol::CMD_ON_CONNECT:
@@ -420,9 +397,6 @@ class BusinessWorker extends Worker
}
break;
}
if ($this->processTimeout) {
pcntl_alarm(0);
}
// session 必须是数组
if ($_SESSION !== null && !is_array($_SESSION)) {
@@ -447,7 +421,7 @@ class BusinessWorker extends Worker
*/
public function onGatewayClose($connection)
{
$addr = $connection->remoteAddress;
$addr = $connection->remoteAddr;
unset($this->gatewayConnections[$addr], $this->_connectingGatewayAddresses[$addr]);
if (isset($this->_gatewayAddresses[$addr]) && !isset($this->_waitingConnectGatewayAddresses[$addr])) {
Timer::add(1, array($this, 'tryToConnectGateway'), array($addr), false);
@@ -464,7 +438,7 @@ class BusinessWorker extends Worker
{
if (!isset($this->gatewayConnections[$addr]) && !isset($this->_connectingGatewayAddresses[$addr]) && isset($this->_gatewayAddresses[$addr])) {
$gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
$gateway_connection->remoteAddress = $addr;
$gateway_connection->remoteAddr = $addr;
$gateway_connection->onConnect = array($this, 'onConnectGateway');
$gateway_connection->onMessage = array($this, 'onGatewayMessage');
$gateway_connection->onClose = array($this, 'onGatewayClose');
@@ -513,8 +487,8 @@ class BusinessWorker extends Worker
*/
public function onConnectGateway($connection)
{
$this->gatewayConnections[$connection->remoteAddress] = $connection;
unset($this->_connectingGatewayAddresses[$connection->remoteAddress], $this->_waitingConnectGatewayAddresses[$connection->remoteAddress]);
$this->gatewayConnections[$connection->remoteAddr] = $connection;
unset($this->_connectingGatewayAddresses[$connection->remoteAddr], $this->_waitingConnectGatewayAddresses[$connection->remoteAddr]);
}
/**
@@ -538,28 +512,4 @@ class BusinessWorker extends Worker
{
return $this->_gatewayAddresses;
}
/**
* 业务超时回调
*
* @param int $signal
* @throws \Exception
*/
public function timeoutHandler($signal)
{
switch ($signal) {
// 超时时钟
case SIGALRM:
// 超时异常
$e = new \Exception("process_timeout", 506);
$trace_str = $e->getTraceAsString();
// 去掉第一行timeoutHandler的调用栈
$trace_str = $e->getMessage() . ":\n" . substr($trace_str, strpos($trace_str, "\n") + 1) . "\n";
// 开发者没有设置超时处理函数,或者超时处理函数返回空则执行退出
if (!$this->processTimeoutHandler || !call_user_func($this->processTimeoutHandler, $trace_str, $e)) {
Worker::stopAll();
}
break;
}
}
}

View File

@@ -1,4 +1,5 @@
<?php
/**
* This file is part of workerman.
*
@@ -38,7 +39,7 @@ class Gateway extends Worker
*
* @var string
*/
const VERSION = '3.0.22';
const VERSION = '3.0.28';
/**
* 本机 IP
@@ -48,6 +49,21 @@ class Gateway extends Worker
*/
public $lanIp = '127.0.0.1';
/**
* 如果宿主机为192.168.1.2 , gatewayworker in docker container (172.25.0.2)
* 此时 lanIp=192.68.1.2 GatewayClientSDK 能连上,但是$this->_innerTcpWorker stream_socket_server(): Unable to connect to tcp://192.168.1.2:2901 (Address not available) in
* 此时 lanIp=172.25.0.2 GatewayClientSDK stream_socket_server(): Unable to connect to tcp://172.25.0.2:2901 (Address not available) $this->_innerTcpWorker 正常监听
*
* solution:
* $gateway->lanIp=192.168.1.2 ;
* $gateway->innerTcpWorkerListen=172.25.0.2; // || 0.0.0.0
*
* GatewayClientSDK connect 192.168.1.2:lanPort
* $this->_innerTcpWorker listen $gateway->innerTcpWorkerListen:lanPort
*
*/
public $innerTcpWorkerListen='';
/**
* 本机端口
*
@@ -107,7 +123,7 @@ class Gateway extends Worker
/**
* 路由函数
*
* @var callback
* @var callable|null
*/
public $router = null;
@@ -136,14 +152,14 @@ class Gateway extends Worker
/**
* BusinessWorker 连接成功之后触发
*
* @var callback|null
* @var callable|null
*/
public $onBusinessWorkerConnected = null;
/**
* BusinessWorker 关闭时触发
*
* @var callback|null
* @var callable|null
*/
public $onBusinessWorkerClose = null;
@@ -183,35 +199,35 @@ class Gateway extends Worker
/**
* 当 worker 启动时
*
* @var callback
* @var callable|null
*/
protected $_onWorkerStart = null;
/**
* 当有客户端连接时
*
* @var callback
* @var callable|null
*/
protected $_onConnect = null;
/**
* 当客户端发来消息时
*
* @var callback
* @var callable|null
*/
protected $_onMessage = null;
/**
* 当客户端连接关闭时
*
* @var callback
* @var callable|null
*/
protected $_onClose = null;
/**
* 当 worker 停止时
*
* @var callback
* @var callable|null
*/
protected $_onWorkerStop = null;
@@ -348,15 +364,35 @@ class Gateway extends Worker
* websocket握手时触发
*
* @param $connection
* @param $http_buffer
* @param $request
*/
public function onWebsocketConnect($connection, $http_buffer)
public function onWebsocketConnect($connection, $request)
{
if (isset($connection->_onWebSocketConnect)) {
call_user_func($connection->_onWebSocketConnect, $connection, $http_buffer);
call_user_func($connection->_onWebSocketConnect, $connection, $request);
unset($connection->_onWebSocketConnect);
}
$this->sendToWorker(GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT, $connection, array('get' => $_GET, 'server' => $_SERVER, 'cookie' => $_COOKIE));
if (is_object($request)) {
$server = [
'QUERY_STRING' => $request->queryString(),
'REQUEST_METHOD' => $request->method(),
'REQUEST_URI' => $request->uri(),
'SERVER_PROTOCOL' => "HTTP/" . $request->protocolVersion(),
'SERVER_NAME' => $request->host(false),
'CONTENT_TYPE' => $request->header('content-type'),
'REMOTE_ADDR' => $connection->getRemoteIp(),
'REMOTE_PORT' => $connection->getRemotePort(),
'SERVER_PORT' => $connection->getLocalPort(),
];
foreach ($request->header() as $key => $header) {
$key = str_replace('-', '_', strtoupper($key));
$server["HTTP_$key"] = $header;
}
$data = array('get' => $request->get(), 'server' => $server, 'cookie' => $request->cookie());
} else {
$data = array('get' => $_GET, 'server' => $_SERVER, 'cookie' => $_COOKIE);
}
$this->sendToWorker(GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT, $connection, $data);
}
/**
@@ -506,13 +542,19 @@ class Gateway extends Worker
//如为公网IP监听直接换成0.0.0.0 否则用内网IP
$listen_ip=filter_var($this->lanIp,FILTER_VALIDATE_IP,FILTER_FLAG_NO_PRIV_RANGE | FILTER_FLAG_NO_RES_RANGE)?'0.0.0.0':$this->lanIp;
//Use scenario to see line 64
if($this->innerTcpWorkerListen != '') {
$listen_ip = $this->innerTcpWorkerListen;
}
// 初始化 gateway 内部的监听,用于监听 worker 的连接已经连接上发来的数据
$this->_innerTcpWorker = new Worker("GatewayProtocol://{$listen_ip}:{$this->lanPort}");
$this->_innerTcpWorker->reusePort = false;
$this->_innerTcpWorker->listen();
$this->_innerTcpWorker->name = 'GatewayInnerWorker';
if ($this->_autoloadRootPath) {
if ($this->_autoloadRootPath && class_exists(Autoloader::class)) {
Autoloader::setRootPath($this->_autoloadRootPath);
}

View File

@@ -52,14 +52,20 @@ class Gateway
* 与Gateway是否是长链接
* @var bool
*/
public static $persistentConnection = false;
public static $persistentConnection = true;
/**
* 是否清除注册地址缓存
* @var bool
*/
public static $addressesCacheDisable = false;
/**
* 与gateway建立的连接
* @var array
*/
protected static $gatewayConnections = [];
/**
* 向所有客户端连接(或者 client_id_array 指定的客户端连接)广播消息
*
@@ -758,8 +764,8 @@ class Gateway
$client_array = $status_data = $client_address_map = $receive_buffer_array = $recv_length_array = array();
// 批量向所有gateway进程发送请求数据
foreach ($gateway_buffer_array as $address => $gateway_buffer) {
$client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout);
if ($client && strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) {
$client = static::getGatewayConnection("tcp://$address");
if (strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer)) {
$socket_id = (int)$client;
$client_array[$socket_id] = $client;
$client_address_map[$socket_id] = explode(':', $address);
@@ -792,6 +798,7 @@ class Gateway
}
}
if (microtime(true) - $time_start > $timeout) {
static::$gatewayConnections = [];
break;
}
}
@@ -1154,10 +1161,8 @@ class Gateway
{
$buffer = GatewayProtocol::encode($data);
$buffer = static::$secretKey ? static::generateAuthBuffer() . $buffer : $buffer;
$client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout);
if (!$client) {
throw new Exception("can not connect to tcp://$address $errmsg");
}
$address = "tcp://$address";
$client = static::getGatewayConnection($address);
if (strlen($buffer) === stream_socket_sendto($client, $buffer)) {
$timeout = 5;
// 阻塞读
@@ -1173,8 +1178,10 @@ class Gateway
$all_buffer .= $buf;
} else {
if (feof($client)) {
throw new Exception("connection close tcp://$address");
unset(static::$gatewayConnections[$address]);
throw new Exception("connection close $address");
} elseif (microtime(true) - $time_start > $timeout) {
unset(static::$gatewayConnections[$address]);
break;
}
continue;
@@ -1183,8 +1190,12 @@ class Gateway
if (!$pack_len && $recv_len >= 4) {
$pack_len= current(unpack('N', $all_buffer));
}
if (microtime(true) - $time_start > $timeout) {
unset(static::$gatewayConnections[$address]);
break;
}
// 回复的数据都是以\n结尾
if (($pack_len && $recv_len >= $pack_len + 4) || microtime(true) - $time_start > $timeout) {
if (($pack_len && $recv_len >= $pack_len + 4)) {
break;
}
}
@@ -1224,8 +1235,7 @@ class Gateway
}
// 非workerman环境
$gateway_buffer = static::$secretKey ? static::generateAuthBuffer() . $gateway_buffer : $gateway_buffer;
$flag = static::$persistentConnection ? STREAM_CLIENT_PERSISTENT | STREAM_CLIENT_CONNECT : STREAM_CLIENT_CONNECT;
$client = stream_socket_client("tcp://$address", $errno, $errmsg, static::$connectTimeout, $flag);
$client = static::getGatewayConnection("tcp://$address");
return strlen($gateway_buffer) == stream_socket_sendto($client, $gateway_buffer);
}
@@ -1373,6 +1383,44 @@ class Gateway
}
return true;
}
/**
* 获取与gateway的连接用于数据返回
*
* @param $address
* @return mixed
* @throws Exception
*/
protected static function getGatewayConnection($address)
{
$ttl = 50;
$time = time();
if (isset(static::$gatewayConnections[$address])) {
$created_time = static::$gatewayConnections[$address]['created_time'];
$connection = static::$gatewayConnections[$address]['connection'];
if ($time - $created_time > $ttl || !is_resource($connection) || feof($connection)) {
\set_error_handler(function () {});
fclose($connection);
\restore_error_handler();
unset(static::$gatewayConnections[$address]);
}
}
if (!isset(static::$gatewayConnections[$address])) {
$client = stream_socket_client($address, $errno, $errmsg, static::$connectTimeout);
if (!$client) {
throw new Exception("can not connect to $address $errmsg");
}
static::$gatewayConnections[$address] = [
'created_time' => $time,
'connection' => $client
];
}
$client = static::$gatewayConnections[$address]['connection'];
if (!static::$persistentConnection) {
static::$gatewayConnections = [];
}
return $client;
}
}
if (!class_exists('\Protocols\GatewayProtocol')) {

View File

@@ -176,7 +176,7 @@ class GatewayProtocol
$data['body'] = serialize($data['body']);
}
$data['flag'] |= $flag;
$ext_len = strlen($data['ext_data']);
$ext_len = strlen($data['ext_data']??'');
$package_len = self::HEAD_LEN + $ext_len + strlen($data['body']);
return pack("NCNnNnNCnN", $package_len,
$data['cmd'], $data['local_ip'],

View File

@@ -161,6 +161,7 @@ class Register extends Worker
*/
public function onClose($connection)
{
Timer::del($connection->timeout_timerid);
if (isset($this->_gatewayConnections[$connection->id])) {
unset($this->_gatewayConnections[$connection->id]);
$this->broadcastAddresses();