swoole--异步TCP/UDP服务器

2021-05-21

Swoole\Server类是所有异步风格服务器的基类,Http\Server、WebSocket\Server都继承于它

TCP服务端

use Swoole/Server;

$modes = [
	WOOLE_PROCESS, // 多进程模式
    SWOOLE_BASE,   // 基本模式
    $sock_type | SWOOLE_SSL,        // 支持SSL
];

$sockTypes = [
        SWOOLE_TCP/SWOOLE_SOCK_TCP,     // tcp ipv4 socket
        SWOOLE_TCP6/SWOOLE_SOCK_TCP6,   // tcp ipv6 socket
        SWOOLE_UNIX_DGRAM,              // unix socket dgram
        SWOOLE_UNIX_STREAM,             // unix socket stream
        $sock_type | SWOOLE_SSL,        // 支持SSL
];

// 启动监听$host,$port $mode模式的服务,服务类型为$sockType
$server = new Server($host, $port = 0, $mode = SWOOLE_PROCESS, $sockType = SWOOLE_SOCK_TCP);


// 设置参数
$server->set(array());

// 监听连接事件
$server->on('connect', function ($server, $fd, $reactorId){
	// 将$fd与用户自定义$uid绑定,保证一个UID的连接全部会分配到同一个worker进程,同一连接只能被bind一次,再次调用会返回false,$uid为-2147483648至4294967295的非0数字
	// 仅在dispatch_mode=5时有效,$uid为int类型
	$server->bind($fd, $uid);

	// 与enable_delay_receive配合使用,确认$fd后才开始监听可读事件并接收客户端的数据,保护服务器避免流量过载攻击
	$server->confirm($fd);

	// 获取客户端$fd的信息,连接所在的$Reactor线程ID -- $reactorId,$ignoreError为true时连接关闭也会返回连接信息
	$server->getClientInfo($fd, $reactorId, $ignoreError = false);
	[
		'reactor_id',        // Reactor线程id
		'server_fd',         // 来自哪个监听端口socket,这里不是客户端连接的fd
		'server_port',       // 来自哪个监听端口
		'remote_port',       // 客户端连接的端口
		'remote_ip',         // 客户端连接的IP地址
		'connect_time',      // 客户端连接到Server的时间,单位秒,由master进程设置
		'last_time',         // 最后一次收到数据的时间,单位秒,由master进程设置
		'close_errno',       // 连接关闭的错误码,如果连接异常关闭,close_errno的值是非零,可以参考Linux错误信息列表
		'recv_queued_bytes', // 等待处理的数据量
		'send_queued_bytes', // 等待发送的数据量
		'websocket_status',  // WebSocket连接状态,当服务器是Swoole\WebSocket\Server时会额外增加此项信息
		'uid', 				 // 使用bind绑定了用户ID时会额外增加此项信息
		'ssl_client_cert',   // 使用SSL隧道加密,并且客户端设置了证书时会额外添加此项信息
	];

    echo "Client:Connect.\n";
});

// 监听接收消息事件
$server->on('receive', function (Swoole\Server $server, $fd, $reactor_id, $data) {
    if (trim($data) == 'task') {
    	// 投递一个异步任务$data到task_worker池中,可以指定task进程$dstWorkerId,$dstWorkerId为-1时随机投递空闲task进程,$finishCallback回调设置时不会指定onFinish回调(只有worker进程投递才生效);方法成功返回task进程id,失败返回false;$data类型不限(允许序列化即可)
    	// 使用task必须设置task进程数和onTask和onFinish回调
    	// 从Master、Manager、UserProcess进程中投递的任务,是单向的
    	// task方法不能在task进程/用户自定义进程中调用
    	// task操作的次数必须小于onTask处理速度,如果投递容量超过处理能力,task数据会塞满缓存区,导致Worker进程发生阻塞,Worker 进程将无法接收新的请求
    	// 使用addProcess添加的用户进程中无法使用task投递任务,请使用sendMessage接口与Worker/Task进程通信
        $server->task($data = "async task coming", $dstWorkerId = -1, $finishCallback);

        // 与task方法作用相同,taskwait是同步等待的直到超过超时时间$timeout秒返回false或任务完成返回结果$result(由 $server->finish函数发出)
        // 在协程中调用时将自动进行协程调度,不再阻塞等待
        $server->taskwait($data = "async task coming", $timeout = 0.5, $dstWorkerId = -1);

        // 并发执行多个task异步任务,不支持协程,任务完成或超时,返回结果数组,结果数组与任务task任务顺序对应
        // 某个任务执行超时不会影响其他任务,返回的结果数据中将不包含超时的任务;最大并发任务不得超过1024
        // $tasks为索引数组,超时时间$timeout秒
        $server->taskWaitMulti($tasks = ["async task coming"], $timeout = 0.5);

		// 协程环境下的taskWaitMulti
        $server->taskCo($tasks = ["async task coming"], $timeout = 0.5);


    } else {

    	// 给客户端$fd发送数据$data,发送过程为异步,如果发送的数据超过2M(修改buffer_output_size可以加大数据长度)可以间数据写入临时文件,然后通过sendfile方法发送
    	// 向UnixSocket DGRAM对端发送数据时需要设置$serverSocket,TCP不需要
		$server->send($fd, $data, $serverSocket  = -1);

		// 从文件$filename中获取$offset位置开始发送$length长度的数据到客户端$fd
		$server->sendfile($fd, $filename, $offset = 0, $length = 0);

		// 同步发送数据$data到客户端$fd,等待响应,目前仅可用于SWOOLE_BASE模式,只用于本机或内网通信;启用协程时使用此函数会卡死协程
		$server->sendwait($fd, $data);

		// 向任意Worker进程或者Task进程ID $workerId发送消息$message,在非主进程和管理进程中可调用,收到消息会触发onPipeMessage事件
		// $message没有长度限制,但超过8K会使用内存临时文件
		// 必须注册onPipeMessage回调才能使用,task_ipc_mode = 3时无法向task进程发消息
		$server->sendMessage($message, $workerId);
    }
});


// 监听关闭连接
$server->on('close', function ($server, $fd, $reactorId) {
    echo "Client: Close.\n";
});

// 工作进程收到$server->sendMessage()发送的消息时触发
$server->on('pipeMessage', function ($server, $src_worker_id, $message) {
    echo "#{$server->worker_id} message from #$src_worker_id: $message\n";
});

// task_worker进程接收任务时触发
// 当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,这时将不再接收新的Task;当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task
// $task_id和$src_worker_id组合起来才是全局唯一的;$src_worker_id为投递任务的worker进程id,$task_id为执行任务的task进程的id
$server->on('task', function ($server, $task_id, $src_worker_id, $data) {
    // 通知worker进程,投递任务已完成,结果为$data,投递会触发onFinish事件,可以多次发送结果并触发事件
	$server->finish($data);
});


// task进程完成任务时通过$server->finish()方法触发,接收处理结果;指定逻辑的worker进程和下发task任务的worker进程是同一个进程
$server->on('finish', function ($server, $task_id, $data) {
	echo '结果为:';
	var_dump($data);
});

// 当Worker/Task进程发生异常时Manager进程会回调;用于报警和监控
$server->on('WorkerError', function ($server, $worker_id, $worker_pid, $exit_code, $signal) {
	var_dump('异常worker进程的id: ' . $worker_id);
	var_dump('异常worker进程的pid: ' . $worker_id);
});
// signal = 11:说明Worker进程发生了segment fault段错误,可能触发了底层的BUG
// exit_code = 255:说明Worker进程发生了Fatal Error致命错误,请检查PHP的错误日志
// signal = 9:说明Worker被系统强行Kill,请检查是否有人为的kill -9操作,检查dmesg信息中是否存在OOM(Out of memory)

// 当管理进程启动时触发此事件
$server->on('ManagerStart', function ($server) {
    var_dmup('管理进程启动');
});

// 管理进程结束时触发
$server->on('ManagerStop', function ($server) {
    var_dmup('管理进程结束');
});

// Worker进程Reload之前触发此事件,在Manager进程中回调
$server->on('BeforeReload', function ($server) {
	var_dump('worker即将重启');
});

// Worker进程Reload之后触发此事件,在Manager进程中回调
$server->on('AfterReload', function ($server) {
	var_dump('worker完成重启');
});

// 设置允许安全地重启所有Worker/Task进程,$only_reload_taskworker为true仅重启Task进程,Base模式下不支持重启Task进程
// Reload操作只能重新载入Worker进程启动后加载的PHP文件,使用get_included_files函数来列出哪些文件是在WorkerStart之前就加载的PHP文件,在此列表中的PHP文件,即使进行了reload操作也无法重新载入。要关闭服务器重新启动才能生效。
$server->reload($only_reload_taskworker = false);
# 重启所有worker进程
# kill -USR1 主进程PID
# 仅重启task进程
# kill -USR2 主进程PID

// 启动服务,启动成功后会创建worker_num+2个进程。Master进程+Manager进程+serv->worker_num个Worker 进程
$server->start();


// 停止运行$workerId指定的Worker进程,并立即触发onWorkerStop回调函数;$waitEvent为false时立即退出,为true时等待时间循环为空时再退出
$server->stop($workerId = -1, $waitEvent = false);

// 关闭服务,函数可以用在Worker进程内使用,向主进程发送SIGTERM也可以实现关闭服务
$server->shutdown();
// kill -15 主进程PID

UDP服务端

use Swoole/Server;

$modes = [
	WOOLE_PROCESS, // 多进程模式
    SWOOLE_BASE,   // 基本模式
];

$sockTypes = [
        SWOOLE_UDP/SWOOLE_SOCK_UDP,     // udp ipv4 socket
        SWOOLE_UDP6/SWOOLE_SOCK_UDP6,   // udp ipv6 socket
        SWOOLE_UNIX_DGRAM,              // unix socket dgram
        SWOOLE_UNIX_STREAM,             // unix socket stream
        $sock_type | SWOOLE_SSL,        // 支持SSL
];

// 启动监听$host,$port $mode模式的服务,服务类型为$sockType
$server = new Server($host, $port = 0, $mode = SWOOLE_PROCESS, $sockType = SWOOLE_SOCK_UDP);

// 设置参数
$server->set(array());

// 接收到UDP数据包时回调此函数,发生在worker进程中
$server->on('Packet', function (Swoole\Server $server, $data, $clientInfo) {
    
    
});

// 监听关闭连接
$server->on('close', function ($server, $fd, $reactorId) {
    echo "Client: Close.\n";
});


// 启动服务,启动成功后会创建worker_num+2个进程。Master进程+Manager进程+serv->worker_num个Worker 进程
$server->start();

客户端连接接口

// 检测fd对应的连接是否存在。
$server->exist($fd);

// 获取当前server的所有客户端连接,可以指定启示$fd和获取条数$pageSize
// 仅可用于TCP 客户端,SWOOLE_BASE模式下只能获取当前进程的连接
$server->getClientList($start_fd = 0, $pageSize = 10);

// 得到当前Server的活动TCP连接数,启动时间等信息,accept/close(建立连接 / 关闭连接) 的总次数等信息数组
$server->stats();

// 不再接收客户端$fd的数据,只支持SWOOLE_PROCESS模式
$server->pause($fd);

// 恢复数据接收。与pause方法成对使用
$server->resume($fd);

// 主动检测服务器所有连接,并找出已经超过约定时间的连接。if_close_connection为true则自动关闭超时的连接。指定false仅返回连接的fd数组。
$server->heartbeat($ifCloseConnection = true);

// 设置客户端连接为保护状态,不被心跳线程切断。
$server->protect($fd, $value = true);

// 关闭客户端连接,也会触发onClose 事件,$reset为true强制关闭连接,丢弃队列中的数据
$server->close($fd, $reset = false);

进程ID

// 获取当前worker进程的id,和onWorkerStart时的$workerId一致
$server->getWorkerId();

// 获取当前Worker进程的PID
$server->getWorkerPid();

// 获取Worker进程$worker_id的状态,不是Worker进程或者进程不存在返回false
$server->getWorkerStatus($worker_id);

// 获取当前服务的Manager进程PID
$server->getManagerPid();

// 获取当前服务的Master进程PID
$server->getMasterPid();

错误码

// 获取最近一次操作错误的错误码
$server->getLastError();
[
	1001, // 连接已经被Server端关闭了,出现这个错误一般是代码中已经执行了$server->close()关闭了某个连接,但仍然调用$server->send()向这个连接发送数据
	1002, // 连接已被Client端关闭了,Socket已关闭无法发送数据到对端
	1003, // 正在执行close,onClose回调函数中不得使用$server->send()
	1004, // 连接已关闭
	1005, // 连接不存在,传入$fd可能是错误的
	1007, // 接收到了超时的数据,TCP关闭连接后,可能会有部分数据残留在unixSocket缓存区内,这部分数据会被丢弃
	1008, // 发送缓存区已满无法执行send操作,出现这个错误表示这个连接的对端无法及时收数据导致发送缓存区已塞满
	1202, // 发送的数据超过了server->buffer_output_size设置
	9007, // 仅在使用dispatch_mode=3时出现,表示当前没有可用的进程,可以调大worker_num进程数量
];

服务端属性

// 获取服务的配置信息数组
$server->setting;

// 获取当前服务器主进程的PID,只能在onStart/onWorkerStart之后获取到
$server->master_pid;

// 获取当前服务器管理进程的PID,只能在onStart/onWorkerStart之后获取到
$server->manager_pid;

// 获取当前worker进程的编号,包括Task进程。工作进程重启后worker_id的值是不变的
$server->worker_id;

// 获取当前worker进程的操作系统进程ID,与posix_getpid()的返回值相同
$server->worker_pid;

// 检查当前进程是否为Task进程。true是,false为Worker进程
$server->taskworker;

// TCP连接迭代器,可以使用foreach遍历服务器当前所有客户端连接,SWOOLE_BASE模式下不支持跨进程操作TCP连接
$server->connections;

// 获取监听端口的数组,元素为Swoole\Server\Port对象,第0个元素是主服务端口
$server->ports;

添加监听端口

// 添加额外监听的端口,填写后需要给$port单独设置参数与监听事件
$port = $server->addListener('0.0.0.0', 9502, SWOOLE_TCP);

定时器

 
// 设置毫秒定时器返回定时器id,每$millisecond毫秒执行$callback回调
// 定时器不能在Server->start之前使用;可以在onReceive和onWorkerStart使用
// 在onWorkerStart中使用时$callback参数为定时器id,其他无参数
$server->tick($millisecond, $callback);

// 设置一次性毫秒定时器返回定时器id,$millisecond毫秒后执行$callback回调,执行完成后就会销毁
// 不能在Server->start之前使用
// $callback没有参数
$server->after($millisecond, $callback);

// 延后执行函数$classback(底层会在EventLoop循环完成后执行此函数,底层不保证defer的函数会立即执行, 在onWorkerStart回调中执行defer时,必须要等到有事件发生才会回调)
$server->defer($callback);

// 清楚定时器$timerId
// 仅可用于清除当前进程的定时器
$server->clearTimer($timerId);

示例

添加自定义进程

$server = new Swoole\Server('127.0.0.1', 9501);

/**
 * 用户进程实现了广播功能,循环接收unixSocket的消息,并发给服务器的所有连接
 */
$process = new Swoole\Process(function ($process) use ($server) {
    $socket = $process->exportSocket();
    while (true) {
        $msg = $socket->recv();
        foreach ($server->connections as $conn) {
            $server->send($conn, $msg);
        }
    }
}, false, 2, 1);

// 添加一个用户自定义的工作进程。此函数通常用于创建一个特殊的工作进程,用于监控、上报或者其他特殊的任务。创建的子进程可以调用$server对象提供的各个方法,用户进程内不能使用task/taskwait、send/close
$server->addProcess($process);

$server->on('receive', function ($serv, $fd, $reactor_id, $data) use ($process) {
    //群发收到的消息
    $socket = $process->exportSocket();
    $socket->send($data);
});

// 此事件在 Worker 进程 / Task 进程 启动时发生,这里创建的对象可以在进程生命周期内使用。
$server->on('WorkerStart', function (Swoole\Server $server, $workerId) {
    //群发收到的消息
    $socket = $process->exportSocket();
    $socket->send($data);
});
// 此事件在 Worker 进程终止时发生。在此函数中可以回收 Worker 进程申请的各类资源。
$server->on('WorkerStop', function (Swoole\Server $server, $workerId) {
    //群发收到的消息
    $socket = $process->exportSocket();
    $socket->send($data);
});
// 仅在开启 reload_async 特性后有效
$server->on('WorkerExit', function (Swoole\Server $server, $workerId) {
    //群发收到的消息
    $socket = $process->exportSocket();
    $socket->send($data);
});

绑定uid

$serv->on('receive', function (Swoole\Server $serv, $fd, $reactor_id, $data) {
    $conn = $serv->getClientInfo($fd);
    print_r($conn);
    echo "worker_id: " . $serv->worker_id . PHP_EOL;
    if (empty($conn['uid'])) {
        $uid = $fd + 1;
        if ($serv->bind($fd, $uid)) {
            $serv->send($fd, "bind {$uid} success");
        }
    } else {
        if (!isset($serv->fdlist[$fd])) {
            $serv->fdlist[$fd] = $conn['uid'];
        }
        print_r($serv->fdlist);
        foreach ($serv->fdlist as $_fd => $uid) {
            $serv->send($_fd, "{$fd} say:" . $data);
        }
    }
});

$serv->on('close', function ($serv, $fd, $reactor_id) {
    echo "{$fd} Close". PHP_EOL;
    unset($serv->fdlist[$fd]);
});

 

{/if}