PHP中的协程

dafenqi
2023-08-10 / 0 评论 / 24 阅读 / 正在检测是否收录...

PHP中的协程

之前学习Lua的时候第一次接触到了协程(coroutine)的概念。而PHP5.5版本中也加入了协程的概念,从此PHP编程又有了新的思路和玩法。这里学习一下PHP中协程的相关概念的使用方法。

分成上下两篇文章吧,这篇主要讲一下基础概念。

协程是什么?
在以前的Lua学习笔记三中可以看到,协程与多线程的比较,有自己的堆栈、局部变量、指令指针等,但是协程本身与其他协程共享全局变量。主要不同在于,多处理器下,多线程可以真实的同时运行多个线程。而协程任意时刻只能有一个在真实运行,并且只有在明确要求被挂起时才会挂起。

PHP中协程如何理解?
这里引用知乎赵老师的答案,说的比较好理解。具体来说,一个包含yeild的php函数,就是协程,他有阶段性的结算值 yield $var, 但是代码并不返回,php的调度者接到这个值后,喂给一个generator,generator是个实现了iterator接口的+和协程通讯接口(比如send方法)的实例,所以可以用在for循环里(另个接口负责和协程通讯)。那么generator收到了这个协程的阶段性的值后,他喂给for循环,等for循环下一次循环的时候,他又启动这个协程,协程从上次中断的点继续执行,继续计算,继续yeild值给generator,generator喂for循环,继续循环,直到协程执行完毕。

相关函数
final class Generator implements Iterator {

public function rewind();     // 返回到迭代器的第一个元素。
public function valid();      // 返回false如果迭代器已经关闭,否则返回true
public function current();    // 返回当前yield值.
public function key();        // 返回当前yield键名.
public function next();       // 恢复生成器的执行。
public function PS_UNRESERVE_PREFIX_throw(Exception $exception) {};//抛出异常
public function send($value); // 将传入的值作为yield表达式的结果并且恢复发生器的执行。

}
简单例子
简单的迭代器给foreach使用

function my_range($start, $end, $step = 1) {

for ($i = $start; $i <= $end; $i += $step) {
    yield $i;
}

}
foreach (my_range(1, 5) as $num) {

echo $num;

}
//output 12345
带send可以交互的例子

function gen() {

$ret = (yield 'a');
echo $ret;
$ret = (yield 'b');
echo $ret;

}

$gen = gen();
$ret = $gen->current();
echo $ret;
$ret = $gen->send("c");
echo $ret;
$ret = $gen->send("d");
echo $ret;
//output acbd
带抛出异常的例子

function gen() {

try{
    $ret = (yield 'a');
    echo $ret;
    $ret = (yield 'b');
    echo $ret;
} catch (Exception $ex) {
    echo $ex->getMessage();
}

}

$gen = gen();
$ret = $gen->current();
echo $ret;
$ret = $gen->send("c");
echo $ret;
$ret = $gen->throw(new Exception("d"));
var_dump($ret);

//output acbdNULL
那么能用来干什么呢?
我们来看看,协程可以自己主动出让执行权,把不需要抢占的操作时间(比如socket等待链接)让出来,并且可以和调用方通过yield的方式传递信息。显而易见,他可以用来做多任务调度!

PHP中协程实现多任务调度,鸟哥有一篇翻译的文章里有讲解,网上能找到的大部分资料,都跟这篇相关。但是至少在我看来,理解起来还是蛮复杂的。这里针对那篇文章的前半部分做一个笔记,忽略后面关于独立堆栈协程的部分。

function里使用yield关键字,将生成迭代器。这样调用functionName()时,其实得到的是一个迭代器对象,而并没有实际运行程序。
为什么要走系统调用SystemCall这一层呢?模拟进程和系统会话的方式,控制权限。通过给yield表达式传递信息来与调度器通信,yield既是中断也是传递给调度器的方式。
SystemCall 包含一个回调函数,他自己本身可以被执行。被执行时实际上是调用了这个回调函数,入参是某个task和调度器。
SystemCall其实并没有其他作用,只是在协程函数里面跟在yield后面传给调度器来执行。
注意SplQueue塞进去的对象其实是引用(PHP里对象入参都是引用,不只是SplQueue)!外面对象改了,里面也会变。
为什么忽略协程堆栈?
我打算在第二篇文章中,把有赞的zan framework里关于协程的部分抽出来,针对性的说一下包含子协程额多任务调度。当然主要想偷个懒。

不过个人感觉zan框架里的协程部分,比之前说的那篇文章要好理解一些。

上篇文章里提到PHP中协程的引入,可以使PHP编程有新的玩法,不在遵循原本顺序执行的思路,从而应对大访问量和并发操作。

有赞的zan framework就是基于PHP协程的,提供最简单的方式开发面向C10K+的高并发HTTP服务或SOA服务。我并没有深入的学习这个框架,这里只打算把关于协程的部分抽出来学习一下。

zan框架高并发设计思路
粗看之下(不一定对哦),框架应该是用swoole_server + 协程解决高并发访问。

比如Web服务中,swoole的http_server只开启了少数的几个worker进程。我们知道,如果worker的onRequest里使用的是异步方法,则worker的响应是异步处理的,反之则是阻塞的。

zan框架在worker进程中大量使用了PHP协程,所以一个worker进程可以响应很多并发的请求(但是本质上正在执行的只有一个),这应该就是能过达到C10K+的原因吧。

并且协程相对于回调的方式,在PHPer看来更容易接受吧。另外,框架设计里还使用了middleware、连接池、依赖注入等等比较现代的设计,感觉可以更深入的学习一下。

一个类一个类来看
我们把框架里关于协程的部分拆出来看,下面一个类一个类的分析。与鸟哥博客里那个文章的实现相比,有一些相同的地方,更多的是不同。比如那篇文章里,多个任务放到一个schedule里调度,对于后面的实现就比较繁琐。

这里把框架里的代码抽出来,并进行了一定删减,去掉了与其他业务强相关的东西。比如Event、Context、Async等等。实际上Async用于处理MySQL查询的返回值的,框架内部将MySQL的具体操作类封装成了Async的子类,并且yield给调度器来用。

Singal类
Singal类里包含了系统调用所需的信号量。指明了协程在一轮运行之后应该处于的状态。

class Signal
{

const TASK_SLEEP        = 1;
const TASK_AWAKE        = 2;
const TASK_CONTINUE     = 3;
const TASK_KILLED       = 4;
const TASK_RUNNING      = 5;
const TASK_WAIT         = 6;
const TASK_DONE         = 7;

public static function isSignal($signal) {
    if(!$signal) {
        return false;
    }

    if (!is_int($signal)) {
        return false;
    }

    if($signal < 1 ) {
        return false;
    }

    if($signal > 7) {
        return false;
    }

    return true;
}

}
Task类
Task包装了具体协程函数,并提供相应的get set方法。与网上流行的那篇文章(以下简称那文)不同的是,我们的scheduler是内置于Task里的,在run方法里实现具体的调度。

这里我们省略了Context,并且让taskId自增。

class Task
{

protected $taskId = 0;
protected $parentId = 0;
protected $coroutine = null;
//这里忽略了context 保存的是当前http请求的相关信息,可以通过系统调用的方式操作
protected $context = null;

protected $sendValue = null;
protected $scheduler = null;
protected $status = 0;

public function __construct(Generator $coroutine, $taskId = 0, $parentId = 0) {
    $this->coroutine = $coroutine;
    if(isset($GLOBALS['stTaskId']) && $taskId == 0){
        global $stTaskId;
        $taskId = $stTaskId ++;
    }
    $this->taskId = $taskId;
    $this->parentId = $parentId;
    $this->scheduler = new Scheduler($this);
}

/**
 * 静态方法调用
 * @param $coroutine
 * @param int $taskId
 * @param int $parentId
 * @return Task
 */
public static function execute($coroutine, $taskId = 0, $parentId = 0) {
    if ($coroutine instanceof Generator) {
        if(isset($GLOBALS['stTaskId']) && $taskId == 0){
            global $stTaskId;
            $taskId = $stTaskId ++;
        }
        $task = new Task($coroutine, $taskId, $parentId);
        $task->run();
        return $task;
    }
    return $coroutine;
}

public function run() {
    while (true) {
        try {
            if ($this->status == Signal::TASK_KILLED){
                $this->fireTaskDoneEvent();
                break;
            }
            $this->status = $this->scheduler->schedule();
            //以下几种状态表示信号量,实际上已经从while里跳出来了。如果需要继续的话,会在其他地方重启。
            switch ($this->status) {
                case Signal::TASK_KILLED:
                case Signal::TASK_SLEEP:
                case Signal::TASK_WAIT:
                    return null;
                case Signal::TASK_DONE:
                    $this->fireTaskDoneEvent();
                    return null;
            }
        } catch (Exception $e) {
            $this->scheduler->throwException($e);
        }
    }
}

public function send($value) {
    $this->sendValue = $value;
    return $this->coroutine->send($value);
}

public function getTaskId() {
    return $this->taskId;
}

public function getContext() {
    return $this->context;
}

public function getSendValue() {
    return $this->sendValue;
}

public function getResult() {
    return $this->sendValue;
}

public function getStatus() {
    return $this->status;
}

public function setStatus($signal) {
    $this->status = $signal;
}

public function getCoroutine() {
    return $this->coroutine;
}

public function setCoroutine(Generator $coroutine) {
    $this->coroutine = $coroutine;
}

public function fireTaskDoneEvent() {
    echo "Task done $this->taskId\n";
}

}
Scheduler类
scheduler类负责:

获取Task里的协程函数跑完一轮的返回值
根据返回值的类型采取不同的处理方式,如系统调用、子协程、普通yield值、检查协程栈等等。
在子协程的调用过程中,负责父子协程的进栈出栈,yield值的传递等等。
class Scheduler
{

private $task = null;
private $stack = null;

public function __construct(Task $task)
{
    $this->task = $task;
    $this->stack = new SplStack();
}

public function schedule()
{
    $coroutine = $this->task->getCoroutine();

    $value = $coroutine->current();

    $signal = $this->handleSysCall($value);
    if ($signal !== null) return $signal;

    $signal = $this->handleCoroutine($value);
    if ($signal !== null) return $signal;

    $signal = $this->handleYieldValue($value);
    if ($signal !== null) return $signal;

    $signal = $this->handleTaskStack($value);
    if ($signal !== null) return $signal;

    $signal = $this->checkTaskDone($value);
    if ($signal !== null) return $signal;

    return Signal::TASK_DONE;
}

public function isStackEmpty()
{
    return $this->stack->isEmpty();
}

public function throwException($e, $isFirstCall = false)
{
    if ($this->isStackEmpty()) {
        $this->task->getCoroutine()->throw($e);
        return;
    }

    try{
        if ($isFirstCall) {
            $coroutine = $this->task->getCoroutine();
        } else {
            $coroutine = $this->stack->pop();
        }

        $this->task->setCoroutine($coroutine);
        $coroutine->throw($e);

        $this->task->run();
    }catch (Exception $e){
        $this->throwException($e);
    }
}

/**
 * 处理系统调用
 * @param $value
 * @return mixed|null
 */
private function handleSysCall($value)
{
    if (!($value instanceof SysCall)
        && !is_subclass_of($value, SysCall::class)
    ) {
        return null;
    }
    echo $this->task->getTaskId()."| SYSCALL\n";
    //走系统调用 实际上因为__invoke 走的是 $value($this->task);
    $signal = call_user_func($value, $this->task);
    if (Signal::isSignal($signal)) {
        return $signal;
    }

    return null;
}

/**
 * 处理子协程
 * @param $value
 * @return int|null
 */
private function handleCoroutine($value)
{
    if (!($value instanceof Generator)) {
        return null;
    }
    echo $this->task->getTaskId()."| COROUTINE\n";
    //获取当前的协程 入栈
    $coroutine = $this->task->getCoroutine();
    $this->stack->push($coroutine);
    //将新的协程设为当前的协程
    $this->task->setCoroutine($value);

    return Signal::TASK_CONTINUE;
}

/**
 * 处理协程栈
 * @param $value
 * @return int|null
 */
private function handleTaskStack($value)
{
    //能够跑到这里说明当前协程已经跑完了 valid()==false了 需要看下栈里是否还有以前的协程
    if ($this->isStackEmpty()) {
        return null;
    }

    echo $this->task->getTaskId()."| TASKSTACK\n";
    //出栈 设置为当前运行的协程
    $coroutine = $this->stack->pop();
    $this->task->setCoroutine($coroutine);

    //这个sendvalue可能是从刚跑完的协程那里得到的 把它当做send值传给老协程 让他继续跑
    $value = $this->task->getSendValue();
    $this->task->send($value);

    return Signal::TASK_CONTINUE;
}

/**
 * 处理普通的yield值
 * @param $value
 * @return int|null
 */
private function handleYieldValue($value)
{
    $coroutine = $this->task->getCoroutine();
    if (!$coroutine->valid()) {
        return null;
    }

// if($this->task->getTaskId() == 2){
//
// }else{

        echo $this->task->getTaskId()."| YIELD VALUE\n";

// }

    //如果协程后面没有yield了 这里发出send以后valid就变成false了 并且current变成NULL
    $status = $this->task->send($value);
    return Signal::TASK_CONTINUE;
}

private function checkTaskDone($value)
{
    $coroutine = $this->task->getCoroutine();
    if ($coroutine->valid()) {
        return null;
    }
    echo $this->task->getTaskId()."| CHECKDONE\n";

    return Signal::TASK_DONE;
}

}
SysCall类
与那文的思路相同,系统调用类一般作为yield后面跟着的值吐给外层的调用方来执行,并且可能返回响应的信号量,标识这个Task是继续运行还是进入等待状态中。

不同的是这里的__invoke入参不需要Scheduler。

class SysCall
{

protected $callback = null;

public function __construct(\Closure $callback)
{
    $this->callback = $callback;
}

public function __invoke(Task $task)
{
    return call_user_func($this->callback, $task);
}

}
组装起来!
基本的组件就是上面的几个类了,下面举一些实际的例子,说明如何利用这几个看似简陋的组件来搞大新闻。

延迟执行任务
function taskSleep($ms)
{

return new SysCall(function (Task $task) use ($ms) {
    swoole_timer_after($ms, function() use($task){
        $task->send("this is send value in sleep function.");
        $task->run();
    });
    return Signal::TASK_SLEEP;
});

}

function delay(){

yield taskSleep(2000);

}

function gen(){

echo "gen1\n";
yield 1;
echo "gen2\n";
yield 2;
echo "gen3\n";
yield 3;

}

//Task::execute(delay(), 1); 亦可
(new Task(delay(), 1))->run();
(new Task(gen(), 2))->run();

/** output
1| SYSCALL
gen1
2| YIELD VALUE
gen2
2| YIELD VALUE
gen3
2| YIELD VALUE
2| CHECKDONE
Task done 2
//2秒以后//
1| CHECKDONE
Task done 1
**/
taskSleep是个系统调用,告诉调度器我要睡眠了(传递给他一个Signal::TASK_SLEEP)。具体说明时候唤醒呢,要等swoole_timer_after2秒以后将它唤醒。

我们这里同时跑了两个任务,从输出来看第一个任务的延时执行,并不会阻塞第二个任务。可以清楚地看到,我们的协程是可以实现多任务并行处理的(当然实际上并不是并行)。

独立堆栈的子协程
function justReturnValue(){

yield (delay());
yield 'yield value 2';

}

function gen2(){

$ret1 = (yield "yield value 1");
echo "[ret] $ret1\n";
$ret2 = (yield justReturnValue());
echo "[ret] $ret2\n";

}

(new Task(gen2(), 1))->run();

/** output
1| YIELD VALUE
[ret] yield value 1
1| COROUTINE
1| COROUTINE
1| SYSCALL
// 2秒以后 //
1| TASKSTACK
1| YIELD VALUE
1| TASKSTACK
[ret] yield value 2
1| CHECKDONE
Task done 1
**/
gen2里有一个子协程justReturnValue的调用,而justReturnValue里也有delay的子协程调用。通过输出可以清楚的看到,父子协程进栈出栈的顺序,以及出栈的协程会将吐出来的值交给原先的协程。

实现一个非阻塞IO的Web服务
参照那文里的实现,我们也可以写一个自己的Web服务。首先还是来说明一下要做什么,以及思路。

直接引用那文的说法:

有一个任务是在套接字上侦听是否有新连接,当有新连接要建立的时候,它创建一个新任务来处理新连接。

Web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的。例如PHP将等待到客户端完成发送为止。对一个Web服务器来说,这有点不太高效。因为服务器在一个时间点上只能处理一个连接。

解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”. 为了查找哪个套接字已经准备好读或者写了, 可以使用 流选择函数

传统的做法中,创建一个套接字,等待新连接,然后读取、发送、关闭。这些都是阻塞的,会花时间在这些抢占资源的步骤上。如果我们使用协程的方式,可以先将等待操作的任务yield掉,之后结合stream_select方法,选择出可以继续操作的任务将其resume。

通俗的说,可以理解为大家一起挤公交车,原先必须一个一个上,但是上车以后要找公交卡,要刷卡或者投币,操作完了以后下一个乘客才能继续。如果使用协程的话,第一个乘客A上车以后,挂起到一边找公交卡,不影响第二个乘客B上车。等到A掏出公交卡以后,直接插队刷卡上车。虽然还是一个一个排队上车,但是找卡的时间里其他乘客不会干等了。

socket的状态
首先定义2个全局数组保存所有用到的socket。再定义2个系统调用将socket设置进数组里,并且返回等待信号量让Task挂起。

$waitingForRead = [];
$waitingForWrite = [];

function waitForRead($socket) {

return new SysCall(
    function(Task $task) use ($socket) {
        global $waitingForRead;
        if (isset($waitingForRead[(int) $socket])) {
            $waitingForRead[(int) $socket][1][] = $task;
        } else {
            $waitingForRead[(int) $socket] = [$socket, [$task]];
        }
        //设置完了不让他往下走
        return Signal::TASK_WAIT;
    }
);

}

function waitForWrite($socket) {

return new SysCall(
    function(Task $task) use ($socket) {
        global $waitingForWrite;
        if (isset($waitingForWrite[(int) $socket])) {
            $waitingForWrite[(int) $socket][1][] = $task;
        } else {
            $waitingForWrite[(int) $socket] = [$socket, [$task]];
        }
        //设置完了不让他往下走
        return Signal::TASK_WAIT;
    }
);

}
选择可以操作的socket
注册一个任务,不断检查我们的全局数组,直到有socket就绪了,将其对应的任务唤醒。

function ioPoll($timeout) {

global $waitingForRead;
global $waitingForWrite;

$rSocks = [];
foreach ($waitingForRead as list($socket)) {
    $rSocks[] = $socket;
}

$wSocks = [];
foreach ($waitingForWrite as list($socket)) {
    $wSocks[] = $socket;
}

$eSocks = []; // dummy

//stream_select 方法会直接修改入参 只保留就绪的socket数组
if (false === stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
    return;
}

foreach ($rSocks as $socket) {
    list(, $tasks) = $waitingForRead[(int) $socket];
    unset($waitingForRead[(int) $socket]);

    foreach ($tasks as $task) {
        $task->send("ready for read");
        $task->run();
    }
}

foreach ($wSocks as $socket) {
    list(, $tasks) = $waitingForWrite[(int) $socket];
    unset($waitingForWrite[(int) $socket]);

    foreach ($tasks as $task) {
        $task->send("ready for write");
        $task->run();
    }
}

}

function ioPollTask() {

global $waitingForRead;
global $waitingForWrite;
while (true) {
    if(count($waitingForRead) <=1 && count($waitingForWrite) <=1){
        //如果等待检查的socket只有1个 则用阻塞的方式等待
        ioPoll(null);
    }else{
        //否则设为0超时
        ioPoll(0);
    }
    yield;
}

}
封装socket
将socket封装一下,定义了必须的4个方法。

class CoSocket {

protected $socket;

public function __construct($socket) {
    $this->socket = $socket;
}

public function accept() {
    //等待本socket就绪
    yield waitForRead($this->socket);
    //就绪以后会继续走到这里 返回给外层一个客户端连接socket
    yield stream_socket_accept($this->socket, 0);
}

public function read($size) {
    //等待本socket就绪
    yield waitForRead($this->socket);
    //就绪以后回把读取到的内容 返回给外层
    yield fread($this->socket, $size);
}

public function write($string) {
    //等待本socket就绪
    yield waitForWrite($this->socket);
    //就绪以后把响应写给客户端
    fwrite($this->socket, $string);
}

public function close() {
    @fclose($this->socket);
}

}
处理客户端新连接
服务端socket接受到新的连接以后,创建新的任务。下面是这个任务里实际运行的协程。

function handleClient(CoSocket $socket) {

$data = (yield $socket->read(8192));
$msg = "Received following request:\n\n$data";
$msgLength = strlen($msg);

//响应报文由状态行(HTTP版本、状态码)+HTTP首部字段(响应首部字段、通用首部字段、实体首部字段)组成。
//空行(CR+LF)分隔首部与报文主体。所以这里留个空行在打印$msg
$response = <<<RES

HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

yield $socket->write($response);
yield $socket->close();

}
开启服务!
直接创建一个包含server协程的任务,和一个不断刷新stream_select的任务。之后的流程都交给ioPollTask来调度了。

//定义全局的taskId自增用
static $stTaskId = 1;

function server($port){

echo "Starting server at port $port...\n";
//这里抛出的异常会被scheduler和task抛来抛去 最后还是到这里catch一下
try{
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
    //设置为读写非阻塞
    stream_set_blocking($socket, 0);
    $socket = new CoSocket($socket);
    while (true) {
        $clientSocket = (yield $socket->accept());
        $clientCoSocket = new CoSocket($clientSocket);
        //为新的链接创建Task
        Task::execute(handleClient($clientCoSocket));
    }
}catch (Exception $e){
    echo $e->getMessage();
}

}

//创建服务端socket的task 1
Task::execute(server(8000));
//不断刷新socket_select的task 2
Task::execute(ioPollTask());
运行效果
开启服务后,我们先直接用curl访问,观测一下得到的结果。

➜ ~ curl -d "a=123&b=456" http://localhost:8000
Received following request:

POST / HTTP/1.1
Host: localhost:8000
User-Agent: curl/7.51.0
Accept: /
Content-Length: 11
Content-Type: application/x-www-form-urlencoded

a=123&b=456
可以看到服务端吐出了我们发送给他的信息,包括HTTP请求行、首部字段和正文。如果我们在浏览器里访问的话,正文内容会丰富许多,会有Cookie,UA等等,如下:

Received following request:

GET / HTTP/1.1
Host: localhost:8000
Connection: keep-alive
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,/;q=0.8
DontTrackMeHere: gzip, deflate, sdch, br
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4
Cookie: Phpstorm-f86ac615=34137ba0-5113-4922-b809-b6fa20dbf937
不足的地方
zan framework里的协程调度,并没有采用任务队列的方式。可能是因为他只是针对单独的http或者tcp请求来设计的吧,一般是链式调用。由于这个原因,所以没法设置具体某个任务的执行顺序。当然实际效果跟那文里是相同的,说到底还是由ioPollTask来驱动。

小结
我们通过几个例子加深了对PHP中协程用法的理解。需要注意的是,在协程中(本文构造的这种结构)我们要避免使用死循环,除非循环里yield的结果可以将其挂起并出让控制权给其他协程。

比如上文的Web服务器中,因为有不能主动挂起的ioPollTask,所以不能实现在响应时延迟几秒的效果。因为即使使用了taskSleep这种系统调用,也会因为ioPollTask死循环导致不能获取控制权无法执行

0

Deprecated: strtolower(): Passing null to parameter #1 ($string) of type string is deprecated in /www/wwwroot/testblog.58heshihu.com/var/Widget/Archive.php on line 1032

评论 (0)

取消