php多线程--parallel扩展

2021-03-22

简介

PHP 7.2+的并行并发扩展,支持windows

  1. 一个parallel\Runtime代表一个PHP解释器线程。
  2. parallel\Runtime配置了一个可选的bootstrap文件,该文件被传递给parallel\Runtime::__construct(),这通常是一个自动加载器,或者一些其他的预加载例程:在任何任务执行之前都会包含bootstrap文件。在构建之后,并行运行时仍然可用,直到它被PHP对象的正常作用域规则关闭、杀死或销毁。
  3. parallel\Runtime::run()允许程序员安排任务并行执行。并行运行时有一个FIFO调度,任务将按照它们被调度的顺序执行。该API提供了一个通过自动调度执行并行代码的函数入口点:parallel\run()。

任务

任务只是旨在并行执行的闭包。该闭合几乎可以包含任何指令,包括嵌套关闭。但是,有些指令在任务中被禁止

  1. yield
  2. 使用按引用调用
  3.  声明类
  4. 声明命名函数

通信

  1. parallel\Future允许从一个任务发送数据和检索数据,但不允许任务之间的双向通信
  2. parallel\Channel允许任务之间的双向通信,是任务之间类似socket的链接,可以使用它来发送和接收数据。
  3. 不要通过共享内存进行通信;而是通过通信共享内存
  4. 因为PHP本身不共享任何内容-默认情况下,PHP线程在其自己的虚拟地址空间中运行,因此只能通过通信共享内存

安装

扩展地址:https://pecl.php.net/package/parallel

windows

地址:https://windows.php.net/downloads/pecl/releases/parallel/

php_parallel和pthreadVC2两个dll都需要配置在ini中

Functional API

  1. 为非常熟悉编写使用并行并发的应用程序的人提供了很大程度的控制。
  2. 所有执行的运行时都被相同地引导
  3. 调度是由API决定的,而不是由程序员决定的
  4. 只要硬件和操作系统限制允许,任务就会开始并行执行,而无需创建不必要的运行
  5. 简便的多线程方法,实现与parallel\Runtime一致
// 使用文件$file引导所有通过parallel\run()自动调度创建的运行
\parallel\bootstrap($file);

// 并行执行闭包$task,数组$argv为闭包的参数,可不传,参数中不能包含引用、资源、使用自定义结构的内部对象;parallel\Channel是编写并行代码的核心,必要时支持并发访问和执行,因此可以共享
// 调用parallel\run()前,内部已创建并缓存的\parallel\Runtime是空闲的,它将被用来执行任务,否则将先创建并缓存一个\parallel\Runtime
// 返回值为\parallel\Future对象
$future = \parallel\run($task, $argv);

parallel\Runtime类

运行时对象

  1. 每个运行代表一个PHP线程,该线程是在构造时创建(并引导)的。然后,线程等待任务被调度:已调度的任务将以FIFO的方式执行,然后线程将继续等待,直到调度了更多任务,或者被PHP对象的常规作用域规则关闭,杀死或破坏了任务。
  2. 当运行时被PHP对象的常规作用域规则破坏时,它将首先执行所有已调度的任务,并在执行时阻塞。

运行时引导

  1. 创建新的运行时,它不会与创建它的线程(或进程)共享代码。这意味着它没有加载相同的类和函数,也没有相同的自动加载器集。
  2. 在这些任务确实需要访问相同代码的情况下,将自动加载器设置为引导程序就足够了。
  3. 预加载可以与并行结合使用,在这种情况下,预加载的代码无需引导即可使用
// 构建一个新的运行对象,使用文件$bootstrap(通常是自动加载器)引导,可以不使用引导
$runtime = new \parallel\Runtime($bootstrap);

// 并行执行闭包$task,数组$argv为闭包的参数,可不传,参数中不能包含引用、资源、使用自定义结构的内部对象;parallel\Channel是编写并行代码的核心,必要时支持并发访问和执行,因此可以共享
// 返回值为\parallel\Future对象
$future = $runtime->run($task, $agrv);

// 关闭运行,计划执行的任务将在关闭之前执行
$runtime->close();

// 尝试强制关闭运行,计划执行的任务将不会执行,当前正在运行的任务将被中断
$runtime->kill();

parallel\Future类

获取任务的返回值和未捕获的异常

// 获取任务的返回
$future->value();

// 取消任务,如果任务正在运行,它将被中断,成功返回true,失败返回false
$future->cancel();

// 查询任务是否被取消,取消返回true,否则返回false
$future->cancelled();

// 查询任务是否完成,完成返回true,否则返回false
$future->done();

parallel\Channel类

无缓冲通道

一个无缓冲的通道将阻塞对parallel\Channel::send()的调用,直到有接收者为止,并阻塞对parallel\Channel::recv()的调用,直到有一个发送者。这意味着无缓冲通道不仅是在任务之间共享数据的一种方式,而且还是一种简单的同步方法。无缓冲通道是在任务之间共享数据的最快方法,需要最少的复制。

缓冲通道

在达到容量之前,缓冲的通道不会阻塞对parallel\Channel::send()的调用,直到缓冲区中有数据之前,对parallel\Channel::recv()的调用才会阻塞。

并行通道

并行通道的强大功能是它们允许在任务(和运行时)之间交换闭包。当通过通道发送闭包时,闭包会被缓冲,它不会更改传输闭包的通道的缓冲,但是会影响闭包内部的静态作用域:同一闭包发送到不同的运行时,或同一运行时,不会共享其静态范围。
这意味着无论何时执行由通道传输的闭包,静态状态都将与缓冲闭包时的状态相同。

匿名频道

匿名通道构造函数可以避免为每个通道分配名称:parallel将为匿名通道生成一个唯一的名称

创建通道

// 创建一个匿名无缓冲通道
$channel = new \parallel\Channel();

// 创建一个容量为$capacity的匿名缓冲通道
$channel = new \parallel\Channel($capacity);

// 创建一个名称为$name的无缓冲通道,如果通道已存在,则应抛出parallel\Channel\Error\Existence
$channel = \parallel\Channel::make($name);

// 创建一个容量为$capacity的名称为$name的缓冲通道,如果通道已存在,则应抛出parallel\Channel\Error\Existence
$channel = \parallel\Channel::make($name, $capacity);

// 打开指定名称$name通道,通道不存在,则抛出parallel\Channel\Error\Existence。
$channel = \parallel\Channel::open($name);

// 关闭通道
$channel->close();

使用通道

// 在通道中发送一个值
$channel->send($value);

// 从通道中接收一个值
$channel->recv();

 

{/if}