进程池
- 基于Swoole\Server的Manager管理进程模块实现。可管理多个工作进程。该模块的核心功能为进程管理
- 某个工作进程遇到致命错误、主动退出时管理器会进行回收,避免出现僵尸进程
- 工作进程退出后,管理器会自动拉起、创建一个新的工作进程
- 主进程收到SIGTERM信号时将停止fork新进程,并kill所有正在运行的工作进程
- 主进程收到SIGUSR1信号时将将逐个kill正在运行的工作进程,并重新启动新的工作进程
- 底层仅设置了主进程(管理进程)的信号处理,并未对Worker工作进程设置信号,需要开发者自行实现信号的监听
创建进程间不通信的进程池
use Swoole\Process;
use Swoole\Coroutine;
// 创建5个进程,且进程间启用协程,协程不能设置OnMessage回调
$pool = new Process\Pool(5, SWOOLE_IPC_NONE, 0, true);
// 并且必须在onWorkerStart中实现循环逻辑
// 子进程启动回调
$pool->on('WorkerStart', function (Process\Pool $pool, $workerId) {
/** 当前是 Worker 进程,$workerId表示第几个进程 */
$running = true;
$i = 0;
Process::signal(SIGTERM, function () use (&$running, $pool, $workerId) {
$running = false; // 进程终止,并重启进程
if ($workerId == 4) {
echo "KILL\n";
// 终止工作进程池
$pool->shutdown();
} else {
echo "TERM\n";
}
});
echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
while ($running) {
Coroutine::sleep(1);
echo "{$i}:sleep 1\n";
$i++;
if ($i > 5) {
// 获取当前工作进程对象(Swoole/Process)
$worker = $pool->getProcess($workerId);
// 向指定pid发送信号
$worker->kill($worker->pid, SIGTERM);
}
}
});
// 子进程结束回调
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStop\n");
});
// 启动进程池
if (!$pool->start()) {
// 如果启用失败,获取最近一次系统调用的错误码,并将其转换为错误信息
var_dump(swoole_strerror(swoole_errno()));
}
创建进程间使用系统消息队列(sysvmsg)通信的进程池
use Swoole\Process;
use Swoole\Coroutine;
// 进程队列
$key = 1;
// 队列资源
$queue = msg_get_queue($key);
// 创建5个进程,且进程间启用协程,协程不能设置OnMessage回调
$pool = new Process\Pool(5, SWOOLE_IPC_MSGQUEUE, $key, true);
// 并且必须在onWorkerStart中实现循环逻辑
$pool->on('WorkerStart', function (Process\Pool $pool, $workerId) use ($queue) {
/** 当前是 Worker 进程,$workerId表示第几个进程 */
$running = true;
$i = 0;
Process::signal(SIGTERM, function () use (&$running, $workerId, $pool) {
$running = false; // 进程终止,并重启进程
if ($workerId == 4) {
echo "{$workerId}:KILL\n";
// 终止工作进程
$pool->shutdown();
} else {
echo "{$workerId}:TERM\n";
}
});
echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
while ($running) {
Coroutine::sleep(1);
if ($workerId < 4) {
msg_send($queue, 1, "{$workerId}:数据入队:".base64_encode(random_bytes(5)), false);
} else {
if (msg_receive($queue, 0, $msgtype, 65536, $msg, false)) {
// 推送数据类型和推送的数据
var_dump($msgtype. '--'. $msg);
} else {
echo $workerId.'队列获取失败';
}
}
$i++;
if ($i > 5) {
// 获取当前工作进程对象(Swoole/Process)
$worker = $pool->getProcess($workerId);
// 向指定pid发送信号
$worker->kill($worker->pid, SIGTERM);
}
}
});
// 进程终止回调
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStop\n");
});
// 启动进程池
if (!$pool->start()) {
// 如果启用失败,获取最近一次系统调用的错误码,并将其转换为错误信息
var_dump(swoole_strerror(swoole_errno()));
}
创建进程间使用unixSocket通信的进程池
use Swoole\Process;
use Swoole\Coroutine;
// 创建2个进程,且进程间启用协程,协程不能设置OnMessage回调
$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_UNIXSOCK, 0, true);
$pool->on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) {
$process = $pool->getProcess(0);
$socket = $process->exportSocket();
if ($workerId == 0) {
echo $socket->recv();
$socket->send("hello proc{$workerId}\n");
echo "proc{$workerId} stop\n";
} else {
$socket->send("hello-l proc{$workerId}\n");
echo $socket->recv();
echo "proc{$workerId} stop\n";
$pool->shutdown();
}
});
// 启动进程池
if (!$pool->start()) {
// 如果启用失败,获取最近一次系统调用的错误码,并将其转换为错误信息
var_dump(swoole_strerror(swoole_errno()));
}
进程管理
进程管理器,基于Process\Pool实现。可以管理多个进程。相比与Process\Pool,可以非常方便的创建多个执行不同任务的进程,并且可以控制每一个进程是否要处于协程环境。
进程管理器中可以获取Process\Pool的对象,可以使用其所有的方法
创建不通信的进程
use Swoole\Process;
use Swoole\Process\Manager;
use Swoole\Process\Pool;
use Swoole\Coroutine;
$pm = new Manager();
for ($i = 0; $i < 2; $i++) {
$pm->add(function (Pool $pool, int $workerId) {
$running = true;
$i = 0;
Process::signal(SIGTERM, function () use (&$running, $pool, $workerId) {
$running = false; // 进程终止,并重启进程
echo "KILL\n";
// 终止工作进程池
$pool->shutdown();
});
echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
while ($running) {
Coroutine::sleep(1);
echo "{$i}:sleep 1\n";
$i++;
if ($i > 5) {
// 获取当前工作进程对象(Swoole/Process)
$worker = $pool->getProcess($workerId);
// 向指定pid发送信号
$worker->kill($worker->pid, SIGTERM);
}
}
// 子进程结束回调
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStop\n");
});
}, true);
}
$pm->start();
批量创建进程
use Swoole\Process;
use Swoole\Process\Manager;
use Swoole\Process\Pool;
use Swoole\Coroutine;
$pm = new Manager();
$pm->addBatch(2, function (Pool $pool, int $workerId) {
$running = true;
$i = 0;
Process::signal(SIGTERM, function () use (&$running, $pool, $workerId) {
$running = false; // 进程终止,并重启进程
echo "KILL\n";
// 终止工作进程池
$pool->shutdown();
});
echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
while ($running) {
Coroutine::sleep(1);
echo "{$i}:sleep 1\n";
$i++;
if ($i > 5) {
// 获取当前工作进程对象(Swoole/Process)
$worker = $pool->getProcess($workerId);
// 向指定pid发送信号
$worker->kill($worker->pid, SIGTERM);
}
}
// 子进程结束回调
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStop\n");
});
}, true);
$pm->start();