php Swoole进程管理

2021-10-03

进程池

  • 基于Swoole\Server的Manager管理进程模块实现。可管理多个工作进程。该模块的核心功能为进程管理
  • 某个工作进程遇到致命错误、主动退出时管理器会进行回收,避免出现僵尸进程
  • 工作进程退出后,管理器会自动拉起、创建一个新的工作进程
  • 主进程收到SIGTERM信号时将停止fork新进程,并kill所有正在运行的工作进程
  • 主进程收到SIGUSR1信号时将将逐个kill正在运行的工作进程,并重新启动新的工作进程
  • 底层仅设置了主进程(管理进程)的信号处理,并未对Worker工作进程设置信号,需要开发者自行实现信号的监听

创建进程间不通信的进程池

use Swoole\Process;
use Swoole\Coroutine;

// 创建5个进程,且进程间启用协程,协程不能设置OnMessage回调
$pool = new Process\Pool(5, SWOOLE_IPC_NONE, 0, true);
// 并且必须在onWorkerStart中实现循环逻辑
// 子进程启动回调
$pool->on('WorkerStart', function (Process\Pool $pool, $workerId) {
    /** 当前是 Worker 进程,$workerId表示第几个进程 */
    $running = true;
    $i = 0;
    Process::signal(SIGTERM, function () use (&$running, $pool, $workerId) {
        $running = false; // 进程终止,并重启进程
        if ($workerId == 4) {
        	echo "KILL\n";
        	// 终止工作进程池
        	$pool->shutdown();
        } else {
        	echo "TERM\n";
        }
    });
    echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
    while ($running) {
        Coroutine::sleep(1);
        echo "{$i}:sleep 1\n";
        $i++;
        if ($i > 5) {
            // 获取当前工作进程对象(Swoole/Process)
            $worker = $pool->getProcess($workerId);
            // 向指定pid发送信号
            $worker->kill($worker->pid, SIGTERM);
        }
    }
});
// 子进程结束回调
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
    echo("[Worker #{$workerId}] WorkerStop\n");
});
// 启动进程池
if (!$pool->start()) {
	// 如果启用失败,获取最近一次系统调用的错误码,并将其转换为错误信息
	var_dump(swoole_strerror(swoole_errno()));
}

创建进程间使用系统消息队列(sysvmsg)通信的进程池

use Swoole\Process;
use Swoole\Coroutine;

// 进程队列
$key = 1;
// 队列资源
$queue = msg_get_queue($key);
// 创建5个进程,且进程间启用协程,协程不能设置OnMessage回调
$pool = new Process\Pool(5, SWOOLE_IPC_MSGQUEUE, $key, true);
// 并且必须在onWorkerStart中实现循环逻辑
$pool->on('WorkerStart', function (Process\Pool $pool, $workerId) use ($queue) {
    /** 当前是 Worker 进程,$workerId表示第几个进程 */
    $running = true;
    $i = 0;
    Process::signal(SIGTERM, function () use (&$running, $workerId, $pool) {
        $running = false; // 进程终止,并重启进程
        if ($workerId == 4) {
           echo "{$workerId}:KILL\n";
           // 终止工作进程
           $pool->shutdown();
        } else {
            echo "{$workerId}:TERM\n";
        }
     });
    echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
    while ($running) {
        Coroutine::sleep(1);
        if ($workerId < 4) {
            msg_send($queue, 1, "{$workerId}:数据入队:".base64_encode(random_bytes(5)), false);
        } else {
            if (msg_receive($queue, 0, $msgtype, 65536, $msg, false)) {
                // 推送数据类型和推送的数据
                var_dump($msgtype. '--'. $msg);
            } else {
                echo $workerId.'队列获取失败';
            }
        }
        $i++;
        if ($i > 5) {
            // 获取当前工作进程对象(Swoole/Process)
            $worker = $pool->getProcess($workerId);
            // 向指定pid发送信号
            $worker->kill($worker->pid, SIGTERM);
        }
    }
});
// 进程终止回调
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
    echo("[Worker #{$workerId}] WorkerStop\n");
});
// 启动进程池
if (!$pool->start()) {
    // 如果启用失败,获取最近一次系统调用的错误码,并将其转换为错误信息
    var_dump(swoole_strerror(swoole_errno()));
}

创建进程间使用unixSocket通信的进程池


use Swoole\Process;
use Swoole\Coroutine;

// 创建2个进程,且进程间启用协程,协程不能设置OnMessage回调
$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_UNIXSOCK, 0, true);

$pool->on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) {
   $process = $pool->getProcess(0);
   $socket = $process->exportSocket();
   if ($workerId == 0) {
       echo $socket->recv();
       $socket->send("hello proc{$workerId}\n");
       echo "proc{$workerId} stop\n";
   } else {
       $socket->send("hello-l proc{$workerId}\n");
       echo $socket->recv();
       echo "proc{$workerId} stop\n";
       $pool->shutdown();
   }
});
// 启动进程池
if (!$pool->start()) {
    // 如果启用失败,获取最近一次系统调用的错误码,并将其转换为错误信息
    var_dump(swoole_strerror(swoole_errno()));
}

进程管理

进程管理器,基于Process\Pool实现。可以管理多个进程。相比与Process\Pool,可以非常方便的创建多个执行不同任务的进程,并且可以控制每一个进程是否要处于协程环境。

进程管理器中可以获取Process\Pool的对象,可以使用其所有的方法

创建不通信的进程

use Swoole\Process;
use Swoole\Process\Manager;
use Swoole\Process\Pool;
use Swoole\Coroutine;

$pm = new Manager();

for ($i = 0; $i < 2; $i++) {
    $pm->add(function (Pool $pool, int $workerId) {
		$running = true;
        $i = 0;
        Process::signal(SIGTERM, function () use (&$running, $pool, $workerId) {
            $running = false; // 进程终止,并重启进程
        	echo "KILL\n";
            // 终止工作进程池
            $pool->shutdown();
        });
        echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
        while ($running) {
            Coroutine::sleep(1);
            echo "{$i}:sleep 1\n";
            $i++;
            if ($i > 5) {
                // 获取当前工作进程对象(Swoole/Process)
                $worker = $pool->getProcess($workerId);
                // 向指定pid发送信号
                $worker->kill($worker->pid, SIGTERM);
            }
        }
     	// 子进程结束回调
        $pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
            echo("[Worker #{$workerId}] WorkerStop\n");
        });
    }, true);
}

$pm->start();

批量创建进程

use Swoole\Process;
use Swoole\Process\Manager;
use Swoole\Process\Pool;
use Swoole\Coroutine;

$pm = new Manager();

$pm->addBatch(2, function (Pool $pool, int $workerId) {
	$running = true;
    $i = 0;
    Process::signal(SIGTERM, function () use (&$running, $pool, $workerId) {
        $running = false; // 进程终止,并重启进程
    	echo "KILL\n";
        // 终止工作进程池
        $pool->shutdown();
    });
    echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
    while ($running) {
        Coroutine::sleep(1);
        echo "{$i}:sleep 1\n";
        $i++;
        if ($i > 5) {
            // 获取当前工作进程对象(Swoole/Process)
            $worker = $pool->getProcess($workerId);
            // 向指定pid发送信号
            $worker->kill($worker->pid, SIGTERM);
        }
    }
 	// 子进程结束回调
    $pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
        echo("[Worker #{$workerId}] WorkerStop\n");
    });
}, true);


$pm->start();

 

{/if}