PHP中的协程
之前学习Lua的时候第一次接触到了协程(coroutine)的概念。而PHP5.5版本中也加入了协程的概念,从此PHP编程又有了新的思路和玩法。这里学习一下PHP中协程的相关概念的使用方法。
分成上下两篇文章吧,这篇主要讲一下基础概念。
协程是什么?
在以前的Lua学习笔记三中可以看到,协程与多线程的比较,有自己的堆栈、局部变量、指令指针等,但是协程本身与其他协程共享全局变量。主要不同在于,多处理器下,多线程可以真实的同时运行多个线程。而协程任意时刻只能有一个在真实运行,并且只有在明确要求被挂起时才会挂起。
PHP中协程如何理解?
这里引用知乎赵老师的答案,说的比较好理解。具体来说,一个包含yeild的php函数,就是协程,他有阶段性的结算值 yield $var, 但是代码并不返回,php的调度者接到这个值后,喂给一个generator,generator是个实现了iterator接口的+和协程通讯接口(比如send方法)的实例,所以可以用在for循环里(另个接口负责和协程通讯)。那么generator收到了这个协程的阶段性的值后,他喂给for循环,等for循环下一次循环的时候,他又启动这个协程,协程从上次中断的点继续执行,继续计算,继续yeild值给generator,generator喂for循环,继续循环,直到协程执行完毕。
相关函数
final class Generator implements Iterator {
public function rewind(); // 返回到迭代器的第一个元素。
public function valid(); // 返回false如果迭代器已经关闭,否则返回true
public function current(); // 返回当前yield值.
public function key(); // 返回当前yield键名.
public function next(); // 恢复生成器的执行。
public function PS_UNRESERVE_PREFIX_throw(Exception $exception) {};//抛出异常
public function send($value); // 将传入的值作为yield表达式的结果并且恢复发生器的执行。
}
简单例子
简单的迭代器给foreach使用
function my_range($start, $end, $step = 1) {
for ($i = $start; $i <= $end; $i += $step) {
yield $i;
}
}
foreach (my_range(1, 5) as $num) {
echo $num;
}
//output 12345
带send可以交互的例子
function gen() {
$ret = (yield 'a');
echo $ret;
$ret = (yield 'b');
echo $ret;
}
$gen = gen();
$ret = $gen->current();
echo $ret;
$ret = $gen->send("c");
echo $ret;
$ret = $gen->send("d");
echo $ret;
//output acbd
带抛出异常的例子
function gen() {
try{
$ret = (yield 'a');
echo $ret;
$ret = (yield 'b');
echo $ret;
} catch (Exception $ex) {
echo $ex->getMessage();
}
}
$gen = gen();
$ret = $gen->current();
echo $ret;
$ret = $gen->send("c");
echo $ret;
$ret = $gen->throw(new Exception("d"));
var_dump($ret);
//output acbdNULL
那么能用来干什么呢?
我们来看看,协程可以自己主动出让执行权,把不需要抢占的操作时间(比如socket等待链接)让出来,并且可以和调用方通过yield的方式传递信息。显而易见,他可以用来做多任务调度!
PHP中协程实现多任务调度,鸟哥有一篇翻译的文章里有讲解,网上能找到的大部分资料,都跟这篇相关。但是至少在我看来,理解起来还是蛮复杂的。这里针对那篇文章的前半部分做一个笔记,忽略后面关于独立堆栈协程的部分。
function里使用yield关键字,将生成迭代器。这样调用functionName()时,其实得到的是一个迭代器对象,而并没有实际运行程序。
为什么要走系统调用SystemCall这一层呢?模拟进程和系统会话的方式,控制权限。通过给yield表达式传递信息来与调度器通信,yield既是中断也是传递给调度器的方式。
SystemCall 包含一个回调函数,他自己本身可以被执行。被执行时实际上是调用了这个回调函数,入参是某个task和调度器。
SystemCall其实并没有其他作用,只是在协程函数里面跟在yield后面传给调度器来执行。
注意SplQueue塞进去的对象其实是引用(PHP里对象入参都是引用,不只是SplQueue)!外面对象改了,里面也会变。
为什么忽略协程堆栈?
我打算在第二篇文章中,把有赞的zan framework里关于协程的部分抽出来,针对性的说一下包含子协程额多任务调度。当然主要想偷个懒。
不过个人感觉zan框架里的协程部分,比之前说的那篇文章要好理解一些。
上篇文章里提到PHP中协程的引入,可以使PHP编程有新的玩法,不在遵循原本顺序执行的思路,从而应对大访问量和并发操作。
有赞的zan framework就是基于PHP协程的,提供最简单的方式开发面向C10K+的高并发HTTP服务或SOA服务。我并没有深入的学习这个框架,这里只打算把关于协程的部分抽出来学习一下。
zan框架高并发设计思路
粗看之下(不一定对哦),框架应该是用swoole_server + 协程解决高并发访问。
比如Web服务中,swoole的http_server只开启了少数的几个worker进程。我们知道,如果worker的onRequest里使用的是异步方法,则worker的响应是异步处理的,反之则是阻塞的。
zan框架在worker进程中大量使用了PHP协程,所以一个worker进程可以响应很多并发的请求(但是本质上正在执行的只有一个),这应该就是能过达到C10K+的原因吧。
并且协程相对于回调的方式,在PHPer看来更容易接受吧。另外,框架设计里还使用了middleware、连接池、依赖注入等等比较现代的设计,感觉可以更深入的学习一下。
一个类一个类来看
我们把框架里关于协程的部分拆出来看,下面一个类一个类的分析。与鸟哥博客里那个文章的实现相比,有一些相同的地方,更多的是不同。比如那篇文章里,多个任务放到一个schedule里调度,对于后面的实现就比较繁琐。
这里把框架里的代码抽出来,并进行了一定删减,去掉了与其他业务强相关的东西。比如Event、Context、Async等等。实际上Async用于处理MySQL查询的返回值的,框架内部将MySQL的具体操作类封装成了Async的子类,并且yield给调度器来用。
Singal类
Singal类里包含了系统调用所需的信号量。指明了协程在一轮运行之后应该处于的状态。
class Signal
{
const TASK_SLEEP = 1;
const TASK_AWAKE = 2;
const TASK_CONTINUE = 3;
const TASK_KILLED = 4;
const TASK_RUNNING = 5;
const TASK_WAIT = 6;
const TASK_DONE = 7;
public static function isSignal($signal) {
if(!$signal) {
return false;
}
if (!is_int($signal)) {
return false;
}
if($signal < 1 ) {
return false;
}
if($signal > 7) {
return false;
}
return true;
}
}
Task类
Task包装了具体协程函数,并提供相应的get set方法。与网上流行的那篇文章(以下简称那文)不同的是,我们的scheduler是内置于Task里的,在run方法里实现具体的调度。
这里我们省略了Context,并且让taskId自增。
class Task
{
protected $taskId = 0;
protected $parentId = 0;
protected $coroutine = null;
//这里忽略了context 保存的是当前http请求的相关信息,可以通过系统调用的方式操作
protected $context = null;
protected $sendValue = null;
protected $scheduler = null;
protected $status = 0;
public function __construct(Generator $coroutine, $taskId = 0, $parentId = 0) {
$this->coroutine = $coroutine;
if(isset($GLOBALS['stTaskId']) && $taskId == 0){
global $stTaskId;
$taskId = $stTaskId ++;
}
$this->taskId = $taskId;
$this->parentId = $parentId;
$this->scheduler = new Scheduler($this);
}
/**
* 静态方法调用
* @param $coroutine
* @param int $taskId
* @param int $parentId
* @return Task
*/
public static function execute($coroutine, $taskId = 0, $parentId = 0) {
if ($coroutine instanceof Generator) {
if(isset($GLOBALS['stTaskId']) && $taskId == 0){
global $stTaskId;
$taskId = $stTaskId ++;
}
$task = new Task($coroutine, $taskId, $parentId);
$task->run();
return $task;
}
return $coroutine;
}
public function run() {
while (true) {
try {
if ($this->status == Signal::TASK_KILLED){
$this->fireTaskDoneEvent();
break;
}
$this->status = $this->scheduler->schedule();
//以下几种状态表示信号量,实际上已经从while里跳出来了。如果需要继续的话,会在其他地方重启。
switch ($this->status) {
case Signal::TASK_KILLED:
case Signal::TASK_SLEEP:
case Signal::TASK_WAIT:
return null;
case Signal::TASK_DONE:
$this->fireTaskDoneEvent();
return null;
}
} catch (Exception $e) {
$this->scheduler->throwException($e);
}
}
}
public function send($value) {
$this->sendValue = $value;
return $this->coroutine->send($value);
}
public function getTaskId() {
return $this->taskId;
}
public function getContext() {
return $this->context;
}
public function getSendValue() {
return $this->sendValue;
}
public function getResult() {
return $this->sendValue;
}
public function getStatus() {
return $this->status;
}
public function setStatus($signal) {
$this->status = $signal;
}
public function getCoroutine() {
return $this->coroutine;
}
public function setCoroutine(Generator $coroutine) {
$this->coroutine = $coroutine;
}
public function fireTaskDoneEvent() {
echo "Task done $this->taskId\n";
}
}
Scheduler类
scheduler类负责:
获取Task里的协程函数跑完一轮的返回值
根据返回值的类型采取不同的处理方式,如系统调用、子协程、普通yield值、检查协程栈等等。
在子协程的调用过程中,负责父子协程的进栈出栈,yield值的传递等等。
class Scheduler
{
private $task = null;
private $stack = null;
public function __construct(Task $task)
{
$this->task = $task;
$this->stack = new SplStack();
}
public function schedule()
{
$coroutine = $this->task->getCoroutine();
$value = $coroutine->current();
$signal = $this->handleSysCall($value);
if ($signal !== null) return $signal;
$signal = $this->handleCoroutine($value);
if ($signal !== null) return $signal;
$signal = $this->handleYieldValue($value);
if ($signal !== null) return $signal;
$signal = $this->handleTaskStack($value);
if ($signal !== null) return $signal;
$signal = $this->checkTaskDone($value);
if ($signal !== null) return $signal;
return Signal::TASK_DONE;
}
public function isStackEmpty()
{
return $this->stack->isEmpty();
}
public function throwException($e, $isFirstCall = false)
{
if ($this->isStackEmpty()) {
$this->task->getCoroutine()->throw($e);
return;
}
try{
if ($isFirstCall) {
$coroutine = $this->task->getCoroutine();
} else {
$coroutine = $this->stack->pop();
}
$this->task->setCoroutine($coroutine);
$coroutine->throw($e);
$this->task->run();
}catch (Exception $e){
$this->throwException($e);
}
}
/**
* 处理系统调用
* @param $value
* @return mixed|null
*/
private function handleSysCall($value)
{
if (!($value instanceof SysCall)
&& !is_subclass_of($value, SysCall::class)
) {
return null;
}
echo $this->task->getTaskId()."| SYSCALL\n";
//走系统调用 实际上因为__invoke 走的是 $value($this->task);
$signal = call_user_func($value, $this->task);
if (Signal::isSignal($signal)) {
return $signal;
}
return null;
}
/**
* 处理子协程
* @param $value
* @return int|null
*/
private function handleCoroutine($value)
{
if (!($value instanceof Generator)) {
return null;
}
echo $this->task->getTaskId()."| COROUTINE\n";
//获取当前的协程 入栈
$coroutine = $this->task->getCoroutine();
$this->stack->push($coroutine);
//将新的协程设为当前的协程
$this->task->setCoroutine($value);
return Signal::TASK_CONTINUE;
}
/**
* 处理协程栈
* @param $value
* @return int|null
*/
private function handleTaskStack($value)
{
//能够跑到这里说明当前协程已经跑完了 valid()==false了 需要看下栈里是否还有以前的协程
if ($this->isStackEmpty()) {
return null;
}
echo $this->task->getTaskId()."| TASKSTACK\n";
//出栈 设置为当前运行的协程
$coroutine = $this->stack->pop();
$this->task->setCoroutine($coroutine);
//这个sendvalue可能是从刚跑完的协程那里得到的 把它当做send值传给老协程 让他继续跑
$value = $this->task->getSendValue();
$this->task->send($value);
return Signal::TASK_CONTINUE;
}
/**
* 处理普通的yield值
* @param $value
* @return int|null
*/
private function handleYieldValue($value)
{
$coroutine = $this->task->getCoroutine();
if (!$coroutine->valid()) {
return null;
}
// if($this->task->getTaskId() == 2){
//
// }else{
echo $this->task->getTaskId()."| YIELD VALUE\n";
// }
//如果协程后面没有yield了 这里发出send以后valid就变成false了 并且current变成NULL
$status = $this->task->send($value);
return Signal::TASK_CONTINUE;
}
private function checkTaskDone($value)
{
$coroutine = $this->task->getCoroutine();
if ($coroutine->valid()) {
return null;
}
echo $this->task->getTaskId()."| CHECKDONE\n";
return Signal::TASK_DONE;
}
}
SysCall类
与那文的思路相同,系统调用类一般作为yield后面跟着的值吐给外层的调用方来执行,并且可能返回响应的信号量,标识这个Task是继续运行还是进入等待状态中。
不同的是这里的__invoke入参不需要Scheduler。
class SysCall
{
protected $callback = null;
public function __construct(\Closure $callback)
{
$this->callback = $callback;
}
public function __invoke(Task $task)
{
return call_user_func($this->callback, $task);
}
}
组装起来!
基本的组件就是上面的几个类了,下面举一些实际的例子,说明如何利用这几个看似简陋的组件来搞大新闻。
延迟执行任务
function taskSleep($ms)
{
return new SysCall(function (Task $task) use ($ms) {
swoole_timer_after($ms, function() use($task){
$task->send("this is send value in sleep function.");
$task->run();
});
return Signal::TASK_SLEEP;
});
}
function delay(){
yield taskSleep(2000);
}
function gen(){
echo "gen1\n";
yield 1;
echo "gen2\n";
yield 2;
echo "gen3\n";
yield 3;
}
//Task::execute(delay(), 1); 亦可
(new Task(delay(), 1))->run();
(new Task(gen(), 2))->run();
/** output
1| SYSCALL
gen1
2| YIELD VALUE
gen2
2| YIELD VALUE
gen3
2| YIELD VALUE
2| CHECKDONE
Task done 2
//2秒以后//
1| CHECKDONE
Task done 1
**/
taskSleep是个系统调用,告诉调度器我要睡眠了(传递给他一个Signal::TASK_SLEEP)。具体说明时候唤醒呢,要等swoole_timer_after2秒以后将它唤醒。
我们这里同时跑了两个任务,从输出来看第一个任务的延时执行,并不会阻塞第二个任务。可以清楚地看到,我们的协程是可以实现多任务并行处理的(当然实际上并不是并行)。
独立堆栈的子协程
function justReturnValue(){
yield (delay());
yield 'yield value 2';
}
function gen2(){
$ret1 = (yield "yield value 1");
echo "[ret] $ret1\n";
$ret2 = (yield justReturnValue());
echo "[ret] $ret2\n";
}
(new Task(gen2(), 1))->run();
/** output
1| YIELD VALUE
[ret] yield value 1
1| COROUTINE
1| COROUTINE
1| SYSCALL
// 2秒以后 //
1| TASKSTACK
1| YIELD VALUE
1| TASKSTACK
[ret] yield value 2
1| CHECKDONE
Task done 1
**/
gen2里有一个子协程justReturnValue的调用,而justReturnValue里也有delay的子协程调用。通过输出可以清楚的看到,父子协程进栈出栈的顺序,以及出栈的协程会将吐出来的值交给原先的协程。
实现一个非阻塞IO的Web服务
参照那文里的实现,我们也可以写一个自己的Web服务。首先还是来说明一下要做什么,以及思路。
直接引用那文的说法:
有一个任务是在套接字上侦听是否有新连接,当有新连接要建立的时候,它创建一个新任务来处理新连接。
Web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的。例如PHP将等待到客户端完成发送为止。对一个Web服务器来说,这有点不太高效。因为服务器在一个时间点上只能处理一个连接。
解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”. 为了查找哪个套接字已经准备好读或者写了, 可以使用 流选择函数
传统的做法中,创建一个套接字,等待新连接,然后读取、发送、关闭。这些都是阻塞的,会花时间在这些抢占资源的步骤上。如果我们使用协程的方式,可以先将等待操作的任务yield掉,之后结合stream_select方法,选择出可以继续操作的任务将其resume。
通俗的说,可以理解为大家一起挤公交车,原先必须一个一个上,但是上车以后要找公交卡,要刷卡或者投币,操作完了以后下一个乘客才能继续。如果使用协程的话,第一个乘客A上车以后,挂起到一边找公交卡,不影响第二个乘客B上车。等到A掏出公交卡以后,直接插队刷卡上车。虽然还是一个一个排队上车,但是找卡的时间里其他乘客不会干等了。
socket的状态
首先定义2个全局数组保存所有用到的socket。再定义2个系统调用将socket设置进数组里,并且返回等待信号量让Task挂起。
$waitingForRead = [];
$waitingForWrite = [];
function waitForRead($socket) {
return new SysCall(
function(Task $task) use ($socket) {
global $waitingForRead;
if (isset($waitingForRead[(int) $socket])) {
$waitingForRead[(int) $socket][1][] = $task;
} else {
$waitingForRead[(int) $socket] = [$socket, [$task]];
}
//设置完了不让他往下走
return Signal::TASK_WAIT;
}
);
}
function waitForWrite($socket) {
return new SysCall(
function(Task $task) use ($socket) {
global $waitingForWrite;
if (isset($waitingForWrite[(int) $socket])) {
$waitingForWrite[(int) $socket][1][] = $task;
} else {
$waitingForWrite[(int) $socket] = [$socket, [$task]];
}
//设置完了不让他往下走
return Signal::TASK_WAIT;
}
);
}
选择可以操作的socket
注册一个任务,不断检查我们的全局数组,直到有socket就绪了,将其对应的任务唤醒。
function ioPoll($timeout) {
global $waitingForRead;
global $waitingForWrite;
$rSocks = [];
foreach ($waitingForRead as list($socket)) {
$rSocks[] = $socket;
}
$wSocks = [];
foreach ($waitingForWrite as list($socket)) {
$wSocks[] = $socket;
}
$eSocks = []; // dummy
//stream_select 方法会直接修改入参 只保留就绪的socket数组
if (false === stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
return;
}
foreach ($rSocks as $socket) {
list(, $tasks) = $waitingForRead[(int) $socket];
unset($waitingForRead[(int) $socket]);
foreach ($tasks as $task) {
$task->send("ready for read");
$task->run();
}
}
foreach ($wSocks as $socket) {
list(, $tasks) = $waitingForWrite[(int) $socket];
unset($waitingForWrite[(int) $socket]);
foreach ($tasks as $task) {
$task->send("ready for write");
$task->run();
}
}
}
function ioPollTask() {
global $waitingForRead;
global $waitingForWrite;
while (true) {
if(count($waitingForRead) <=1 && count($waitingForWrite) <=1){
//如果等待检查的socket只有1个 则用阻塞的方式等待
ioPoll(null);
}else{
//否则设为0超时
ioPoll(0);
}
yield;
}
}
封装socket
将socket封装一下,定义了必须的4个方法。
class CoSocket {
protected $socket;
public function __construct($socket) {
$this->socket = $socket;
}
public function accept() {
//等待本socket就绪
yield waitForRead($this->socket);
//就绪以后会继续走到这里 返回给外层一个客户端连接socket
yield stream_socket_accept($this->socket, 0);
}
public function read($size) {
//等待本socket就绪
yield waitForRead($this->socket);
//就绪以后回把读取到的内容 返回给外层
yield fread($this->socket, $size);
}
public function write($string) {
//等待本socket就绪
yield waitForWrite($this->socket);
//就绪以后把响应写给客户端
fwrite($this->socket, $string);
}
public function close() {
@fclose($this->socket);
}
}
处理客户端新连接
服务端socket接受到新的连接以后,创建新的任务。下面是这个任务里实际运行的协程。
function handleClient(CoSocket $socket) {
$data = (yield $socket->read(8192));
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);
//响应报文由状态行(HTTP版本、状态码)+HTTP首部字段(响应首部字段、通用首部字段、实体首部字段)组成。
//空行(CR+LF)分隔首部与报文主体。所以这里留个空行在打印$msg
$response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
yield $socket->write($response);
yield $socket->close();
}
开启服务!
直接创建一个包含server协程的任务,和一个不断刷新stream_select的任务。之后的流程都交给ioPollTask来调度了。
//定义全局的taskId自增用
static $stTaskId = 1;
function server($port){
echo "Starting server at port $port...\n";
//这里抛出的异常会被scheduler和task抛来抛去 最后还是到这里catch一下
try{
$socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
if (!$socket) throw new Exception($errStr, $errNo);
//设置为读写非阻塞
stream_set_blocking($socket, 0);
$socket = new CoSocket($socket);
while (true) {
$clientSocket = (yield $socket->accept());
$clientCoSocket = new CoSocket($clientSocket);
//为新的链接创建Task
Task::execute(handleClient($clientCoSocket));
}
}catch (Exception $e){
echo $e->getMessage();
}
}
//创建服务端socket的task 1
Task::execute(server(8000));
//不断刷新socket_select的task 2
Task::execute(ioPollTask());
运行效果
开启服务后,我们先直接用curl访问,观测一下得到的结果。
➜ ~ curl -d "a=123&b=456" http://localhost:8000
Received following request:
POST / HTTP/1.1
Host: localhost:8000
User-Agent: curl/7.51.0
Accept: /
Content-Length: 11
Content-Type: application/x-www-form-urlencoded
a=123&b=456
可以看到服务端吐出了我们发送给他的信息,包括HTTP请求行、首部字段和正文。如果我们在浏览器里访问的话,正文内容会丰富许多,会有Cookie,UA等等,如下:
Received following request:
GET / HTTP/1.1
Host: localhost:8000
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,/;q=0.8
DontTrackMeHere: gzip, deflate, sdch, br
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4
Cookie: Phpstorm-f86ac615=34137ba0-5113-4922-b809-b6fa20dbf937
不足的地方
zan framework里的协程调度,并没有采用任务队列的方式。可能是因为他只是针对单独的http或者tcp请求来设计的吧,一般是链式调用。由于这个原因,所以没法设置具体某个任务的执行顺序。当然实际效果跟那文里是相同的,说到底还是由ioPollTask来驱动。
小结
我们通过几个例子加深了对PHP中协程用法的理解。需要注意的是,在协程中(本文构造的这种结构)我们要避免使用死循环,除非循环里yield的结果可以将其挂起并出让控制权给其他协程。
比如上文的Web服务器中,因为有不能主动挂起的ioPollTask,所以不能实现在响应时延迟几秒的效果。因为即使使用了taskSleep这种系统调用,也会因为ioPollTask死循环导致不能获取控制权无法执行
评论 (0)