php Swoole使用通道进行协程间通讯

2021-10-17
  1. 仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无IO消耗
  2. 底层使用PHP引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗
  3. 不同的进程之间内存是隔离的。只能在同一进程的不同协程内进行push和pop操作
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use function Swoole\Coroutine\run;

run(function(){
	// 构造指定容量的通道,容量应大于1,容量为几就只能插入几个元素
    $channel = new Channel(1);

    Coroutine::create(function () use ($channel) {
        for($i = 0; $i < 10; $i++) {
        	// 等待时间,防止写入数据超过容量
            Coroutine::sleep(1.0);
            // 向通道写入数据,数据类型不限(资源、匿名函数、其他数据类型(避免0、false、null、空字符串)),-1为超时时间
            // 在通道已满的情况下,push会挂起当前协程,在约定的时间内,如果没有任何消费者消费数据,将发生超时,底层会恢复当前协程,push调用立即返回false,写入失败
            if (!$channel->push(['rand' => rand(1000, 9999), 'index' => $i], -1)) {

            	echo '错误码:' . $channel->errCode;
            }
            echo "{$i}\n";
        }
    });
    echo '通道状态:' . PHP_EOL;
	var_dump($channel->stats());
	[
		'consumer_num' => '消费者数量,表示当前通道为空,有N个协程正在等待其他协程调用push方法生产数据',
		'producer_num' => '生产者数量,表示当前通道已满,有N个协程正在等待其他协程调用pop方法消费数据',
		'queue_num' => '通道中的元素数量',
	];
    Coroutine::create(function () use ($channel) {
        while(1) {
        	// 从通道中读取数据,2.0为超时时间,指定时间未获取到数据将失败返回false
            $data = $channel->pop(2.0);
            if ($data) {
                var_dump($data);
            } else {
                assert($channel->errCode === SWOOLE_CHANNEL_TIMEOUT);
                break;
            }
        }
    });

    echo '通道中的元素数量:' . $channel->length();

    var_dump('通道是否为空:' . $channel->isEmpty());

    var_dump('通道是否已满:' . $channel->isFull());

	// 关闭通道。并唤醒所有等待读写的协程
	// 唤醒所有生产者协程,push方法返回false;唤醒所有消费者协程,pop方法返回false
    // $channel->close();
});
{/if}