首页
关于
Search
1
给你10个市场数据调研报告的免费下载网站!以后竞品数据就从这里找!
185 阅读
2
php接口优化 使用curl_multi_init批量请求
144 阅读
3
《从菜鸟到大师之路 ElasticSearch 篇》
107 阅读
4
2024年备考系统架构设计师
104 阅读
5
PHP 文件I/O
92 阅读
php
thinkphp
laravel
工具
开源
mysql
数据结构
总结
思维逻辑
令人感动的创富故事
读书笔记
前端
vue
js
css
书籍
开源之旅
架构
消息队列
docker
教程
代码片段
redis
服务器
nginx
linux
科普
java
c
ElasticSearch
测试
php进阶
php基础
登录
Search
标签搜索
php函数
php语法
性能优化
安全
错误和异常处理
问题
vue
Composer
Session
缓存
框架
Swoole
api
并发
异步
正则表达式
php-fpm
mysql 索引
开发规范
协程
dafenqi
累计撰写
786
篇文章
累计收到
34
条评论
首页
栏目
php
thinkphp
laravel
工具
开源
mysql
数据结构
总结
思维逻辑
令人感动的创富故事
读书笔记
前端
vue
js
css
书籍
开源之旅
架构
消息队列
docker
教程
代码片段
副业
redis
服务器
nginx
linux
科普
java
c
ElasticSearch
测试
php进阶
php基础
页面
关于
搜索到
560
篇与
的结果
2023-08-10
PHP mongodb 操作类
PHP mongodb 操作类<?php namespace Mongodb; class Mongo_db { private $config; private $hostname; private $port = 27017; private $database; private $username; private $password; private $debug = false; private $collection = ''; private $selects; private $wheres; private $updates; private $limit = 999999; private $offset = 0; private $sorts; private $manager; private $result; public function __construct($config) { $this->config = $config; $this->connect(); } /** * 预处理 */ private function prepareConfig() { if (isset($this->config['hostname'])) { $this->hostname = trim($this->config['hostname']); } if (isset($this->config['port'])) { $this->port = trim($this->config['port']); } if (!empty($this->config['username'])) { $this->username = trim($this->config['username']); } if (!empty($this->config['password'])) { $this->password = trim($this->config['password']); } if (isset($this->config['database'])) { $this->database = trim($this->config['database']); } if (isset($this->config['db_debug'])) { $this->debug = $this->config['db_debug']; } } /** * 链接 */ private function connect() { $this->prepareConfig(); try { if (strstr($this->hostname, ',')) { $dsn = "mongodb://{$this->hostname}/{$this->database}"; }else { $dsn = "mongodb://{$this->hostname}:{$this->port}/{$this->database}"; } $options = array( 'username' => $this->username, 'password' => $this->password ); $this->manager = new \MongoDB\Driver\Manager($dsn, $options); } catch (\Exception $e) { $this->showError($e); } } /** * 获取当前连接 * @return mixed */ public function getManager() { return $this->manager; } /** * @param mixed $collection */ public function getCollection(): string { return $this->collection; } /** * 获取查询所需胡字段 * @return array */ public function getSelects(): array { return $this->selects; } /** * 获取条件 * @return array */ public function getWheres(): array { return $this->wheres; } /** * 获取更新内容 * @return array */ public function getUpdates(): array { return $this->updates; } /** * 获取条数 * @return int */ public function getLimit(): int { return $this->limit; } /** * 获取偏移量 * @return int */ public function getOffset(): int { return $this->offset; } /** * 获取排序 * @return array */ public function getSorts(): array { return $this->sorts; } /** * @param mixed $collection */ public function setCollection(string $collection) { $this->collection = $collection; } /** * @param array $selects */ public function setSelects(array $selects) { $this->selects = $selects; } /** * @param array $wheres */ public function setWheres(array $wheres) { $this->wheres = $wheres; } /** * @param array $updates */ public function setUpdates(array $updates) { $this->updates = $updates; } /** * @param int $limit */ public function setLimit(int $limit) { $this->limit = $limit; } /** * @param int $offset */ public function setOffset(int $offset) { $this->offset = $offset; } /** * @param array $sorts */ public function setSorts(array $sorts) { $this->sorts = $sorts; } /** * @param $database * @return $this */ public function switch_db($database) { $this->database = $database; return $this; } /** * @param $table * @return $this */ public function collection($collection) { $this->collection = $collection; return $this; } /** * @param $collection * @return Mongo_db */ public function table($collection) { return $this->collection($collection); } /** * 增 * @param array $document * @param string $wstring * @param int $wtimeout * @return mixed */ public function insert( $document = array(), $wstring = \MongoDB\Driver\WriteConcern::MAJORITY, $wtimeout = 1000) { try { $wc = new \MongoDB\Driver\WriteConcern($wstring, $wtimeout); $bulk = new \MongoDB\Driver\BulkWrite(); $bulk->insert($document); $dbc = $this->database . '.' . $this->collection; $result = $this->manager->executeBulkWrite($dbc, $bulk, $wc); $this->result = $result; //增加几条 return $result->getInsertedCount(); } catch (\Exception $e) { $this->showError($e); } } /** * 批量添加 * @param array $documents * @param string $wstring * @param int $wtimeout * @return mixed */ public function batch_insert( $documents = array(), $wstring = \MongoDB\Driver\WriteConcern::MAJORITY, $wtimeout = 1000) { try { $wc = new \MongoDB\Driver\WriteConcern($wstring, $wtimeout); $bulk = new \MongoDB\Driver\BulkWrite(); foreach ($documents as $k => $document) { $bulk->insert($document); } $dbc = $this->database . '.' . $this->collection; $result = $this->manager->executeBulkWrite($dbc, $bulk, $wc); $this->result = $result; //增加几条 return $result->getInsertedCount(); } catch (\Exception $e) { $this->showError($e); } } /** * 删 * @param array $deleteOptions * @param string $wstring * @param int $wtimeout * @return mixed */ public function delete( $deleteOptions = ["limit" => 1], $wstring = \MongoDB\Driver\WriteConcern::MAJORITY, $wtimeout = 1000 ) { try { $wc = new \MongoDB\Driver\WriteConcern($wstring, $wtimeout); $bulk = new \MongoDB\Driver\BulkWrite(); $filter = $this->wheres; if (count($filter) < 1 && $deleteOptions['limit'] == 1) { throw new \Exception('filter is error!'); } $bulk->delete($filter, $deleteOptions); $dbc = $this->database . '.' . $this->collection; $result = $this->manager->executeBulkWrite($dbc, $bulk, $wc); $this->result = $result; //删除几条 return $result->getDeletedCount(); } catch (\Exception $e) { $this->showError($e); } } /** * 删除所有 * @param array $deleteOptions * @param string $wstring * @param int $wtimeout * @return mixed */ public function delete_all( $deleteOptions = ["limit" => 0], $wstring = \MongoDB\Driver\WriteConcern::MAJORITY, $wtimeout = 1000 ) { return $this->delete($deleteOptions, $wstring, $wtimeout); } /** * 更新 * @param array $updateOptions * @param string $wstring * @param int $wtimeout */ public function update( $updateOptions = ['multi' => false, 'upsert' => false], $wstring = \MongoDB\Driver\WriteConcern::MAJORITY, $wtimeout = 1000 ) { try { $wc = new \MongoDB\Driver\WriteConcern($wstring, $wtimeout); $bulk = new \MongoDB\Driver\BulkWrite(); $filter = $this->wheres; if (count($filter) < 1 && $updateOptions['multi'] == false) { throw new \Exception('filter is error!'); } $newObj = $this->updates; $bulk->update( $filter, $newObj, $updateOptions ); $dbc = $this->database . '.' . $this->collection; $result = $this->manager->executeBulkWrite($dbc, $bulk, $wc); $this->result = $result; return $result->getModifiedCount(); } catch (\Exception $e) { $this->showError($e); } } /** * 更新所有 * @param array $updateOptions * @param string $wstring * @param int $wtimeout */ public function update_all( $updateOptions = ['multi' => true, 'upsert' => false], $wstring = \MongoDB\Driver\WriteConcern::MAJORITY, $wtimeout = 1000 ) { return $this->update($updateOptions, $wstring, $wtimeout); } /** * 查询单条 * @param null $id * @return mixed|null */ public function find($id = null) { if ($id != null) { $this->where('_id', new \MongoDB\BSON\ObjectID($id)); } $filter = $this->wheres; $options = [ 'projection' => $this->selects, "sort" => $this->sorts, "skip" => 0, "limit" => 1, ]; $query = new \MongoDB\Driver\Query($filter, $options); $dbc = $this->database . '.' . $this->collection; $documents = $this->manager->executeQuery($dbc, $query); $this->result = $documents; $returns = null; foreach ($documents as $document) { $bson = \MongoDB\BSON\fromPHP($document); $returns = json_decode(\MongoDB\BSON\toJSON($bson), true); } return $returns; } /** * command * @param $db * @param $commands * @return mixed */ public function command($db, $commands) { try { $cursor = $this->manager->executeCommand($db, $commands); $this->result = $cursor; return $cursor; } catch (\Exception $e) { $this->showError($e); } } public function dropDatabase() { $cmd = array( 'dropDatabase' => 1, ); $db = $this->database; $commands = new \MongoDB\Driver\Command($cmd); $cursor = $this->command($db, $commands); $this->result = $cursor; $response = current($cursor->toArray()); return $response; } public function drop_collection() { $cmd = array( 'drop' => $this->collection, ); $db = $this->database; $commands = new \MongoDB\Driver\Command($cmd); $cursor = $this->command($db, $commands); $this->result = $cursor; $response = current($cursor->toArray()); return $response; } //unique public function add_index($key, $name = 'index') { $cmd = array( 'createIndexes' => $this->collection, 'indexes' => array( array( 'name' => $name, 'key' => $key, ) ) ); $db = $this->database; $commands = new \MongoDB\Driver\Command($cmd); $cursor = $this->command($db, $commands); $this->result = $cursor; $response = current($cursor->toArray()); return $response; } public function remove_index($index) { $cmd = array( 'dropIndexes' => $this->collection, 'index' => $index ); $db = $this->database; $commands = new \MongoDB\Driver\Command($cmd); $cursor = $this->command($db, $commands); $this->result = $cursor; $response = current($cursor->toArray()); return $response; } public function list_indexes() { $cmd = array( 'listIndexes' => $this->collection, ); $db = $this->database; $commands = new \MongoDB\Driver\Command($cmd); $cursor = $this->command($db, $commands); $this->result = $cursor; return $cursor; } public function aggregate($commands) { $db = $this->database; $commands = new \MongoDB\Driver\Command( [ 'aggregate' => $this->collection, 'pipeline' => [$commands] ] ); $cursor = $this->command($db, $commands); $this->result = $cursor; $response = current($cursor->toArray())->result; return $response; } /** * @param $key * @return mixed */ public function distinct($key) { $db = $this->database; $commands = new \MongoDB\Driver\Command( [ 'distinct' => $this->collection, 'key' => $key, 'query' => $this->wheres ] ); $cursor = $this->command($db, $commands); $this->result = $cursor; $response = current($cursor->toArray())->values; return $response; } /** * count * @return mixed */ public function count() { $db = $this->database; $commands = new \MongoDB\Driver\Command( [ "count" => $this->collection, "query" => $this->wheres ] ); $cursor = $this->command($db, $commands); $this->result = $cursor; $response = $cursor->toArray()[0]; return $response->n; } /** * 查 * @return mixed */ public function get() { try { $filter = (array)$this->wheres; $options = [ 'projection' => (array)$this->selects, "sort" => (array)$this->sorts, "skip" => (int)$this->offset, "limit" => (int)$this->limit, ]; $query = new \MongoDB\Driver\Query($filter, $options); $dbc = $this->database . '.' . $this->collection; $documents = $this->manager->executeQuery($dbc, $query); $this->result = $documents; $returns = array(); foreach ($documents as $document) { $bson = \MongoDB\BSON\fromPHP($document); $returns[] = json_decode(\MongoDB\BSON\toJSON($bson), true); } return $returns; } catch (\Exception $e) { $this->showError($e); } } /** * @param $fields * @param null $value * @return $this */ public function set($fields, $value = NULL) { if (is_string($fields)) { $this->updates['$set'][$fields] = $value; } elseif (is_array($fields)) { foreach ($fields as $field => $value) { $this->updates['$set'][$field] = $value; } } return $this; } /** * 要获取的字段 * @param $wheres * @param null $value * @return $this */ public function field($includes = array(), $excludes = array()) { if (!is_array($includes)) { $includes = array(); } if (!is_array($excludes)) { $excludes = array(); } if (!empty($includes)) { foreach ($includes as $col) { $this->selects[$col] = 1; } } if (!empty($excludes)) { foreach ($excludes as $col) { $this->selects[$col] = 0; } } return $this; } /** * 条件 * @param $wheres * @param null $value * @return $this */ public function where($wheres, $value = null) { if (is_array($wheres)) { foreach ($wheres as $wh => $val) { $this->wheres[$wh] = $val; } } else { $this->wheres[$wheres] = $value; } return $this; } public function where_in($field = "", $in = array()) { $this->wheres[$field]['$in'] = $in; return $this; } public function where_in_all($field = "", $in = array()) { $this->wheres[$field]['$all'] = $in; return $this; } public function where_or($wheres = array()) { foreach ($wheres as $wh => $val) { $this->wheres['$or'][] = array($wh => $val); } return $this; } public function where_not_in($field = "", $in = array()) { $this->wheres[$field]['$nin'] = $in; return $this; } public function where_gt($field = "", $x) { $this->wheres[$field]['$gt'] = $x; return $this; } public function where_gte($field = "", $x) { $this->wheres[$field]['$gte'] = $x; return $this; } public function where_lt($field = "", $x) { $this->wheres[$field]['$lt'] = $x; return $this; } public function where_lte($field = "", $x) { $this->wheres[$field]['$lte'] = $x; return $this; } public function where_between($field = "", $x, $y) { $this->wheres[$field]['$gte'] = $x; $this->wheres[$field]['$lte'] = $y; return $this; } public function where_between_ne($field = "", $x, $y) { $this->wheres[$field]['$gt'] = $x; $this->wheres[$field]['$lt'] = $y; return $this; } public function where_ne($field = '', $x) { $this->wheres[$field]['$ne'] = $x; return $this; } public function push($fields, $value = array()) { if (is_string($fields)) { $this->updates['$push'][$fields] = $value; } elseif (is_array($fields)) { foreach ($fields as $field => $value) { $this->updates['$push'][$field] = $value; } } return $this; } public function addtoset($field, $values) { if (is_string($values)) { $this->updates['$addToSet'][$field] = $values; } elseif (is_array($values)) { $this->updates['$addToSet'][$field] = array('$each' => $values); } return $this; } public function pop($field) { if (is_string($field)) { $this->updates['$pop'][$field] = -1; } elseif (is_array($field)) { foreach ($field as $pop_field) { $this->updates['$pop'][$pop_field] = -1; } } return $this; } public function pull($field = "", $value = array()) { $this->updates['$pull'] = array($field => $value); return $this; } public function rename_field($old, $new) { $this->updates['$rename'] = array($old => $new); return $this; } public function unset_field($fields) { if (is_string($fields)) { $this->updates['$unset'][$fields] = 1; } elseif (is_array($fields)) { foreach ($fields as $field) { $this->updates['$unset'][$field] = 1; } } return $this; } public function inc($fields = array(), $value = 0) { if (is_string($fields)) { $this->updates['$inc'][$fields] = $value; } elseif (is_array($fields)) { foreach ($fields as $field => $value) { $this->updates['$inc'][$field] = $value; } } return $this; } public function mul($fields = array(), $value = 0) { if (is_string($fields)) { $this->updates['$mul'][$fields] = $value; } elseif (is_array($fields)) { foreach ($fields as $field => $value) { $this->updates['$mul'][$field] = $value; } } return $this; } public function max($fields = array(), $value = 0) { if (is_string($fields)) { $this->updates['$max'][$fields] = $value; } elseif (is_array($fields)) { foreach ($fields as $field => $value) { $this->updates['$max'][$field] = $value; } } return $this; } public function min($fields = array(), $value = 0) { if (is_string($fields)) { $this->updates['$min'][$fields] = $value; } elseif (is_array($fields)) { foreach ($fields as $field => $value) { $this->updates['$min'][$field] = $value; } } return $this; } /** * 排序 * @param array $fields * @return $this */ public function order_by($fields = array()) { foreach ($fields as $col => $val) { if ($val == -1 || $val === FALSE || strtolower($val) == 'desc') { $this->sorts[$col] = -1; } else { $this->sorts[$col] = 1; } } return $this; } /** * 条数 * @param int $x * @return $this */ public function limit($x = 99999) { if ($x !== NULL && is_numeric($x) && $x >= 1) { $this->limit = (int)$x; } return $this; } /** * 偏移量 * @param int $x * @return $this */ public function offset($x = 0) { if ($x !== NULL && is_numeric($x) && $x >= 1) { $this->offset = (int)$x; } return $this; } /** * 生成mongo时间 * @param bool $stamp * @return \MongoDB\BSON\UTCDatetime */ public function date($stamp = false) { if ($stamp == false) { return new \MongoDB\BSON\UTCDatetime(time() * 1000); } else { return new \MongoDB\BSON\UTCDatetime($stamp); } } /** * 生成mongo时间戳 * @param bool $stamp * @return \MongoDB\BSON\Timestamp */ public function timestamp($stamp = false) { if ($stamp == false) { return new \MongoDB\BSON\Timestamp(0, time()); } else { return new \MongoDB\BSON\Timestamp(0, $stamp); } } /** * 生成mongo uuid * @param bool $stamp * @return */ public function uuid($uuid) { return new \MongoDB\BSON\Binary($uuid,\MongoDB\BSON\Binary::TYPE_UUID); } /** * 抛出异常 * @param $e */ public function showError($e) { exit($e->getMessage()); } }
2023年08月10日
12 阅读
0 评论
0 点赞
2023-08-10
PHP中的协程
PHP中的协程之前学习Lua的时候第一次接触到了协程(coroutine)的概念。而PHP5.5版本中也加入了协程的概念,从此PHP编程又有了新的思路和玩法。这里学习一下PHP中协程的相关概念的使用方法。分成上下两篇文章吧,这篇主要讲一下基础概念。协程是什么?在以前的Lua学习笔记三中可以看到,协程与多线程的比较,有自己的堆栈、局部变量、指令指针等,但是协程本身与其他协程共享全局变量。主要不同在于,多处理器下,多线程可以真实的同时运行多个线程。而协程任意时刻只能有一个在真实运行,并且只有在明确要求被挂起时才会挂起。PHP中协程如何理解?这里引用知乎赵老师的答案,说的比较好理解。具体来说,一个包含yeild的php函数,就是协程,他有阶段性的结算值 yield $var, 但是代码并不返回,php的调度者接到这个值后,喂给一个generator,generator是个实现了iterator接口的+和协程通讯接口(比如send方法)的实例,所以可以用在for循环里(另个接口负责和协程通讯)。那么generator收到了这个协程的阶段性的值后,他喂给for循环,等for循环下一次循环的时候,他又启动这个协程,协程从上次中断的点继续执行,继续计算,继续yeild值给generator,generator喂for循环,继续循环,直到协程执行完毕。相关函数final class Generator implements Iterator {public function rewind(); // 返回到迭代器的第一个元素。 public function valid(); // 返回false如果迭代器已经关闭,否则返回true public function current(); // 返回当前yield值. public function key(); // 返回当前yield键名. public function next(); // 恢复生成器的执行。 public function PS_UNRESERVE_PREFIX_throw(Exception $exception) {};//抛出异常 public function send($value); // 将传入的值作为yield表达式的结果并且恢复发生器的执行。}简单例子简单的迭代器给foreach使用function my_range($start, $end, $step = 1) {for ($i = $start; $i <= $end; $i += $step) { yield $i; }}foreach (my_range(1, 5) as $num) {echo $num;}//output 12345带send可以交互的例子function gen() {$ret = (yield 'a'); echo $ret; $ret = (yield 'b'); echo $ret;}$gen = gen();$ret = $gen->current();echo $ret;$ret = $gen->send("c");echo $ret;$ret = $gen->send("d");echo $ret;//output acbd带抛出异常的例子function gen() {try{ $ret = (yield 'a'); echo $ret; $ret = (yield 'b'); echo $ret; } catch (Exception $ex) { echo $ex->getMessage(); }}$gen = gen();$ret = $gen->current();echo $ret;$ret = $gen->send("c");echo $ret;$ret = $gen->throw(new Exception("d"));var_dump($ret);//output acbdNULL那么能用来干什么呢?我们来看看,协程可以自己主动出让执行权,把不需要抢占的操作时间(比如socket等待链接)让出来,并且可以和调用方通过yield的方式传递信息。显而易见,他可以用来做多任务调度!PHP中协程实现多任务调度,鸟哥有一篇翻译的文章里有讲解,网上能找到的大部分资料,都跟这篇相关。但是至少在我看来,理解起来还是蛮复杂的。这里针对那篇文章的前半部分做一个笔记,忽略后面关于独立堆栈协程的部分。function里使用yield关键字,将生成迭代器。这样调用functionName()时,其实得到的是一个迭代器对象,而并没有实际运行程序。为什么要走系统调用SystemCall这一层呢?模拟进程和系统会话的方式,控制权限。通过给yield表达式传递信息来与调度器通信,yield既是中断也是传递给调度器的方式。SystemCall 包含一个回调函数,他自己本身可以被执行。被执行时实际上是调用了这个回调函数,入参是某个task和调度器。SystemCall其实并没有其他作用,只是在协程函数里面跟在yield后面传给调度器来执行。注意SplQueue塞进去的对象其实是引用(PHP里对象入参都是引用,不只是SplQueue)!外面对象改了,里面也会变。为什么忽略协程堆栈?我打算在第二篇文章中,把有赞的zan framework里关于协程的部分抽出来,针对性的说一下包含子协程额多任务调度。当然主要想偷个懒。不过个人感觉zan框架里的协程部分,比之前说的那篇文章要好理解一些。上篇文章里提到PHP中协程的引入,可以使PHP编程有新的玩法,不在遵循原本顺序执行的思路,从而应对大访问量和并发操作。有赞的zan framework就是基于PHP协程的,提供最简单的方式开发面向C10K+的高并发HTTP服务或SOA服务。我并没有深入的学习这个框架,这里只打算把关于协程的部分抽出来学习一下。zan框架高并发设计思路粗看之下(不一定对哦),框架应该是用swoole_server + 协程解决高并发访问。比如Web服务中,swoole的http_server只开启了少数的几个worker进程。我们知道,如果worker的onRequest里使用的是异步方法,则worker的响应是异步处理的,反之则是阻塞的。zan框架在worker进程中大量使用了PHP协程,所以一个worker进程可以响应很多并发的请求(但是本质上正在执行的只有一个),这应该就是能过达到C10K+的原因吧。并且协程相对于回调的方式,在PHPer看来更容易接受吧。另外,框架设计里还使用了middleware、连接池、依赖注入等等比较现代的设计,感觉可以更深入的学习一下。一个类一个类来看我们把框架里关于协程的部分拆出来看,下面一个类一个类的分析。与鸟哥博客里那个文章的实现相比,有一些相同的地方,更多的是不同。比如那篇文章里,多个任务放到一个schedule里调度,对于后面的实现就比较繁琐。这里把框架里的代码抽出来,并进行了一定删减,去掉了与其他业务强相关的东西。比如Event、Context、Async等等。实际上Async用于处理MySQL查询的返回值的,框架内部将MySQL的具体操作类封装成了Async的子类,并且yield给调度器来用。Singal类Singal类里包含了系统调用所需的信号量。指明了协程在一轮运行之后应该处于的状态。class Signal{const TASK_SLEEP = 1; const TASK_AWAKE = 2; const TASK_CONTINUE = 3; const TASK_KILLED = 4; const TASK_RUNNING = 5; const TASK_WAIT = 6; const TASK_DONE = 7; public static function isSignal($signal) { if(!$signal) { return false; } if (!is_int($signal)) { return false; } if($signal < 1 ) { return false; } if($signal > 7) { return false; } return true; }}Task类Task包装了具体协程函数,并提供相应的get set方法。与网上流行的那篇文章(以下简称那文)不同的是,我们的scheduler是内置于Task里的,在run方法里实现具体的调度。这里我们省略了Context,并且让taskId自增。class Task{protected $taskId = 0; protected $parentId = 0; protected $coroutine = null; //这里忽略了context 保存的是当前http请求的相关信息,可以通过系统调用的方式操作 protected $context = null; protected $sendValue = null; protected $scheduler = null; protected $status = 0; public function __construct(Generator $coroutine, $taskId = 0, $parentId = 0) { $this->coroutine = $coroutine; if(isset($GLOBALS['stTaskId']) && $taskId == 0){ global $stTaskId; $taskId = $stTaskId ++; } $this->taskId = $taskId; $this->parentId = $parentId; $this->scheduler = new Scheduler($this); } /** * 静态方法调用 * @param $coroutine * @param int $taskId * @param int $parentId * @return Task */ public static function execute($coroutine, $taskId = 0, $parentId = 0) { if ($coroutine instanceof Generator) { if(isset($GLOBALS['stTaskId']) && $taskId == 0){ global $stTaskId; $taskId = $stTaskId ++; } $task = new Task($coroutine, $taskId, $parentId); $task->run(); return $task; } return $coroutine; } public function run() { while (true) { try { if ($this->status == Signal::TASK_KILLED){ $this->fireTaskDoneEvent(); break; } $this->status = $this->scheduler->schedule(); //以下几种状态表示信号量,实际上已经从while里跳出来了。如果需要继续的话,会在其他地方重启。 switch ($this->status) { case Signal::TASK_KILLED: case Signal::TASK_SLEEP: case Signal::TASK_WAIT: return null; case Signal::TASK_DONE: $this->fireTaskDoneEvent(); return null; } } catch (Exception $e) { $this->scheduler->throwException($e); } } } public function send($value) { $this->sendValue = $value; return $this->coroutine->send($value); } public function getTaskId() { return $this->taskId; } public function getContext() { return $this->context; } public function getSendValue() { return $this->sendValue; } public function getResult() { return $this->sendValue; } public function getStatus() { return $this->status; } public function setStatus($signal) { $this->status = $signal; } public function getCoroutine() { return $this->coroutine; } public function setCoroutine(Generator $coroutine) { $this->coroutine = $coroutine; } public function fireTaskDoneEvent() { echo "Task done $this->taskId\n"; }}Scheduler类scheduler类负责:获取Task里的协程函数跑完一轮的返回值根据返回值的类型采取不同的处理方式,如系统调用、子协程、普通yield值、检查协程栈等等。在子协程的调用过程中,负责父子协程的进栈出栈,yield值的传递等等。class Scheduler{private $task = null; private $stack = null; public function __construct(Task $task) { $this->task = $task; $this->stack = new SplStack(); } public function schedule() { $coroutine = $this->task->getCoroutine(); $value = $coroutine->current(); $signal = $this->handleSysCall($value); if ($signal !== null) return $signal; $signal = $this->handleCoroutine($value); if ($signal !== null) return $signal; $signal = $this->handleYieldValue($value); if ($signal !== null) return $signal; $signal = $this->handleTaskStack($value); if ($signal !== null) return $signal; $signal = $this->checkTaskDone($value); if ($signal !== null) return $signal; return Signal::TASK_DONE; } public function isStackEmpty() { return $this->stack->isEmpty(); } public function throwException($e, $isFirstCall = false) { if ($this->isStackEmpty()) { $this->task->getCoroutine()->throw($e); return; } try{ if ($isFirstCall) { $coroutine = $this->task->getCoroutine(); } else { $coroutine = $this->stack->pop(); } $this->task->setCoroutine($coroutine); $coroutine->throw($e); $this->task->run(); }catch (Exception $e){ $this->throwException($e); } } /** * 处理系统调用 * @param $value * @return mixed|null */ private function handleSysCall($value) { if (!($value instanceof SysCall) && !is_subclass_of($value, SysCall::class) ) { return null; } echo $this->task->getTaskId()."| SYSCALL\n"; //走系统调用 实际上因为__invoke 走的是 $value($this->task); $signal = call_user_func($value, $this->task); if (Signal::isSignal($signal)) { return $signal; } return null; } /** * 处理子协程 * @param $value * @return int|null */ private function handleCoroutine($value) { if (!($value instanceof Generator)) { return null; } echo $this->task->getTaskId()."| COROUTINE\n"; //获取当前的协程 入栈 $coroutine = $this->task->getCoroutine(); $this->stack->push($coroutine); //将新的协程设为当前的协程 $this->task->setCoroutine($value); return Signal::TASK_CONTINUE; } /** * 处理协程栈 * @param $value * @return int|null */ private function handleTaskStack($value) { //能够跑到这里说明当前协程已经跑完了 valid()==false了 需要看下栈里是否还有以前的协程 if ($this->isStackEmpty()) { return null; } echo $this->task->getTaskId()."| TASKSTACK\n"; //出栈 设置为当前运行的协程 $coroutine = $this->stack->pop(); $this->task->setCoroutine($coroutine); //这个sendvalue可能是从刚跑完的协程那里得到的 把它当做send值传给老协程 让他继续跑 $value = $this->task->getSendValue(); $this->task->send($value); return Signal::TASK_CONTINUE; } /** * 处理普通的yield值 * @param $value * @return int|null */ private function handleYieldValue($value) { $coroutine = $this->task->getCoroutine(); if (!$coroutine->valid()) { return null; }// if($this->task->getTaskId() == 2){//// }else{ echo $this->task->getTaskId()."| YIELD VALUE\n";// } //如果协程后面没有yield了 这里发出send以后valid就变成false了 并且current变成NULL $status = $this->task->send($value); return Signal::TASK_CONTINUE; } private function checkTaskDone($value) { $coroutine = $this->task->getCoroutine(); if ($coroutine->valid()) { return null; } echo $this->task->getTaskId()."| CHECKDONE\n"; return Signal::TASK_DONE; }}SysCall类与那文的思路相同,系统调用类一般作为yield后面跟着的值吐给外层的调用方来执行,并且可能返回响应的信号量,标识这个Task是继续运行还是进入等待状态中。不同的是这里的__invoke入参不需要Scheduler。class SysCall{protected $callback = null; public function __construct(\Closure $callback) { $this->callback = $callback; } public function __invoke(Task $task) { return call_user_func($this->callback, $task); }}组装起来!基本的组件就是上面的几个类了,下面举一些实际的例子,说明如何利用这几个看似简陋的组件来搞大新闻。延迟执行任务function taskSleep($ms){return new SysCall(function (Task $task) use ($ms) { swoole_timer_after($ms, function() use($task){ $task->send("this is send value in sleep function."); $task->run(); }); return Signal::TASK_SLEEP; });}function delay(){yield taskSleep(2000);}function gen(){echo "gen1\n"; yield 1; echo "gen2\n"; yield 2; echo "gen3\n"; yield 3;}//Task::execute(delay(), 1); 亦可(new Task(delay(), 1))->run();(new Task(gen(), 2))->run();/** output1| SYSCALLgen12| YIELD VALUEgen22| YIELD VALUEgen32| YIELD VALUE2| CHECKDONETask done 2//2秒以后//1| CHECKDONETask done 1**/taskSleep是个系统调用,告诉调度器我要睡眠了(传递给他一个Signal::TASK_SLEEP)。具体说明时候唤醒呢,要等swoole_timer_after2秒以后将它唤醒。我们这里同时跑了两个任务,从输出来看第一个任务的延时执行,并不会阻塞第二个任务。可以清楚地看到,我们的协程是可以实现多任务并行处理的(当然实际上并不是并行)。独立堆栈的子协程function justReturnValue(){yield (delay()); yield 'yield value 2';}function gen2(){$ret1 = (yield "yield value 1"); echo "[ret] $ret1\n"; $ret2 = (yield justReturnValue()); echo "[ret] $ret2\n";}(new Task(gen2(), 1))->run();/** output1| YIELD VALUE[ret] yield value 11| COROUTINE1| COROUTINE1| SYSCALL// 2秒以后 //1| TASKSTACK1| YIELD VALUE1| TASKSTACK[ret] yield value 21| CHECKDONETask done 1**/gen2里有一个子协程justReturnValue的调用,而justReturnValue里也有delay的子协程调用。通过输出可以清楚的看到,父子协程进栈出栈的顺序,以及出栈的协程会将吐出来的值交给原先的协程。实现一个非阻塞IO的Web服务参照那文里的实现,我们也可以写一个自己的Web服务。首先还是来说明一下要做什么,以及思路。直接引用那文的说法:有一个任务是在套接字上侦听是否有新连接,当有新连接要建立的时候,它创建一个新任务来处理新连接。Web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的。例如PHP将等待到客户端完成发送为止。对一个Web服务器来说,这有点不太高效。因为服务器在一个时间点上只能处理一个连接。解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”. 为了查找哪个套接字已经准备好读或者写了, 可以使用 流选择函数传统的做法中,创建一个套接字,等待新连接,然后读取、发送、关闭。这些都是阻塞的,会花时间在这些抢占资源的步骤上。如果我们使用协程的方式,可以先将等待操作的任务yield掉,之后结合stream_select方法,选择出可以继续操作的任务将其resume。通俗的说,可以理解为大家一起挤公交车,原先必须一个一个上,但是上车以后要找公交卡,要刷卡或者投币,操作完了以后下一个乘客才能继续。如果使用协程的话,第一个乘客A上车以后,挂起到一边找公交卡,不影响第二个乘客B上车。等到A掏出公交卡以后,直接插队刷卡上车。虽然还是一个一个排队上车,但是找卡的时间里其他乘客不会干等了。socket的状态首先定义2个全局数组保存所有用到的socket。再定义2个系统调用将socket设置进数组里,并且返回等待信号量让Task挂起。$waitingForRead = [];$waitingForWrite = [];function waitForRead($socket) {return new SysCall( function(Task $task) use ($socket) { global $waitingForRead; if (isset($waitingForRead[(int) $socket])) { $waitingForRead[(int) $socket][1][] = $task; } else { $waitingForRead[(int) $socket] = [$socket, [$task]]; } //设置完了不让他往下走 return Signal::TASK_WAIT; } );}function waitForWrite($socket) {return new SysCall( function(Task $task) use ($socket) { global $waitingForWrite; if (isset($waitingForWrite[(int) $socket])) { $waitingForWrite[(int) $socket][1][] = $task; } else { $waitingForWrite[(int) $socket] = [$socket, [$task]]; } //设置完了不让他往下走 return Signal::TASK_WAIT; } );}选择可以操作的socket注册一个任务,不断检查我们的全局数组,直到有socket就绪了,将其对应的任务唤醒。function ioPoll($timeout) {global $waitingForRead; global $waitingForWrite; $rSocks = []; foreach ($waitingForRead as list($socket)) { $rSocks[] = $socket; } $wSocks = []; foreach ($waitingForWrite as list($socket)) { $wSocks[] = $socket; } $eSocks = []; // dummy //stream_select 方法会直接修改入参 只保留就绪的socket数组 if (false === stream_select($rSocks, $wSocks, $eSocks, $timeout)) { return; } foreach ($rSocks as $socket) { list(, $tasks) = $waitingForRead[(int) $socket]; unset($waitingForRead[(int) $socket]); foreach ($tasks as $task) { $task->send("ready for read"); $task->run(); } } foreach ($wSocks as $socket) { list(, $tasks) = $waitingForWrite[(int) $socket]; unset($waitingForWrite[(int) $socket]); foreach ($tasks as $task) { $task->send("ready for write"); $task->run(); } }}function ioPollTask() {global $waitingForRead; global $waitingForWrite; while (true) { if(count($waitingForRead) <=1 && count($waitingForWrite) <=1){ //如果等待检查的socket只有1个 则用阻塞的方式等待 ioPoll(null); }else{ //否则设为0超时 ioPoll(0); } yield; }}封装socket将socket封装一下,定义了必须的4个方法。class CoSocket {protected $socket; public function __construct($socket) { $this->socket = $socket; } public function accept() { //等待本socket就绪 yield waitForRead($this->socket); //就绪以后会继续走到这里 返回给外层一个客户端连接socket yield stream_socket_accept($this->socket, 0); } public function read($size) { //等待本socket就绪 yield waitForRead($this->socket); //就绪以后回把读取到的内容 返回给外层 yield fread($this->socket, $size); } public function write($string) { //等待本socket就绪 yield waitForWrite($this->socket); //就绪以后把响应写给客户端 fwrite($this->socket, $string); } public function close() { @fclose($this->socket); }}处理客户端新连接服务端socket接受到新的连接以后,创建新的任务。下面是这个任务里实际运行的协程。function handleClient(CoSocket $socket) {$data = (yield $socket->read(8192)); $msg = "Received following request:\n\n$data"; $msgLength = strlen($msg); //响应报文由状态行(HTTP版本、状态码)+HTTP首部字段(响应首部字段、通用首部字段、实体首部字段)组成。 //空行(CR+LF)分隔首部与报文主体。所以这里留个空行在打印$msg $response = <<<RESHTTP/1.1 200 OK\rContent-Type: text/plain\rContent-Length: $msgLength\rConnection: close\r\r$msgRES;yield $socket->write($response); yield $socket->close();}开启服务!直接创建一个包含server协程的任务,和一个不断刷新stream_select的任务。之后的流程都交给ioPollTask来调度了。//定义全局的taskId自增用static $stTaskId = 1;function server($port){echo "Starting server at port $port...\n"; //这里抛出的异常会被scheduler和task抛来抛去 最后还是到这里catch一下 try{ $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); //设置为读写非阻塞 stream_set_blocking($socket, 0); $socket = new CoSocket($socket); while (true) { $clientSocket = (yield $socket->accept()); $clientCoSocket = new CoSocket($clientSocket); //为新的链接创建Task Task::execute(handleClient($clientCoSocket)); } }catch (Exception $e){ echo $e->getMessage(); }}//创建服务端socket的task 1Task::execute(server(8000));//不断刷新socket_select的task 2Task::execute(ioPollTask());运行效果开启服务后,我们先直接用curl访问,观测一下得到的结果。➜ ~ curl -d "a=123&b=456" http://localhost:8000Received following request:POST / HTTP/1.1Host: localhost:8000User-Agent: curl/7.51.0Accept: /Content-Length: 11Content-Type: application/x-www-form-urlencodeda=123&b=456可以看到服务端吐出了我们发送给他的信息,包括HTTP请求行、首部字段和正文。如果我们在浏览器里访问的话,正文内容会丰富许多,会有Cookie,UA等等,如下:Received following request:GET / HTTP/1.1Host: localhost:8000Connection: keep-aliveUpgrade-Insecure-Requests: 1User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,/;q=0.8DontTrackMeHere: gzip, deflate, sdch, brAccept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4Cookie: Phpstorm-f86ac615=34137ba0-5113-4922-b809-b6fa20dbf937不足的地方zan framework里的协程调度,并没有采用任务队列的方式。可能是因为他只是针对单独的http或者tcp请求来设计的吧,一般是链式调用。由于这个原因,所以没法设置具体某个任务的执行顺序。当然实际效果跟那文里是相同的,说到底还是由ioPollTask来驱动。小结我们通过几个例子加深了对PHP中协程用法的理解。需要注意的是,在协程中(本文构造的这种结构)我们要避免使用死循环,除非循环里yield的结果可以将其挂起并出让控制权给其他协程。比如上文的Web服务器中,因为有不能主动挂起的ioPollTask,所以不能实现在响应时延迟几秒的效果。因为即使使用了taskSleep这种系统调用,也会因为ioPollTask死循环导致不能获取控制权无法执行
2023年08月10日
24 阅读
0 评论
0 点赞
2023-08-10
PHP 知识补全 —— 生成器 (generator)和协程的实现
PHP 知识补全 —— 生成器 (generator)和协程的实现作者:Chongyi 原文地址:https://www.insp.top/content/php-knowledge-completion-generator-and-the-realization-of-coroutine先说一些废话PHP 5.5 以来,新的诸多特性又一次令 PHP 焕发新的光彩,虽然在本文写的时候已是 PHP 7 alpha 2 发布后的一段时间,但此时国内依旧是 php 5.3 的天下。不过我认为新的特性迟早会因为旧的版本的逐渐消失而变得越发重要,尤其是 PHP 7 的正式版出来后,因此本文的目的就是为了在这之前,帮助一些 PHPer 了解一些他们从没有了解的东西。所以打算将以本篇作为博客中 PHP 知识补全 系列文章的开篇。其实在写本文之前,我对生成器以及基于此特性延伸出来的 php 的协程实现并没有比较直观的了解,主要是我个人水平并不是很高,属于典型的刚入了门的 PHPer。所以在看了前段时间鸟哥(laruence)博客中对协程的讲解(原文链接:《在PHP中使用协程实现多任务调度》)后,在我个人对本篇的理解上,针对那些比较难以理解的概念(包括我个人在理解这一概念的时候的难点),以一个更为通俗的方式去讲明白。当然由于本人也是刚刚去学习这一概念,所以有些不得当的地方在所难免,希望大神看见了请不吝赐教。一切从 Iterator 和 Generator 开始为便于新入门开发者理解,本文一半篇幅是讲述迭代器接口(Iterator)和 Generator 类的,对此已经理解的话,可以直接跳过。迭代和迭代器在理解本文大多数概念前,有必要知道迭代和迭代器。事实上,迭代大家都知道是什么,可是我不知道(真的,在此之前对这个概念没有系统了解)。迭代是指反复执行一个过程,每执行一次叫做一次迭代。实际上我们经常做这种事情,比如:<?php$mapping = ['red' => '#FF0000', 'green' => '#00FF00', 'blue' => '#0000FF'];foreach ($mapping as $key => $value) {printf("key: %d - value: %s\n", $key, $value);}我们可以看到通过 foreach 对数组遍历并迭代输出其内容。在这一环节中,我们需要关注的重点是数组。虽然我们迭代的过程是 foreach 语句中的代码块,但实际上数组 $mapping 在每一次迭代中发生了变化,意味着数组内部也存在着一次迭代。如果我们把数组看做一个对象,foreach 实际上在每一次迭代过程都会调用该对象的一个方法,让数组在自己内部进行一次变动(迭代),随后通过另一个方法取出当前数组对象的键和值。这样一个可通过外部遍历其内部数据的对象就是一个迭代器对象,其遵循的统一的访问接口就是迭代器接口(Iterator)。PHP 提供了一个统一的迭代器接口。关于迭代器 PHP 官方文档有更为详细的描述,建议去了解。interface Iterator extends Traversable{/** * 获取当前内部标量指向的元素的数据 */ public mixed current ( void ) /** * 获取当前标量 */ public scalar key ( void ) /** * 移动到下一个标量 */ public void next ( void ) /** * 重置标量 */ public void rewind ( void ) /** * 检查当前标量是否有效 */ public boolean valid ( void )}我们来给出一个实例,去实现一个简单的迭代器:class Xrange implements Iterator{protected $start; protected $limit; protected $step; protected $i; public function __construct($start, $limit, $step = 0) { $this->start = $start; $this->limit = $limit; $this->step = $step; } public function rewind() { $this->i = $this->start; } public function next() { $this->i += $this->step; } public function current() { return $this->i; } public function key() { return $this->i + 1; } public function valid() { return $this->i <= $this->limit; }}通过 foreach 遍历来看看这个迭代器的效果:foreach (new Xrange(0, 10, 2) as $key => $value) {printf("%d %d\n", $key, $value);}输出:1 03 25 47 69 811 10至此我们看到了一个迭代器的实现。一些人在了解这一特性会很激动的将其应用在实际项目中,但有些则疑惑这有什么卵用呢?迭代器只是将一个普通对象变成了一个可被遍历的对象,这在有些时候,如一个对象 StudentsContact,这个对象是用于处理学生联系方式的,通过 addStudent 方法注册学生,通过 getAllStudent 获取全部注册的学生联系方式数组。我们以往遍历是通过 StudentsContact::getAllStudent() 获取一个数组然后遍历该数组,但是现在有了迭代器,只要这个类继承这个接口,就可以直接遍历该对象获取学生数组,并且可以在获取之前在类的内部就对输出的数据做好处理工作。当然用处远不止这么点,但在这里就不过多纠结。有一个在此基础上更为强大的东西,生成器。生成器,Generator虽然迭代器仅需继承接口即可实现,但依旧很麻烦,我们毕竟需要定义一个类并实现该接口所有方法,这十分繁琐。在一些情景下我们需要更简洁的办法。生成器提供了一种更容易的方法来实现简单的对象迭代,相比较定义类实现 Iterator 接口的方式,性能开销和复杂性大大降低。PHP 官方文档这样说的:生成器允许你在 foreach 代码块中写代码来迭代一组数据而不需要在内存中创建一个数组, 那会使你的内存达到上限,或者会占据可观的处理时间。相反,你可以写一个生成器函数,就像一个普通的自定义函数一样, 和普通函数只返回一次不同的是, 生成器可以根据需要 yield 多次,以便生成需要迭代的值。一个简单的例子就是使用生成器来重新实现 range() 函数。 标准的 range() 函数需要在内存中生成一个数组包含每一个在它范围内的值,然后返回该数组, 结果就是会产生多个很大的数组。 比如,调用 range(0, 1000000) 将导致内存占用超过 100 MB。做为一种替代方法, 我们可以实现一个 xrange() 生成器, 只需要足够的内存来创建 Iterator 对象并在内部跟踪生成器的当前状态,这样只需要不到1K字节的内存。官方文档给了上文对应的例子,我们在此简化了一下:function xrange($start, $limit, $step = 1) {for ($i = $start; $i <= $limit; $i += $step) { yield $i + 1 => $i; // 关键字 yield 表明这是一个 generator }}// 我们可以这样调用foreach (xrange(0, 10, 2) as $key => $value) {printf("%d %d\n", $key, $value);}可能你已经发现了,这个例子的输出和我们前面在说迭代器的时候那个例子结果一样。实际上生成器生成的正是一个迭代器对象实例,该迭代器对象继承了 Iterator 接口,同时也包含了生成器对象自有的接口,具体可以参考 Generator 类的定义。当一个生成器被调用的时候,它返回一个可以被遍历的对象.当你遍历这个对象的时候(例如通过一个foreach循环),PHP 将会在每次需要值的时候调用生成器函数,并在产生一个值之后保存生成器的状态,这样它就可以在需要产生下一个值的时候恢复调用状态。一旦不再需要产生更多的值,生成器函数可以简单退出,而调用生成器的代码还可以继续执行,就像一个数组已经被遍历完了。我们需要注意的关键是 yield,这是生成器的关键。我们通过上面例子,可以看得出,yield 会将当前一个值传递给 foreach,换句话说,foreach 每一次迭代过程都会从 yield 处取一个值,直到整个遍历过程不再存在 yield 为止的时候,遍历结束。我们也可以发现,yield 和 return 都会返回值,但区别在于一个 return 是返回既定结果,一次返回完毕就不再返回新的结果,而 yield 是不断产出直到无法产出为止。实际上存在 yield 的函数返回值返回的是一个 Generator 对象(这个对象不能手动通过 new 实例化),该对象实现了 Iterator 接口。那么 Generator 自身有什么独特之处?继续看:yield字面上解释,yield 代表着让位、让行。正是这个让行使得通过 yield 实现协程变得可能。生成器函数的核心是 yield 关键字。它最简单的调用形式看起来像一个 return 申明,不同之处在于普通 return 会返回值并终止函数的执行,而 yield 会返回一个值给循环调用此生成器的代码并且只是暂停执行生成器函数。yield 和 return 的区别,前者是暂停当前过程的执行并返回值,而后者是中断当前过程并返回值。暂停当前过程,意味着将处理权转交由上一级继续进行,直至上一级再次调用被暂停的过程,该过程则会从上一次暂停的位置继续执行。这像是什么呢?如果读者在读本篇文章之前已经在鸟哥的文章中粗略看过,应该知道这很像是一个操作系统的进程调度管理,多个进程在一个 CPU 核心上执行,在系统调度下每一个进程执行一段指令就被暂停,切换到下一个进程,这样看起来就像是同时在执行多个任务。但仅仅是如此还远远不够,yield 更重要的特性是除了可以返回一个值以外,还能够接收一个值!function printer(){while (true) { printf("receive: %s\n", yield); }}$printer = printer();$printer->send('hello');$printer->send('world');上述例子输出内容为:receive: helloreceive: world参考 PHP 官方中文文档:生成器 对象 我们可以得知 Generator 对象除了实现 Iterator 接口中的必要方法以外,还有一个 send 方法,这个方法就是向 yield 语句处传递一个值,同时从 yied 语句处继续执行,直至再次遇到 yield 后控制权回到外部。我们通过之前也了解了一个问题,yield 可以在其位置中断并返回一个值,那么能不能同时进行 接收 和 返回 呢?当然,这可是实现协程的根本。我们对上述代码做出修改:<?phpfunction printer(){$i = 0; while (true) { printf("receive: %s\n", (yield ++$i)); }}$printer = printer();printf("%d\n", $printer->current());$printer->send('hello');printf("%d\n", $printer->current());$printer->send('world');printf("%d\n", $printer->current());输出内容如下:1receive: hello2receive: world3current 方法是迭代器( Iterator )接口必要的方法,foreach 语句每一次迭代都会通过其获取当前值,而后调用迭代器的 next 方法。我们为了使程序不会无限执行,手动调用 current 方法获取值。上述例子已经足以表示 yield 在那一个位置作为双向传输的 工具,已具备实现协程的条件。协程这一部分我不打算长篇大论,本文开头已经给出了鸟哥博客中更为完善的文章,本文的目的是出于补充对 Generator 的细节。我们要知道,对于单核处理器,多任务的执行原理是让每一个任务执行一段时间,然后中断、让另一个任务执行然后在中断后执行下一个,如此反复。由于其执行切换速度很快,让外部认为多个任务实际上是 “并行” 的。鸟哥那篇文章这么说道:多任务协作这个术语中的 “协作” 很好的说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样就可以运行其他任务了。这与 “抢占” 多任务相反, 抢占多任务是这样的:调度器可以中断运行了一段时间的任务, 不管它喜欢还是不喜欢。协作多任务在 Windows 的早期版本 (windows95) 和 Mac OS 中有使用, 不过它们后来都切换到使用抢先多任务了。理由相当明确:如果你依靠程序自动交出控制的话,那么一些恶意的程序将很容易占用整个CPU,不与其他任务共享。我们结合之前的例子,可以发现,yield 作为可以让一段任务自身中断,然后回到外部继续执行。利用这个特性可以实现多任务调度的功能,配合 yield 的双向通讯功能,以实现任务和调度器之间进行通信。这样的功能对于读写和操作 Stream 资源时尤为重要,我们可以极大的提高程序对于并发流资源的处理能力,比如实现 tcp server。以上在 《在PHP中使用协程实现多任务调度》 有更为详尽的例子。本文不再赘述。总结PHP 自 5.4 到如今愈发稳定的 PHP 7,可以看到许多的新特性令这门语言愈发强大和完善,逐渐从纯粹的 Web 语言变得有着更为广泛的适用面,作为一枚 PHPer 的确不应当止步不前,我们依然有很多的东西需要不断学习和加强。虽然 “PHP 是世界上最好的语言” 这句话只是个调侃,但不可否认 PHP 即使不是最好,但也在努力变好的事实,对吧?
2023年08月10日
11 阅读
0 评论
0 点赞
2023-08-10
在 PHP 中使用 Promise + co/yield 协程
在 PHP 中使用 Promise + co/yield 协程摘要: 我们知道 JavaScript 自从有了 Generator 之后,就有了各种基于 Generator 封装的协程。其中 hprose 中封装的 Promise 和协程库实现了跟 ES2016 的 async/await 一样的功能,并且更加灵活。我们还知道 PHP 自从 5.5 之后,也引入了 Generator,同样也有了各种基于它封装的 PHP 协程库,hprose 同样也为 PHP 提供的跟 JavaScript 版本类似的 Promise 和协程库。下面我们就来看一下它跟 swoole 结合的效果。为什么需要异步方式一个函数执行之后,在它后面顺序编写的代码中,如果能够直接使用它的返回结果或者它修改之后的引用参数,那么我们通常认为该函数是同步的。而如果一个函数的执行结果或者其修改的引用参数,需要通过设置回调函数或者回调事件的方式来获取,而在其后顺序编写的代码中无法直接获取的话,那么我们通常认为这样的函数是异步的。PHP 提供的大部分函数都是同步的。通常我们会有一个误解,那就是容易把同步和阻塞当成同一个概念,但实际上同步代码不一定都是阻塞的,只是同步代码对阻塞天然友好,当同步代码和阻塞结合时,代码通常是简单易懂的。阻塞带来的问题是当前线程(或进程)会陷入等待,一直等到阻塞结束,这样就会造成线程(或进程)资源的浪费。所以,通常认为阻塞是不够高效的。但是如果要编写非阻塞代码,使用同步方式会变得有些复杂,且不够灵活。同步方式的非阻塞代码通常会使用 select 模式,例如 curl_multi_select, stream_select, socket_select 等就是 PHP 中提供的一些典型的 select 模式的函数。我们说它复杂且不够灵活是有理由的,例如使用上面的 select 模式编写同步的非阻塞代码时,我们需要先构造一个并发任务的列表,之后手动构造循环来执行这些并发的任务,在循环开始之后,虽然这几个任务可以并发,但是这个循环相对于其后的代码总体上仍然是阻塞的,我们要想拿到这些并发任务的结果时,仍然需要等待。select 虽然可以同时等待多个任务中某一个或几个就位后,再执行后续操作,但仍然有一部分时间是被等待消耗掉的。而且如果是纯同步非阻塞的情况下,我们也很难在循环开始后,动态添加更多的任务到这个循环中去。所以,如果我们希望程序能够更加高效,更加灵活,就需要引入异步方式。传统的异步方式有什么问题一提到异步模式,大家脑子中的第一印象可能就是回调、回调、回调。是的,这是最简单最直接也是之前最常见的异步模式。只要在调用异步函数时设置一个或多个回调函数,函数就会在完成时自动调用回调函数。或者为一个对象设置一堆事件,之后调用该对象上的某个异步方法,虽然这个异步方法本身可能不再需要设置回调函数,但是设置的这堆事件实际上跟回调函数所起到的作用是一样的。如果你的程序逻辑够简单,简单的一两层回调也许并不会让你觉得异步方式的编程有什么麻烦。但如果你的程序逻辑一旦有些复杂,你可能就会被层层回调搞得疲惫不堪了。当然,实际上你的程序需要层层回调的原因,也许并不是你的程序逻辑真的复杂,而是你没有办法将回调函数中的参数结果传出来,所以,你就不得不将另一个回调函数传进去。我们来举一个简单的例子,假设我们有 1 个同步函数:function sum($a, $b) {return $a + $b;}然后我们按照下面的方式去调用它:$a = sum(1, 2);$b = sum($a, 3);$c = sum($b, 4);var_dump(array($a, $b, $c));虽然上面的代码很不精简,但我们要表达的意图很明确,而且代码看起来很清楚。那接下来我们把这个函数换成一个形式上的异步函数,例如:function async_sum($a, $b, $callback) {$callback($a + $b);}当然,它的执行并不是异步的,这里我们先不关心它的实现是不是真异步的。现在如果要做上面同样的操作,代码就要这样写了:async_sum(1, 2, function($a) {async_sum($a, 3, function($b) use ($a) { async_sum($b, 4, function($c) use ($a, $b) { var_dump(array($a, $b, $c)); }); });});代码的执行结果是一样的。但异步的代码看起来显然更难读一些,虽然这已经是很简单的例子了。好了,看到这里,有些读者可能会觉的我上面的这个例子很糟糕。因为明明有同步的函数可以使用,并且代码清晰可读,为啥非要写个形似异步的函数,把本来同步可以做的很好的事情用异步方式复杂化呢?而且那个异步调用的方式,最后不还是想要实现同步化的结果吗?如果你这么想的话,一点都没错。但我们这里想要解决的问题是,如果我们拿到的只有一个异步函数,这个函数没有同步实现,我们也不知道这个异步函数的内部定义是怎样的,我们也没办法将这个异步函数改为同步函数实现。那我们有没有办法将上面的程序改的更可读一些呢?当然是可以的,所以,现在 Promise 要登场了。为什么要引入 Promise通常我们对 Promise 的一个误解就是,它要解决的是层层回调的问题,比如上面的问题看上去就是一个典型的层层回调的问题。然而实际上,Promise 要解决的并不是回调不回调的问题,如果你使用过 Promise 的话,你会发现使用 Promise 你仍然少不了要使用回调。Promise 要解决的问题是,如何将回调方法的参数从回调方法中传递出来,让它可以像同步函数的返回结果一样,在回调函数以外的控制范围内,可以传递和复用。下面这几篇文章可能会对大家理解 Promise 有所帮助:深入理解 Promise 五部曲:1. 异步问题深入理解 Promise 五部曲:2. 控制权转换问题深入理解 Promise 五部曲:3. 可靠性问题深入理解 Promise 五部曲:4. 扩展问题深入理解 Promise 五部曲:5. LEGO我觉得这几篇文章讲的比较透彻,所以我就不重复文章中的内容了。下面我们来看上面的例子用 Promise 如何解。我们现在用最简单粗暴的方式来引入 Hprose 的库,直接复制源码而不是使用 composer。然后我们在代码中直接使用:<?phprequire_once("Hprose.php");use Hprose\Promise;这种方式来引入 Hprose 的 Promise 库,当然你也可以写成:<?phprequire_once("Hprose.php");use Hprose\Future;Future 库跟 Promise 库基本上是一样的,你可以认为 Future 是 Promise 的具体实现,Promise 只是 Future实现的一个包装。这个区别你可以从源码中直接看出来,这里就不多做解释了。接下来,我们要把前面的 async_sum 函数 Promise 化,Hprose 提供了这样一个函数:Promise\promisify(或者 Future\promisify),它的作用就是将一个使用回调方式的异步函数变成一个返回 Promise 对象的异步函数。这样说,也许有些不好理解,下面直接上代码:<?phprequire_once("Hprose.php");use Hprose\Promise;function async_sum($a, $b, $callback) {$callback($a + $b);}$sum = Promise\promisify('async_sum');$a = $sum(1, 2);$b = $a->then(function($a) use ($sum) {return $sum($a, 3);});$c = $b->then(function($b) use ($sum) { return $sum($b, 4);});Promise\all(array($a, $b, $c))->then(function($result) {var_dump($result);});好了,看到这里,如果你对 Promise 的理解还不够深入的话,你的第一反应可能是:这不是把程序变得更复杂了吗?原来的程序是 3 个回调,现在仍然是 3 个回调,还多了包装,都玩出花来了,有意思吗?确实,从上面的代码来看,代码并没有被简化,但是你会发现,现在回调函数中的参数已经通过 Promise 返回值的方式传递出来了,而且可以在原本的回调函数控制范围以外被传递和复用了。但是你可能会说然并卵,程序不是仍然很复杂吗?那我们就来进一步简化一下:<?phprequire_once("Hprose.php");use Hprose\Promise;function async_sum($a, $b, $callback) {$callback($a + $b);}$sum = Promise\wrap(Promise\promisify('async_sum'));$var_dump = Promise\wrap('var_dump');$a = $sum(1, 2);$b = $sum($a, 3);$c = $sum($b, 4);$var_dump(Promise\all(array($a, $b, $c)));现在,代码中再也看不到回调了。因为我们把函数包装成了可以接收 Promise 变量的函数。当然,其实现细节略微有些复杂,如果你感兴趣,可以去看一下源码,这里就不做源码剖析了。如果感兴趣的读者多得话,以后有时间再写源码剖析。当然,如果你只是想把异步调用同步化,除了 Promise\wrap 外,你还可以通过 co/yield 协程来实现。Hprose 中的 co/yield 协程还是上面的例子,如果你使用的是 PHP 5.5 或者更高版本,那么你可以这样来写代码了。<?phprequire_once("Hprose.php");use Hprose\Promise;function async_sum($a, $b, $callback) {$callback($a + $b);}Promise\co(function() {$sum = Promise\promisify('async_sum'); $a = (yield $sum(1, 2)); $b = (yield $sum($a, 3)); $c = (yield $sum($b, 4)); var_dump(array($a, $b, $c));});这代码比使用 Promise\wrap 的又要简单了。这里,代码中的变量 $a, $b, $c 不再是 Promise 变量,而是实实在在的整数变量。也就是说,yield 把一个 Promise 变量变成了一个普通变量。现在 Promise\co 中的代码已经被实实在在的同步化了。现在你可能有新的疑问了,异步不是为了高效吗?现在把原本的异步代码同步化了,那还会高效吗?当然,对这个例子上来说,效率肯定是没有提高,反而是严重降低的。甚至在这个例子中,最原始的那个形似异步的实现也不比同步实现更高效。因为在这个例子中,并没有涉及到并发和 IO 阻塞的情况。下面我们就放到真实场景下来看看 Promise 和 co/yield 协程是怎么用的。在 swoole 下使用 Promise 和 co/yield 协程我们知道在 PHP 中,如果要让程序延时可以使用 sleep 函数(或者 usleep, time_nanosleep 函数)来让程序阻塞一会儿,但是这个阻塞会让整个进程都阻塞,所以在阻塞期间,什么都不能干。下面我们来看看使用 swoole_timer_after 实现的延时执行:<?phprequire_once("Hprose.php");use Hprose\Future;date_default_timezone_set('UTC');function wait($time) {$wait = Future\promisify('swoole_timer_after'); for ($i = 0; $i < 5; $i++) { yield $wait($time); var_dump("wait ". ($time / 1000) . "s, now is " . date("H<img align="absmiddle" alt="i" class="emoji" src="https://static.hacpai.com/emoji/graphics/i.png" title="i"></img>s")); }}Future\co(wait(2000));Future\co(wait(1000));该程序执行结果如下:string(24) "wait 1s, now is 13:48:25"string(24) "wait 2s, now is 13:48:26"string(24) "wait 1s, now is 13:48:26"string(24) "wait 1s, now is 13:48:27"string(24) "wait 2s, now is 13:48:28"string(24) "wait 1s, now is 13:48:28"string(24) "wait 1s, now is 13:48:29"string(24) "wait 2s, now is 13:48:30"string(24) "wait 2s, now is 13:48:32"string(24) "wait 2s, now is 13:48:34"从结果中我们可以看出,wait(2000) 和 wait(1000) 各自都是顺序阻塞执行的,但是它们之间却是并发执行的。也就是说,协程之间并不会相互阻塞,虽然这几个并发的协程是在同一个进程内跑的。最后我们再来看一个用 co/yield 协程实现的并发抓图程序:<?phprequire_once("Hprose.php");use Hprose\Promise;function fetch($url) {$dns_lookup = Promise\promisify('swoole_async_dns_lookup'); $writefile = Promise\promisify('swoole_async_writefile'); $url = parse_url($url); list($host, $ip) = (yield $dns_lookup($url['host'])); $cli = new swoole_http_client($ip, isset($url['port']) ? $url['port'] : 80); $cli->setHeaders([ 'Host' => $host, "User-Agent" => 'Chrome/49.0.2587.3', ]); $get = Promise\promisify([$cli, 'get']); yield $get($url['path']); list($filename) = (yield $writefile(basename($url['path']), $cli->body)); echo "write $filename ok.\r\n"; $cli->close();}$urls = array('http://b.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=5f4519ba037b020818c437b303b099b6/472309f790529822434d08dcdeca7bcb0a46d4b6.jpg', 'http://f.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=1c37718b3cc79f3d9becec62dbc8a674/38dbb6fd5266d016dc2eaa5c902bd40735fa358a.jpg', 'http://h.hiphotos.baidu.com/baike/c0%3Dbaike116%2C5%2C5%2C116%2C38/sign=edd05c9c502c11dfcadcb771024e09b5/d6ca7bcb0a46f21f3100c52cf1246b600c33ae9d.jpg', 'http://a.hiphotos.baidu.com/baike/c0%3Dbaike92%2C5%2C5%2C92%2C30/sign=4693756e8094a4c21e2eef796f9d70b0/54fbb2fb43166d22df5181f5412309f79052d2a9.jpg', 'http://a.hiphotos.baidu.com/baike/c0%3Dbaike92%2C5%2C5%2C92%2C30/sign=9388507144a98226accc2375ebebd264/faf2b2119313b07eb2cc820c0bd7912397dd8c45.jpg',);foreach ($urls as $url) {Promise\co(fetch($url));}在这个程序中,fetch 函数内的代码是同步执行的,但是多个 fetch 之间却是并发执行的,从结果输出就可以看出来,输出顺序是不一定的。但最后,你总能得到所有的美图。总结:通过 swoole 跟 hprose 中的 Promise 和 co/yield 协程相结合,你可以方便的使用同步的方式来调用 swoole 中的异步函数和方法,并可以实现协程间的并发。因为篇幅所限,这里无法把 hprose 中 Promise 和 co/yield 协程的全部内容都介绍完,如果你想了解更多,可以参考下面两篇内容:Promise 异步编程co/yield 协程
2023年08月10日
12 阅读
0 评论
0 点赞
2023-08-10
php实现协程,真正的异步
php实现协程,真正的异步github上php的协程大部分是根据这篇文章实现的:http://nikic.github.io/2012/12/22/Cooperative-multitasking-using-coroutines-in-PHP.html。它们最终的结果都是把回调变成了优雅的顺序执行的代码,但还是阻塞的,不是真正的异步。比如最热门的:https://github.com/recoilphp/recoil先安装:composer require recoil/recoil:执行:<?php //recoil.php include __DIR__ . '/vendor/autoload.php'; use Recoil\React\ReactKernel; $i = 100000; ReactKernel::start(task1()); ReactKernel::start(task2()); function task1(){ global $i; echo "wait start" . PHP_EOL; while ($i-- > 0) { yield; } echo "wait end" . PHP_EOL; }; function task2(){ echo "Hello " . PHP_EOL; yield; echo "world!" . PHP_EOL; }结果:wait start//等待若干秒wait endHelloworld!我本来是想让两个任务并行,结果两个任务变成了串行,中间等待的时间什么事情都干不了。React响应式的编程是严格禁止这种等待的,所以我就参照unity3d的协程自己写了个php版本的。上代码:<?php //Coroutine.php //依赖swoole实现的定时器,也可以用其它方法实现定时器 class Coroutine { //可以根据需要更改定时器间隔,单位ms const TICK_INTERVAL = 1; private $routineList; private $tickId = -1; public function __construct() { $this->routineList = []; } public function start(Generator $routine) { $task = new Task($routine); $this->routineList[] = $task; $this->startTick(); } public function stop(Generator $routine) { foreach ($this->routineList as $k => $task) { if($task->getRoutine() == $routine){ unset($this->routineList[$k]); } } } private function startTick() { swoole_timer_tick(self::TICK_INTERVAL, function($timerId){ $this->tickId = $timerId; $this->run(); }); } private function stopTick() { if($this->tickId >= 0) { swoole_timer_clear($this->tickId); } } private function run() { if(empty($this->routineList)){ $this->stopTick(); return; } foreach ($this->routineList as $k => $task) { $task->run(); if($task->isFinished()){ unset($this->routineList[$k]); } } } } class Task { protected $stack; protected $routine; public function __construct(Generator $routine) { $this->routine = $routine; $this->stack = new SplStack(); } /** * [run 协程调度] * @return [type] [description] */ public function run() { $routine = &$this->routine; try { if(!$routine){ return; } $value = $routine->current(); //嵌套的协程 if ($value instanceof Generator) { $this->stack->push($routine); $routine = $value; return; } //嵌套的协程返回 if(!$routine->valid() && !$this->stack->isEmpty()) { $routine = $this->stack->pop(); } $routine->next(); } catch (Exception $e) { if ($this->stack->isEmpty()) { /* throw the exception */ return; } } } /** * [isFinished 判断该task是否完成] * @return boolean [description] */ public function isFinished() { return $this->stack->isEmpty() && !$this->routine->valid(); } public function getRoutine() { return $this->routine; } }测试代码:<?php //test.php require 'Coroutine.php'; $i = 10000; $c = new Coroutine(); $c->start(task1()); $c->start(task2()); function task1(){ global $i; echo "wait start" . PHP_EOL; while ($i-- > 0) { yield; } echo "wait end" . PHP_EOL; }; function task2(){ echo "Hello " . PHP_EOL; yield; echo "world!" . PHP_EOL; }结果:wait startHelloworld!//等待几秒,但不阻塞wait end注:此文章需要验证。
2023年08月10日
10 阅读
0 评论
0 点赞
1
...
60
61
62
...
112