Skip to content

Commit

Permalink
Rewrited $eventLoopClass initialization process. Print all throwed ex…
Browse files Browse the repository at this point in the history
…ceptions through Worker::log method.
  • Loading branch information
luzrain committed Nov 4, 2023
1 parent ba38b9d commit b3b8c99
Showing 1 changed file with 62 additions and 71 deletions.
133 changes: 62 additions & 71 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,9 @@ class Worker
/**
* Log file.
*
* @var mixed
* @var string
*/
public static mixed $logFile = '';
public static string $logFile;

/**
* Global event loop.
Expand Down Expand Up @@ -314,9 +314,9 @@ class Worker
/**
* EventLoopClass
*
* @var string|class-string
* @var class-string<EventInterface>
*/
public static string $eventLoopClass = '';
public static string $eventLoopClass;

/**
* After sending the stop command to the child process stopTimeout seconds,
Expand Down Expand Up @@ -485,15 +485,6 @@ class Worker
'worker_exit_info' => []
];

/**
* Available event loops.
*
* @var array<string, string>
*/
protected static array $availableEventLoops = [
'event' => Event::class,
];

/**
* PHP built-in protocols.
*
Expand Down Expand Up @@ -563,19 +554,23 @@ class Worker
*/
public static function runAll(): void
{
static::checkSapiEnv();
static::init();
static::parseCommand();
static::lock();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::lock(LOCK_UN);
static::displayUI();
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
try {
static::checkSapiEnv();
static::init();
static::parseCommand();
static::lock();
static::daemonize();
static::initWorkers();
static::installSignal();
static::saveMasterPid();
static::lock(LOCK_UN);
static::displayUI();
static::forkWorkers();
static::resetStd();
static::monitorWorkers();
} catch (\Throwable $e) {
static::log($e);
}
}

/**
Expand All @@ -587,7 +582,7 @@ protected static function checkSapiEnv(): void
{
// Only for cli and micro.
if (!in_array(\PHP_SAPI, ['cli', 'micro'])) {
exit("Only run in command line mode\n");
throw new \RuntimeException("Only run in command line mode");
}
}

Expand Down Expand Up @@ -630,8 +625,8 @@ protected static function init(): void
// State.
static::$status = static::STATUS_STARTING;

// Avoiding incorrect user calls.
static::resetGlobalEvent();
// Init global event.
static::initGlobalEvent();

// For statistics.
static::$globalStatistics['start_timestamp'] = time();
Expand All @@ -647,16 +642,30 @@ protected static function init(): void
}

/**
* reset globalEvent Instance.
* Init global event.
*
* @return void
*/
protected static function resetGlobalEvent(): void
protected static function initGlobalEvent(): void
{
if (static::$status === static::STATUS_STARTING && static::$globalEvent instanceof EventInterface) {
if (static::$globalEvent !== null) {
static::$eventLoopClass = get_class(static::$globalEvent);
static::$globalEvent = null;
return;
}

if (isset(static::$eventLoopClass)) {
if (!is_subclass_of(static::$eventLoopClass, EventInterface::class)) {
throw new RuntimeException(sprintf('%s::$eventLoopClass must implement %s', static::class, EventInterface::class));
}
return;
}

static::$eventLoopClass = match (true) {
class_exists(EventLoop::class) => Revolt::class,
extension_loaded('event') => Event::class,
default => Select::class,
};
}

/**
Expand Down Expand Up @@ -1332,32 +1341,10 @@ protected static function saveMasterPid(): void
/**
* Get event loop name.
*
* @return string
* @return class-string<EventInterface>
*/
protected static function getEventLoopName(): string
{
if (static::$eventLoopClass) {
return static::$eventLoopClass;
}

if (class_exists(EventLoop::class)) {
static::$eventLoopClass = Revolt::class;
return static::$eventLoopClass;
}

$loopName = '';
foreach (static::$availableEventLoops as $name => $class) {
if (extension_loaded($name)) {
$loopName = $name;
break;
}
}

if ($loopName) {
static::$eventLoopClass = static::$availableEventLoops[$loopName];
} else {
static::$eventLoopClass = Select::class;
}
return static::$eventLoopClass;
}

Expand Down Expand Up @@ -1447,9 +1434,9 @@ protected static function forkWorkersForWindows(): void
register_shutdown_function(static::checkErrors(...));

// Create a global event loop.
if (!static::$globalEvent) {
if (static::$globalEvent === null) {
$eventLoopClass = static::getEventLoopName();
static::$globalEvent = new $eventLoopClass;
static::$globalEvent = new $eventLoopClass();
static::$globalEvent->setErrorHandler(function ($exception) {
static::stopAll(250, $exception);
});
Expand Down Expand Up @@ -1517,7 +1504,7 @@ public static function forkOneWorkerForWindows(string $startFile): void
$pipes = [];
$process = proc_open('"' . PHP_BINARY . '" ' . " \"$startFile\" -q", $descriptorSpec, $pipes, null, null, ['bypass_shell' => true]);

if (empty(static::$globalEvent)) {
if (static::$globalEvent === null) {
static::$globalEvent = new Select();
static::$globalEvent->setErrorHandler(function ($exception) {
static::stopAll(250, $exception);
Expand Down Expand Up @@ -1588,9 +1575,9 @@ protected static function forkOneWorkerForLinux(self $worker): void
register_shutdown_function(static::checkErrors(...));

// Create a global event loop.
if (!static::$globalEvent) {
if (static::$globalEvent === null) {
$eventLoopClass = static::getEventLoopName();
static::$globalEvent = new $eventLoopClass;
static::$globalEvent = new $eventLoopClass();
static::$globalEvent->setErrorHandler(function ($exception) {
static::stopAll(250, $exception);
});
Expand Down Expand Up @@ -1798,7 +1785,7 @@ protected static function exitAndClearAll(): void
@unlink(static::$pidFile);
static::log("Workerman[" . basename(static::$startFile) . "] has been stopped");
if (static::$onMasterStop) {
call_user_func(static::$onMasterStop);
(static::$onMasterStop)();
}
exit(0);
}
Expand All @@ -1823,7 +1810,7 @@ protected static function reload(): void
// Try to emit onMasterReload callback.
if (static::$onMasterReload) {
try {
call_user_func(static::$onMasterReload);
(static::$onMasterReload)();
} catch (Throwable $e) {
static::stopAll(250, $e);
}
Expand Down Expand Up @@ -1867,7 +1854,7 @@ protected static function reload(): void
// Try to emit onWorkerReload callback.
if ($worker->onWorkerReload) {
try {
call_user_func($worker->onWorkerReload, $worker);
($worker->onWorkerReload)($worker);
} catch (Throwable $e) {
static::stopAll(250, $e);
}
Expand Down Expand Up @@ -2165,18 +2152,22 @@ protected static function getErrorType(int $type): string
/**
* Log.
*
* @param mixed $msg
* @param \Stringable|string $msg
* @param bool $decorated
* @return void
*/
public static function log(mixed $msg, bool $decorated = false): void
public static function log(\Stringable|string $msg, bool $decorated = false): void
{
$msg .= "\n";
$msg = trim((string)$msg);

if (!static::$daemonize) {
static::safeEcho($msg, $decorated);
static::safeEcho("$msg\n", $decorated);
}

if (isset(static::$logFile)) {
$pid = DIRECTORY_SEPARATOR === '/' ? posix_getpid() : 1;
file_put_contents(static::$logFile, sprintf("%s pid:%d %s\n", date('Y-m-d H:i:s'), $pid, $msg), FILE_APPEND | LOCK_EX);
}
file_put_contents(static::$logFile, date('Y-m-d H:i:s') . ' ' . 'pid:'
. (DIRECTORY_SEPARATOR === '/' ? posix_getpid() : 1) . ' ' . $msg, FILE_APPEND | LOCK_EX);
}

/**
Expand Down Expand Up @@ -2400,7 +2391,7 @@ protected function parseSocketAddress(): ?string
*/
public function pauseAccept(): void
{
if (static::$globalEvent && false === $this->pauseAccept && $this->mainSocket) {
if (static::$globalEvent !== null && $this->pauseAccept === false && $this->mainSocket !== null) {
static::$globalEvent->offReadable($this->mainSocket);
$this->pauseAccept = true;
}
Expand All @@ -2414,7 +2405,7 @@ public function pauseAccept(): void
public function resumeAccept(): void
{
// Register a listener to be notified when server socket is ready to read.
if (static::$globalEvent && true === $this->pauseAccept && $this->mainSocket) {
if (static::$globalEvent !== null && $this->pauseAccept === true && $this->mainSocket !== null) {
if ($this->transport !== 'udp') {
static::$globalEvent->onReadable($this->mainSocket, $this->acceptTcpConnection(...));
} else {
Expand Down

0 comments on commit b3b8c99

Please sign in to comment.