首页
关于
Search
1
给你10个市场数据调研报告的免费下载网站!以后竞品数据就从这里找!
136 阅读
2
php接口优化 使用curl_multi_init批量请求
130 阅读
3
2024年备考系统架构设计师
102 阅读
4
《从菜鸟到大师之路 ElasticSearch 篇》
101 阅读
5
PHP 文件I/O
89 阅读
php
thinkphp
laravel
工具
开源
mysql
数据结构
总结
思维逻辑
令人感动的创富故事
读书笔记
前端
vue
js
css
书籍
开源之旅
架构
消息队列
docker
教程
代码片段
redis
服务器
nginx
linux
科普
java
c
ElasticSearch
测试
php进阶
php基础
登录
Search
标签搜索
php函数
php语法
性能优化
安全
错误和异常处理
问题
vue
Composer
Session
缓存
框架
Swoole
api
并发
异步
正则表达式
php-fpm
mysql 索引
开发规范
协程
dafenqi
累计撰写
785
篇文章
累计收到
5
条评论
首页
栏目
php
thinkphp
laravel
工具
开源
mysql
数据结构
总结
思维逻辑
令人感动的创富故事
读书笔记
前端
vue
js
css
书籍
开源之旅
架构
消息队列
docker
教程
代码片段
副业
redis
服务器
nginx
linux
科普
java
c
ElasticSearch
测试
php进阶
php基础
页面
关于
搜索到
785
篇与
的结果
2023-08-07
使用Apache Zookeeper分布式部署PHP应用程序
使用Apache Zookeeper分布式部署PHP应用程序原文:Distributed application in PHP with Apache Zookeeper地址: http://systemsarchitect.net/distributed-application-in-php-with-apache-zookeeper/ 这篇文章实在不错,实在忍不住翻译下来,希望对大家有用。Apache Zookeeper是我最近遇到的最酷的技术,我是在研究Solr Cloud功能的时候发现的。Solr的分布式计算让我印象深刻。你只要开启一个新的实例就能自动在Solr Cloud中找到。它会将自己分派到某个分片中,并确定出自己是一个Leader(源)还是一个副本。不一会儿,你就可以在你的那些服务器上查询到了。即便某些服务器宕机了也可以继续工作。非常动态、聪明、酷。将运行多个应用程序作为一个逻辑程序并不是什么新玩意。事实上,我在几年前就已写过类似的软件。这种架构比较让人迷惑,使用起来也费劲。为此Apache Zookeeper提供了一套工具用于管理这种软件。为什么叫Zoo?“因为要协调的分布式系统是一个动物园”。在本篇文章中,我将说明如何使用PHP安装和集成Apache ZooKeeper。我们将通过service来协调各个独立的PHP脚本,并让它们同意某个成为Leader(所以称作Leader选举)。当Leader退出(或崩溃)时,worker可检测到并再选出新的leader。ZooKeeper是一个中性化的Service,用于管理配置信息、命名、提供分布式同步,还能组合Service。所有这些种类的Service都会在分布式应用程序中使用到。每次编写这些Service都会涉及大量的修bug和竞争情况。正因为这种编写这些Service有一定难度,所以通常都会忽视它们,这就使得在应用程序有变化时变得难以管理应用程序。即使处理得当,实现这些服务的不同方法也会使得部署应用程序变得难以管理。虽然ZooKeeper是一个Java应用程序,但C也可以使用。这里就有个PHP的扩展,由Andrei Zmievski在2009创建并维护。你可以从PECL中下载,或从GitHub中直接获取PHP-ZooKeeper。要使用该扩展你首先要安装ZooKeeper。可以从官方网站下载。?12345$ tar zxfv zookeeper-3.4.5.tar.gz$ cd zookeeper-3.4.5/src/c$ ./configure --prefix=/usr/$ make$ sudo make install这样就会安装ZooKeeper的库和头文件。现在准备编译PHP扩展。?1234567$ cd$ git clone https://github.com/andreiz/php-zookeeper.git$ cd php-zookeeper$ phpize$ ./configure$ make$ sudo make install将“zookeeper.so”添加到PHP配置中。?1$ vim /etc/php5/cli/conf.d/20-zookeeper.ini因为我不需要运行在web服务环境下,所以这里我只编辑了CLI的配置。将下面的行复制到ini文件中。?1extension=zookeeper.so使用如下命令来确定扩展是否已起作用。?12$ php -m | grep zookeeperzookeeper现在是时候运行ZooKeeper了。目前唯一还没有做的是配置。创建一个用于存放所有service数据的目录。?12345$ mkdir /home/you-account/zoo$ cd$ cd zookeeper-3.4.5/$ cp conf/zoo_sample.cfg conf/zoo.cfg$ vim conf/zoo.cfg找到名为“dataDir”的属性,将其指向“/home/you-account/zoo”目录。?123456$ bin/zkServer.sh start$ bin/zkCli.sh -server 127.0.0.1:2181[zk: 127.0.0.1:2181(CONNECTED) 14] create /test 1Created /test[zk: 127.0.0.1:2181(CONNECTED) 19] ls /[test, zookeeper]此时,你已成功连到了ZooKeeper,并创建了一个名为“/test”的znode(稍后我们会用到)。ZooKeeper以树形结构保存数据。这很类似于文件系统,但“文件夹”(译者注:这里指非最底层的节点)又和文件很像。znode是ZooKeeper保存的实体。Node(节点)的说法很容易被混淆,所以为了避免混淆这里使用了znode。因为我们稍后还会使用,所以这里我们让客户端保持连接状态。开启一个新窗口,并创建一个zookeeperdemo1.php文件。?0102030405060708091011121314151617181920<?phpclass ZookeeperDemo extends Zookeeper { public function watcher( $i, $type, $key ) {echo "Insider Watcher\n"; // Watcher gets consumed so we need to set a new one $this->get( '/test', array($this, 'watcher' ) );}}$zoo = new ZookeeperDemo('127.0.0.1:2181');$zoo->get( '/test', array($zoo, 'watcher' ) );while( true ) { echo '.'; sleep(2);}现在运行该脚本。?1$ php zookeeperdemo1.php此处应该会每隔2秒产生一个点。现在切换到ZooKeeper客户端,并更新“/test”值。?1[zk: 127.0.0.1:2181(CONNECTED) 20] set /test foo这样就会静默触发PHP脚本中的“Insider Watcher”消息。怎么会这样的?ZooKeeper提供了可以绑定在znode的监视器。如果监视器发现znode发生变化,该service会立即通知所有相关的客户端。这就是PHP脚本如何知道变化的。Zookeeper::get方法的第二个参数是回调函数。当触发事件时,监视器会被消费掉,所以我们需要在回调函数中再次设置监视器。现在你可以准备创建分布式应用程序了。其中的挑战是让这些独立的程序决定哪个(是leader)协调它们的工作,以及哪些(是worker)需要执行。这个处理过程叫做leader选举,在ZooKeeper Recipes and Solutions你能看到相关的实现方法。这里简单来说就是,每个处理(或服务器)紧盯着相邻的那个处理(或服务器)。如果一个已被监视的处理(也即Leader)退出或者崩溃了,监视程序就会查找其相邻(此时最老)的那个处理作为Leader。在真实的应用程序中,leader会给worker分配任务、监控进程和保存结果。这里为了简化,我跳过了这些部分。创建一个新的PHP文件,命名为worker.php。?001002003004005006007008009010011012013014015016017018019020021022023024025026027028029030031032033034035036037038039040041042043044045046047048049050051052053054055056057058059060061062063064065066067068069070071072073074075076077078079080081082083084085086087088089090091092093094095096097098099100101102103104105106107108109110111<?phpclass Worker extends Zookeeper { const CONTAINER = '/cluster'; protected $acl = array( array( 'perms' => Zookeeper::PERM_ALL, 'scheme' => 'world', 'id' => 'anyone' ) );private $isLeader = false; private $znode; public function __construct( $host = '', $watcher_cb = null, $recv_timeout = 10000 ) {parent::__construct( $host, $watcher_cb, $recv_timeout );} public function register() {if( ! $this->exists( self::CONTAINER ) ) { $this->create( self::CONTAINER, null, $this->acl ); } $this->znode = $this->create( self::CONTAINER . '/w-', null, $this->acl, Zookeeper::<span class="KSFIND_CLASS" id="0KSFindDIV">EPHEMER</span>AL | Zookeeper::SEQUENCE ); $this->znode = str_replace( self::CONTAINER .'/', '', $this->znode ); printf( "I'm registred as: %s\n", $this->znode ); $watching = $this->watchPrevious(); if( $watching == $this->znode ) { printf( "Nobody here, I'm the leader\n" ); $this->setLeader( true ); } else { printf( "I'm watching %s\n", $watching ); }} public function watchPrevious() {$workers = $this->getChildren( self::CONTAINER ); sort( $workers ); $size = sizeof( $workers ); for( $i = 0 ; $i < $size ; $i++ ) { if( $this->znode == $workers[ $i ] ) { if( $i > 0 ) { $this->get( self::CONTAINER . '/' . $workers[ $i - 1 ], array( $this, 'watchNode' ) ); return $workers[ $i - 1 ]; } return $workers[ $i ]; } } throw new Exception( sprintf( "Something went very wrong! I can't find myself: %s/%s", self::CONTAINER, $this->znode ) );} public function watchNode( $i, $type, $name ) {$watching = $this->watchPrevious(); if( $watching == $this->znode ) { printf( "I'm the new leader!\n" ); $this->setLeader( true ); } else { printf( "Now I'm watching %s\n", $watching ); }} public function isLeader() {return $this->isLeader;} public function setLeader($flag) {$this->isLeader = $flag;} public function run() {$this->register(); while( true ) { if( $this->isLeader() ) { $this->doLeaderJob(); } else { $this->doWorkerJob(); } sleep( 2 ); }} public function doLeaderJob() {echo "Leading\n";} public function doWorkerJob() {echo "Working\n";}}$worker = new Worker( '127.0.0.1:2181' );$worker->run();打开至少3个终端,在每个终端中运行以下脚本?0102030405060708091011121314151617181920term1$ php worker.phpI'm registred as: w-0000000001Nobody here, I'm the leaderLeadingterm2$ php worker.phpI'm registred as: w-0000000002I'm watching w-0000000001Workingterm3$ php worker.phpI'm registred as: w-0000000003I'm watching w-0000000002Working现在模拟Leader崩溃的情形。使用Ctrl+c或其他方法退出第一个脚本。刚开始不会有任何变化,worker可以继续工作。后来,ZooKeeper会发现超时,并选举出新的leader。虽然这些脚本很容易理解,但是还是有必要对已使用的Zookeeper标志作注释。?1234$this->znode = $this->create( self::CONTAINER . '/w-', null, $this->acl, Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE );每个znode都是EPHEMERAL和SEQUENCE的。EPHEMRAL代表当客户端失去连接时移除该znode。这就是为何PHP脚本会知道超时。SEQUENCE代表在每个znode名称后添加顺序标识。我们通过这些唯一标识来标记worker。在PHP部分还有些问题要注意。该扩展目前还是beta版,如果使用不当很容易发生segmentation fault。比如,不能传入普通函数作为回调函数,传入的必须为方法。我希望更多PHP社区的同仁可以看到Apache ZooKeeper的好,同时该扩展也会获得更多的支持。ZooKeeper是一个强大的软件,拥有简洁和简单的API。由于文档和示例都做的很好,任何人都可以很容易的编写分布式软件。让我们开始吧,这会很有趣的。相关连接:Yahoo research on ZooKeeper. 非常好的阅读材料,拥有真实的应用程序示例。如果你只要阅读一份ZooKeeper的资料,那么就是这份了。PHP ZooKeeperPHP ZooKeeper APIPHP ZooKeeper example
2023年08月07日
23 阅读
0 评论
0 点赞
2023-08-07
基于linux和php的稳定的分布式数据采集架构
基于linux和php的稳定的分布式数据采集架构数据采集对于一些网站至关重要,在开发这种采集程序时会遇到如下一些问题:一、单进程采集慢,采集任务的间隔时间无法控制。二、数据下载部分和分析部分不分离导致可移植性不强,调试困难。三、采集程序受机器性能瓶颈和网速瓶颈限制。四、遭受数据源的封锁。等等。。。。这就要求采集程序必须足够智能化,有如下几点要求:一、可以多机器分布运行,以适应大量数据的采集。二、能够多并发采集,使采集任务的运行周期可控。三、下载程序和分析程序分离,不仅是程序上的分离,也需要机器上的分离。四、能够很容易的加入新的采集任务,很容易的调试。五、对采集页面内容的分析能够模糊匹配。六、下载时能够调用代理。七、长期自动维护一个有效的代理列表经过几次大的改动,我现在设计的基于linux和php的采集程序架构如下:Snatch(主目录)|-Lib(类库、函数、配置的目录)| |-Config.inc.php(主程序变量配置)| |-OtherConfig.inc.php(其他配置文件若干)| |-Functions.inc.php(函数文件若干)| |-Classes.inc.php(类库文件若干)| |-ClassLocalDB.inc.php(连接本地数据库的操作类)| |-ClassRemoteDB.inc.php(连接远程数据库的操作类)| |-ClassLog.inc.php(写下载分析的日志)|-Paser(分析器程序目录)| |-WebSite1(针对WebSite1的分析程序目录)| | |-WebSite1Paser1.php(针对WebSite1的分析程序1)| | |-WebSite1Paser2.php(针对WebSite1的分析程序2)| |-WebSite2(针对WebSite2的分析程序目录)| |-ProxyWebSite1(分析代理服务器列表的站点1,取得代理服务器地址并入库)| |-ProxyWebSite2(分析代理服务器列表的站点2,取得代理服务器地址并入库)| |-... ...|-Log(日志目录)| |-WebSite1.log(WebSite1的下载及数据分析日志)| |-WebSite2.log(WebSite2的下载及数据分析日志)| |-... ...|-Files(下载后的文件保存目录)|-Main.php(主入口程序,分配下载任务)|-Assign.php(取得下载任务,分配给Down.php执行)|-Down.php(进行下载并将下载保存的文件调出来分析)|-DelOvertimeData.php(清除很老的下载文件)|-ErrorNotice.php(监控下载程序,在其出错时发信通知相关人)|-Proxy.php(校验数据库中的代理列表,分析其有效性及连接速度)|-Fork(钩子程序,使下载和分析并发)|-Main.sh(封装Main.php,使其在shell下运行不出现包含路径错误)|-Assign.sh(封装Assign.php)|-DelOvertimeData.sh(封装DelOvertimeData.php)|-ErrorNotice.sh(封装ErrorNotice.php)|-Proxy.sh(封装Proxy.php)本地数据库表结构如下(简单介绍):DownloadList表:ID int(10) unsigned NOT NULL auto_increment, 自增IDParentID int(11) NOT NULL default '0', 父ID,也就是该记录由哪个下载记录衍生来的SiteName char(32) NOT NULL default '', 采集网站的名称或代号LocalServerName char(32) NOT NULL default '', 该采集任务由本地若干台机器里的哪一台来完成URL char(255) NOT NULL default '', 需要下载的数据页地址FileName char(64) NOT NULL default '', 下载后保存的文件名FileSize int(11) NOT NULL default '0', 下载后文件的大小Handler char(64) NOT NULL default '', 分析器的php文件路径,如./Paser/WebSite1/Paser1.phpStatus enum('Wait','Download','Doing','Done','Dead') NOT NULL default 'Wait', 该任务的状态ProxyID int(11) NOT NULL default '0', 该任务使用的代理ID,为0则不使用代理下载Remark char(100) NOT NULL default '', 备注字段WaitAddTime datetime NOT NULL default '0000-00-00 00:00:00', 记录加入进行等待的时间DownloadAddTime datetime NOT NULL default '0000-00-00 00:00:00', 记录开始下载的时间DoingAddTime datetime NOT NULL default '0000-00-00 00:00:00', 记录开始分析的时间DoneAddTime datetime NOT NULL default '0000-00-00 00:00:00', 记录完成的时间ProxyList表:ID int(11) NOT NULL auto_increment, 自增IDProxy char(30) NOT NULL default '', 代理地址,如: 127.0.0.1:8080Status enum('Bad','Good','Perfect') NOT NULL default 'Bad', 该代理状态SocketTime float NOT NULL default '3', 本地连接该代理socket时间UsedCount int(11) NOT NULL default '0', 被使用的次数AddTime datetime NOT NULL default '0000-00-00 00:00:00', 代理被加入列表的时间LastTime datetime NOT NULL default '0000-00-00 00:00:00', 代理被最后一次验证的时间其它相关表:(略)介绍几个文件(只介绍,不贴代码):一、Main.phpclose();?>二、Assign.php/dev/null";}$LocalDB->close();?>三、Down.php四、Proxy.php (维护有效的代理列表)方法有两种:1、对代理地址的代理端口进行socket连接。设定连接超时为3秒(3秒仍旧连不上的代理就别要了)如果连接上了,计算连接时间,并更新该代理记录的数据SocketTime字段,判断其Status是Bad, Good,还是Perfect2、对于非Bad的代理,进行下载文件的实验,如果没使用代理下载的文件和使用代理下载的文件一样,则该代理真实有效。程序略多台机器分布式采集:只有一台运行Main.sh,2分钟运行一次。其他机器运行Assign.sh,1分钟一次,Assign.php会根据DownloadList表里的LocalServerName字段来取回任务并完成它。LocalServerName值由Main.php加载采集任务时分配。这个也可以根据各采集机器负载情况来自动调整。日志:采集分析的日志写如Log目录,以便方便的查看到是否采集到数据,分析程序是否有效,在出现错误时也可以找到错误的可能地点和时间。有点复杂,我只写了大体思路,页面分析部分没有涉及,但是也非常重要。后台管理也没谈。架起来之后很爽,只要你采集的机器多,建一个qihoo没问题。我给以前公司做的采集就是这个架构,采集sina, tom, 163等等一共143个频道的内容。对某几个网站收费数据的精确采集和分析也用的这个(当然,需要模拟登录)。还是相当稳定的
2023年08月07日
8 阅读
0 评论
0 点赞
2023-08-07
php里设置浏览器缓存
php里设置浏览器缓存$lastModified = time() + 30;header('Last-Modified: ' . gmdate('D, d M Y H:i:s', $lastModified) . ' GMT'); 此行设置后,会去服务器检查是否过期(文件是否修改过),如果没过期,就取本地文件(服务器需要返回header)header('Expires: ' . gmdate('D, d M Y H:i:s', $lastModified) . ' GMT'); 此行设置后,不再去服务器检查是否过期,直接取本地文件。header('Cache-Control: max-age=1'); (相对于本地时间 )还有一个Etag,是比较字符串,而不是时间。下面这段代码是禁止点后退后,缓存.header('Expires: Thu, 01 Jan 1970 00:00:00 GMT');// HTTP/1.1header('Cache-Control: private, no-store, no-cache, must-revalidate');header('Cache-Control: post-check=0, pre-check=0, max-age=0', false);psheader("Content-Type: application/json");//有时要加上这句,不然不起作用(看nginx配置)<?php// 这是一个缓存测试 $cache_time = 60*10; //十分钟缓存$modified_time = @$_SERVER['HTTP_IF_MODIFIED_SINCE']; if( strtotime($modified_time)+$cache_time > time() ){header("HTTP/1.1 304"); exit; } header("Last-Modified: ".gmdate("D, d M Y H:i:s", time() )." GMT"); header("Content-Type: application/json");//有时要加上这句,不然不起作用(看nginx配置) echo ""; echo date('Y-m-d H:i:s',time()); echo 222222; ?>
2023年08月07日
13 阅读
0 评论
0 点赞
2023-08-07
PHP下的命令行执行
PHP下的命令行执行PHP 的命令行模式以下是 PHP 二进制文件(即 php.exe 程序)提供的命令行模式的选项参数,您随时可以通过 PHP -h 命令来查询这些参数。Usage: php [options] [-f] [args...]php [options] -r [args...]php [options] [-- args...]-s Display colour syntax highlighted source.-w Display source with stripped comments and whitespace.-f Parse .-v Version number-c | Look for php.ini file in this directory-a Run interactively-d foo[=bar] Define INI entry foo with value 'bar'-e Generate extended information for debugger/profiler-z Load Zend extension .-l Syntax check only (lint)-m Show compiled in modules-i PHP information-r Run PHP without using script tags <?..?>-h This helpargs... Arguments passed to script. Use -- args when first argumentstarts with - or script is read from stdinCLI SAPI 模块有以下三种不同的方法来获取您要运行的 PHP 代码:在windows环境下,尽量使用双引号, 在linux环境下则尽量使用单引号来完成。让 PHP 运行指定文件。php my_script.phpphp -f "my_script.php"以上两种方法(使用或不使用 -f 参数)都能够运行给定的 my_script.php 文件。您可以选择任何文件来运行,您指定的 PHP 脚本并非必须要以 .php 为扩展名,它们可以有任意的文件名和扩展名。在命令行直接运行 PHP 代码。php -r "print_r(get_defined_constants());"在使用这种方法时,请您注意外壳变量的替代及引号的使用。注: 请仔细阅读以上范例,在运行代码时没有开始和结束的标记符!加上 -r 参数后,这些标记符是不需要的,加上它们会导致语法错误。通过标准输入(stdin)提供需要运行的 PHP 代码。以上用法给我们提供了非常强大的功能,使得我们可以如下范例所示,动态地生成 PHP 代码并通过命令行运行这些代码:$ some_application | some_filter | php | sort -u >final_output.txt以上三种运行代码的方法不能同时使用。和所有的外壳应用程序一样,PHP 的二进制文件(php.exe 文件)及其运行的 PHP 脚本能够接受一系列的参数。PHP 没有限制传送给脚本程序的参数的个数(外壳程序对命令行的字符数有限制,但您通常都不会超过该限制)。传递给您脚本的参数可在全局变量 $argv 中获取。该数组中下标为零的成员为脚本的名称(当 PHP 代码来自标准输入获直接用 -r 参数以命令行方式运行时,该名称为“-”)。另外,全局变量 $argc 存有 $argv 数组中成员变量的个数(而非传送给脚本程序的参数的个数)。只要您传送给您脚本的参数不是以 - 符号开头,您就无需过多的注意什么。向您的脚本传送以 - 开头的参数会导致错误,因为 PHP 会认为应该由它自身来处理这些参数。您可以用参数列表分隔符 -- 来解决这个问题。在 PHP 解析完参数后,该符号后所有的参数将会被原样传送给您的脚本程序。以下命令将不会运行 PHP 代码,而只显示 PHP 命令行模式的使用说明:$ php -r 'var_dump($argv);' -hUsage: php [options] [-f] [args...][...]以下命令将会把“-h”参数传送给脚本程序,PHP 不会显示命令行模式的使用说明:$ php -r "var_dump($argv);" -- -harray(2) {[0]=>string(1) "-"[1]=>string(2) "-h"}除此之外,我们还有另一个方法将 PHP 用于外壳脚本。您可以在写一个脚本,并在第一行以 #!/usr/bin/php 开头,在其后加上以 PHP 开始和结尾标记符包含的正常的 PHP 代码,然后为该文件设置正确的运行属性。该方法可以使得该文件能够像外壳脚本或 PERL 脚本一样被直接执行。!/usr/bin/php<?phpvar_dump($argv);?>假设改文件名为 test 并被放置在当前目录下,我们可以做如下操作:$ chmod 755 test$ ./test -h -- fooarray(4) {[0]=>string(6) "./test"[1]=>string(2) "-h"[2]=>string(2) "--"[3]=>string(3) "foo"}正如您所看到的,在您向该脚本传送以 - 开头的参数时,脚本仍然能够正常运行。------------------------------------------------------------------------------命令选项-----------------------------------------------------表格 23-3. 命令行选项选项名称 描 述-s 显示有语法高亮色彩的源文件。该参数使用内建机制来解析文件并为其生成一个 HTML 高亮版本并将结果写到标准输出。请注意该过程所做的只是生成了一个 [...] 的 HTML 标记符块,并不包含任何的 HTML 头。注: 该选项不能和 -r 参数同时使用。-w 显示除去了注释和空格的源代码。注: 该选项不能和 -r 参数同时使用。-f 解析并运行给定的文件名。该参数为可选参数且可以不加,仅指明需要运行的文件名即可。-v 将 PHP、PHP SAPI 及 Zend 的版本信息写入的标准输出。例如:$ php -vPHP 4.3.0-dev (cli), Copyright (c) 1997-2002 The PHP GroupZend Engine v1.3.0, Copyright (c) 1998-2002 Zend Technologies-c 用该参数,您可以指定一个放置 php.ini 文件的目录,或者直接指定一个自定义的 INI 文件,其文件名可以不是 php.ini。例如:$ php -c /custom/directory/ my_script.php$ php -c /custom/directory/custom-file.ini my_script.php-a 交互地运行 PHP。-d 用该参数可以自行设置 php.ini 文件中设置变量的值,其语法为:-d configuration_directive[=value]范例:Ommiting the value part will set the given configuration directive to "1"$ php -d max_execution_time-r '$foo = ini_get("max_execution_time"); var_dump($foo);'string(1) "1"Passing an empty value part will set the configuration directive to ""php -d max_execution_time=-r '$foo = ini_get("max_execution_time"); var_dump($foo);'string(0) ""The configuration directive will be set to anything passed after the '=' character$ php -d max_execution_time=20-r '$foo = ini_get("max_execution_time"); var_dump($foo);'string(2) "20"$ php-d max_execution_time=doesntmakesense-r '$foo = ini_get("max_execution_time"); var_dump($foo);'string(15) "doesntmakesense"-e 为调试器等生成扩展信息。-z 加载 Zend 扩展库。如果仅给定一个文件名,PHP 将试图从您系统扩展库的默认路径(在 Linux 系统下,该路径通常由/etc/ld.so.conf 指定)加载该扩展库。如果您用一个绝对路径指定文件名,则系统的扩展库默认路径将不会被使用。如果用相对路径指定的文件名,PHP 则仅试图加载相对于当前目录的扩展库。-l 该参数提供了对指定 PHP 代码进行语法检查的方便的方法。如果成功,则向标准输出写入 No syntax errors detected in 字符串,并且外壳返回值为 0。如果失败,则 Errors parsing 以及内部解析器错误信息会一起被写入到标准输出,同时外壳返回值将别设置为 255。该参数将无法检查致命错误(如未定义函数),如果您希望检测之名错误,请使用 -f 参数。注: 该参数不能和 -r 一同使用。-m 使用该参数,PHP 将打印出内置以及已加载的 PHP 及 Zend 模块:$ php -m[PHP Modules]xmltokenizerstandardsessionposixpcreoverloadmysqlmbstringctype[Zend Modules]-i 该命令行参数会调用 phpinfo() 函数,并打印出结果。如果 PHP 没有正常工作,我们建议您执行 php -i 命令来查看在信息表格之前或者对应的地方是否有任何错误信息输出。请注意输出的内容为 HTML 格式,因此输出的信息篇幅较大。-r 使用该参数可以在命令行运行 PHP 代码。您无需加上 PHP 的起始和结束标识符(<?php 和 ?>),否则将会导致语法解析错误。注: 使用这种形式的 PHP 时,应个别注意避免和外壳环境进行的命令行参数替换相冲突。显示语法解析错误的范例$ php -r "$foo = get_defined_constants();"Command line code(1) : Parse error - parse error, unexpected '='这里的问题在于即时使用了双引号 ",sh/bash 仍然实行了参数替换。由于 $foo 没有被定义,被替换后它所在的位置变成了空字符,因此在运行时,实际被 PHP 读取的代码为:$ php -r " = get_defined_constants();"正确的方法是使用单引号 '。在用单引号引用的字符串中,变量不会被 sh/bash 还原成其原值。$ php -r '$foo = get_defined_constants(); var_dump($foo);'array(370) {["E_ERROR"]=>int(1)["E_WARNING"]=>int(2)["E_PARSE"]=>int(4)["E_NOTICE"]=>int(8)["E_CORE_ERROR"]=>[...]如果您使用的外壳不是 sh/bash,您可能会碰到其它的问题。请报告您碰到的 bug,或者发邮件到 phpdoc@lists.php.net。当您试图将外壳的环境变量引入到马或者用反斜线来转义字符时也可能碰到各种各样的问题,请您在使用时注意!注: -r 在 CLI SAPI 中有效,在 CGI SAPI 中无效。-h 使用该参数,您可以得到完整的命令行参数的列表及这些参数作用的简单描述。PHP 的命令行模式能使得 PHP 脚本能完全独立于 WEB 服务器单独运行。如果您使用 Unix 系统,您需要在您的 PHP 脚本的最前面加上一行特殊的代码,使得它能够被执行,这样系统就能知道用什么样的程序要运行该脚本。在 Windows 平台下您可以将 php.exe 和.php 文件的双击属性相关联,您也可以编写一个批处理文件来用 PHP 执行脚本。为 Unix 系统增加的第一行代码不会影响该脚本在 Windows 下的运行,因此您也可以用该方法编写跨平台的脚本程序。以下是一个简单的PHP 命令行程序的范例。例子 23-1. 试图以命令行方式运行的 PHP 脚本(script.php)!/usr/bin/php<?phpif ($argc != 2 || in_array($argv[1], array('--help', '-help', '-h', '-?'))) {?>This is a command line PHP script with one option.Usage:<?php echo $argv[0]; ?> can be some word you would liketo print out. With the --help, -help, -h,or -? options, you can get this help.<?php} else {echo $argv[1];}?>在以上脚本中,我们用第一行特殊的代码来指明该文件应该由 PHP 来执行。我们在这里使用 CLI 的版本,因此不会有 HTTP 头信息输出。在您用 PHP 编写命令行应用程序时,您可以使用两个参数:$argc 和 $argv。前面一个的值是比参数个数大 1 的整数(运行的脚本本身的名称也被当作一个参数)。第二个时包含有参数的数组,其第一个元素为脚本的名称,下标为数字 0($argv[0])。在以上程序中我们检查了参数的个数是大于 1 个还是小于 1 个。即时参数是 --help、-help、-h 或 -?,我们仍然打印出帮助信息,并同时动态输出脚本的名称。如果还收到了其它参数,我们也把它们显示出来。如果您希望在 Unix 下运行以上脚本,您需要使得它成为可执行脚本,然后简单的运行 script.php echothis 或 script.php -h。在 Windows 下,您可以为此编写一个批处理文件:@c:\php\cli\php.exe script.php %1 %2 %3 %4
2023年08月07日
13 阅读
0 评论
0 点赞
2023-08-07
在PHP中使用协程实现多任务调度
在PHP中使用协程实现多任务调度PHP5.5一个比较好的新功能是加入了对迭代生成器和协程的支持.对于生成器,PHP的文档和各种其他的博客文章已经有了非常详细的讲解.协程相对受到的关注就少了,因为协程虽然有很强大的功能但相对比较复杂, 也比较难被理解,解释起来也比较困难.这篇文章将尝试通过介绍如何使用协程来实施任务调度, 来解释在PHP中的协程.我将在前三节做一个简单的背景介绍.如果你已经有了比较好的基础,可以直接跳到“协同多任务处理”一节.迭代生成器(迭代)生成器也是一个函数,不同的是这个函数的返回值是依次返回,而不是只返回一个单独的值.或者,换句话说,生成器使你能更方便的实现了迭代器接口.下面通过实现一个xrange函数来简单说明:<?phpfunction xrange($start, $end, $step = 1) {for ($i = $start; $i <= $end; $i += $step) { yield $i; }}foreach (xrange(1, 1000000) as $num) {echo $num, "\n";}上面这个xrange()函数提供了和PHP的内建函数range()一样的功能.但是不同的是range()函数返回的是一个包含值从1到100万0的数组(注:请查看手册). 而xrange()函数返回的是依次输出这些值的一个迭代器, 而不会真正以数组形式返回.这种方法的优点是显而易见的.它可以让你在处理大数据集合的时候不用一次性的加载到内存中.甚至你可以处理无限大的数据流.当然,也可以不同通过生成器来实现这个功能,而是可以通过继承Iterator接口实现.但通过使用生成器实现起来会更方便,不用再去实现iterator接口中的5个方法了.生成器为可中断的函数要从生成器认识协程, 理解它内部是如何工作是非常重要的: 生成器是一种可中断的函数, 在它里面的yield构成了中断点.还是看上面的例子, 调用xrange(1,1000000)的时候, xrange()函数里代码其实并没有真正地运行. 它只是返回了一个迭代器:<?php$range = xrange(1, 1000000);var_dump($range); // object(Generator)#1var_dump($range instanceof Iterator); // bool(true)?>这也解释了为什么xrange叫做迭代生成器, 因为它返回一个迭代器, 而这个迭代器实现了Iterator接口.调用迭代器的方法一次, 其中的代码运行一次.例如, 如果你调用$range->rewind(), 那么xrange()里的代码就会运行到控制流第一次出现yield的地方. 而函数内传递给yield语句的返回值可以通过$range->current()获取.为了继续执行生成器中yield后的代码, 你就需要调用$range->next()方法. 这将再次启动生成器, 直到下一次yield语句出现. 因此,连续调用next()和current()方法, 你就能从生成器里获得所有的值, 直到再没有yield语句出现.对xrange()来说, 这种情形出现在$i超过$end时. 在这中情况下, 控制流将到达函数的终点,因此将不执行任何代码.一旦这种情况发生,vaild()方法将返回假, 这时迭代结束.协程协程的支持是在迭代生成器的基础上, 增加了可以回送数据给生成器的功能(调用者发送数据给被调用的生成器函数). 这就把生成器到调用者的单向通信转变为两者之间的双向通信.传递数据的功能是通过迭代器的send()方法实现的. 下面的logger()协程是这种通信如何运行的例子:<?phpfunction logger($fileName) {$fileHandle = fopen($fileName, 'a'); while (true) { fwrite($fileHandle, yield . "\n"); }}$logger = logger(__DIR__ . '/log');$logger->send('Foo');$logger->send('Bar')?>正如你能看到,这儿yield没有作为一个语句来使用, 而是用作一个表达式, 即它能被演化成一个值. 这个值就是调用者传递给send()方法的值. 在这个例子里, yield表达式将首先被"Foo"替代写入Log, 然后被"Bar"替代写入Log.上面的例子里演示了yield作为接受者, 接下来我们看如何同时进行接收和发送的例子:<?phpfunction gen() {$ret = (yield 'yield1'); var_dump($ret); $ret = (yield 'yield2'); var_dump($ret);}$gen = gen();var_dump($gen->current()); // string(6) "yield1"var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen) // string(6) "yield2" (the var_dump of the ->send() return value)var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen) // NULL (the return value of ->send())?>要很快的理解输出的精确顺序可能稍微有点困难, 但你确定要搞清楚为什按照这种方式输出. 以便后续继续阅读.另外, 我要特别指出的有两点:第一点,yield表达式两边的括号在PHP7以前不是可选的, 也就是说在PHP5.5和PHP5.6中圆括号是必须的.第二点,你可能已经注意到调用current()之前没有调用rewind().这是因为生成迭代对象的时候已经隐含地执行了rewind操作.多任务协作如果阅读了上面的logger()例子, 你也许会疑惑“为了双向通信我为什么要使用协程呢?我完全可以使用其他非协程方法实现同样的功能啊?", 是的, 你是对的, 但上面的例子只是为了演示了基本用法, 这个例子其实并没有真正的展示出使用协程的优点.正如上面介绍里提到的,协程是非常强大的概念,不过却应用的很稀少而且常常十分复杂.要给出一些简单而真实的例子很难.在这篇文章里,我决定去做的是使用协程实现多任务协作.我们要解决的问题是你想并发地运行多任务(或者“程序”).不过我们都知道CPU在一个时刻只能运行一个任务(不考虑多核的情况).因此处理器需要在不同的任务之间进行切换,而且总是让每个任务运行 “一小会儿”.多任务协作这个术语中的“协作”很好的说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样就可以运行其他任务了. 这与“抢占”多任务相反, 抢占多任务是这样的:调度器可以中断运行了一段时间的任务, 不管它喜欢还是不喜欢. 协作多任务在Windows的早期版本(windows95)和Mac OS中有使用, 不过它们后来都切换到使用抢先多任务了. 理由相当明确:如果你依靠程序自动交出控制的话, 那么一些恶意的程序将很容易占用整个CPU, 不与其他任务共享.现在你应当明白协程和任务调度之间的关系:yield指令提供了任务中断自身的一种方法, 然后把控制交回给任务调度器. 因此协程可以运行多个其他任务. 更进一步来说, yield还可以用来在任务和调度器之间进行通信.为了实现我们的多任务调度, 首先实现“任务” -- 一个用轻量级的包装的协程函数:<?phpclass Task {protected $taskId; protected $coroutine; protected $sendValue = null; protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendValue) { $this->sendValue = $sendValue; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); }}如代码, 一个任务就是用任务ID标记的一个协程(函数). 使用setSendValue()方法, 你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个), run()函数确实没有做什么, 除了调用send()方法的协同程序, 要理解为什么添加了一个 beforeFirstYieldflag变量, 需要考虑下面的代码片段:<?phpfunction gen() {yield 'foo'; yield 'bar';}$gen = gen();var_dump($gen->send('something'));// 如之前提到的在send之前, 当$gen迭代器被创建的时候一个renwind()方法已经被隐式调用// 所以实际上发生的应该类似://$gen->rewind();//var_dump($gen->send('something'));//这样renwind的执行将会导致第一个yield被执行, 并且忽略了他的返回值.//真正当我们调用yield的时候, 我们得到的是第二个yield的值! 导致第一个yield的值被忽略.//string(3) "bar"通过添加 beforeFirstYieldcondition 我们可以确定第一个yield的值能被正确返回.调度器现在不得不比多任务循环要做稍微多点了, 然后才运行多任务:<?phpclass Scheduler {protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } }}?>newTask()方法(使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务map数组里. 接着它通过把任务放入任务队列里来实现对任务的调度. 接着run()方法扫描任务队列, 运行任务.如果一个任务结束了, 那么它将从队列里删除, 否则它将在队列的末尾再次被调度.让我们看看下面具有两个简单(没有什么意义)任务的调度器:<?phpfunction task1() {for ($i = 1; $i <= 10; ++$i) { echo "This is task 1 iteration $i.\n"; yield; }}function task2() {for ($i = 1; $i <= 5; ++$i) { echo "This is task 2 iteration $i.\n"; yield; }}$scheduler = new Scheduler;$scheduler->newTask(task1());$scheduler->newTask(task2());$scheduler->run();两个任务都仅仅回显一条信息,然后使用yield把控制回传给调度器.输出结果如下:This is task 1 iteration 1.This is task 2 iteration 1.This is task 1 iteration 2.This is task 2 iteration 2.This is task 1 iteration 3.This is task 2 iteration 3.This is task 1 iteration 4.This is task 2 iteration 4.This is task 1 iteration 5.This is task 2 iteration 5.This is task 1 iteration 6.This is task 1 iteration 7.This is task 1 iteration 8.This is task 1 iteration 9.This is task 1 iteration 10.输出确实如我们所期望的:对前五个迭代来说,两个任务是交替运行的, 而在第二个任务结束后, 只有第一个任务继续运行.与调度器之间通信既然调度器已经运行了, 那么我们来看下一个问题:任务和调度器之间的通信.我们将使用进程用来和操作系统会话的同样的方式来通信:系统调用.我们需要系统调用的理由是操作系统与进程相比它处在不同的权限级别上. 因此为了执行特权级别的操作(如杀死另一个进程), 就不得不以某种方式把控制传回给内核, 这样内核就可以执行所说的操作了. 再说一遍, 这种行为在内部是通过使用中断指令来实现的. 过去使用的是通用的int指令, 如今使用的是更特殊并且更快速的syscall/sysenter指令.我们的任务调度系统将反映这种设计:不是简单地把调度器传递给任务(这样就允许它做它想做的任何事), 我们将通过给yield表达式传递信息来与系统调用通信. 这儿yield即是中断, 也是传递信息给调度器(和从调度器传递出信息)的方法.为了说明系统调用, 我们对可调用的系统调用做一个小小的封装:<?phpclass SystemCall {protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; return $callback($task, $scheduler); }}它和其他任何可调用的对象(使用_invoke)一样的运行, 不过它要求调度器把正在调用的任务和自身传递给这个函数.为了解决这个问题我们不得不微微的修改调度器的run方法:<?phppublic function run() {while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $retval = $task->run(); if ($retval instanceof SystemCall) { $retval($task, $this); continue; } if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } }}第一个系统调用除了返回任务ID外什么都没有做:<?phpfunction getTaskId() {return new SystemCall(function(Task $task, Scheduler $scheduler) { $task->setSendValue($task->getTaskId()); $scheduler->schedule($task); });}这个函数设置任务id为下一次发送的值, 并再次调度了这个任务 .由于使用了系统调用, 所以调度器不能自动调用任务, 我们需要手工调度任务(稍后你将明白为什么这么做). 要使用这个新的系统调用的话, 我们要重新编写以前的例子:<?phpfunction task($max) {$tid = (yield getTaskId()); // <-- here's the syscall! for ($i = 1; $i <= $max; ++$i) { echo "This is task $tid iteration $i.\n"; yield; }}$scheduler = new Scheduler;$scheduler->newTask(task(10));$scheduler->newTask(task(5));$scheduler->run();?>这段代码将给出与前一个例子相同的输出. 请注意系统调用如何同其他任何调用一样正常地运行, 只不过预先增加了yield.要创建新的任务, 然后再杀死它们的话, 需要两个以上的系统调用:<?phpfunction newTask(Generator $coroutine) {return new SystemCall( function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule($task); } );}function killTask($tid) {return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { $task->setSendValue($scheduler->killTask($tid)); $scheduler->schedule($task); } );}killTask函数需要在调度器里增加一个方法:<?phppublic function killTask($tid) {if (!isset($this->taskMap[$tid])) { return false; } unset($this->taskMap[$tid]); // This is a bit ugly and could be optimized so it does not have to walk the queue, // but assuming that killing tasks is rather rare I won't bother with it now foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $tid) { unset($this->taskQueue[$i]); break; } } return true;}用来测试新功能的微脚本:<?phpfunction childTask() {$tid = (yield getTaskId()); while (true) { echo "Child task $tid still alive!\n"; yield; }}function task() {$tid = (yield getTaskId()); $childTid = (yield newTask(childTask())); for ($i = 1; $i <= 6; ++$i) { echo "Parent task $tid iteration $i.\n"; yield; if ($i == 3) yield killTask($childTid); }}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();?>这段代码将打印以下信息:Parent task 1 iteration 1.Child task 2 still alive!Parent task 1 iteration 2.Child task 2 still alive!Parent task 1 iteration 3.Child task 2 still alive!Parent task 1 iteration 4.Parent task 1 iteration 5.Parent task 1 iteration 6.经过三次迭代以后子任务将被杀死, 因此这就是"Child is still alive"消息结束的时候. 不过你要明白这还不是真正的父子关系. 因为在父任务结束后子任务仍然可以运行, 子任务甚至可以杀死父任务. 可以修改调度器使它具有更层级化的任务结构, 不过这个不是我们这个文章要继续讨论的范围了.现在你可以实现许多进程管理调用. 例如 wait(它一直等待到任务结束运行时), exec(它替代当前任务)和fork(它创建一个当前任务的克隆). fork非常酷,而 且你可以使用PHP的协程真正地实现它, 因为它们都支持克隆.让我们把这些留给有兴趣的读者吧,我们来看下一个议题.非阻塞IO很明显, 我们的任务管理系统的真正很酷的应用应该是web服务器. 它有一个任务是在套接字上侦听是否有新连接, 当有新连接要建立的时候, 它创建一个新任务来处理新连接.Web服务器最难的部分通常是像读数据这样的套接字操作是阻塞的. 例如PHP将等待到客户端完成发送为止. 对一个Web服务器来说, 这有点不太高效. 因为服务器在一个时间点上只能处理一个连接.解决方案是确保在真正对套接字读写之前该套接字已经“准备就绪”. 为了查找哪个套接字已经准备好读或者写了, 可以使用 流选择函数.首先,让我们添加两个新的 syscall, 它们将等待直到指定socket 准备好:<?phpfunction waitForRead($socket) {return new SystemCall( function(Task $task, Scheduler $scheduler) use ($socket) { $scheduler->waitForRead($socket, $task); } );}function waitForWrite($socket) {return new SystemCall( function(Task $task, Scheduler $scheduler) use ($socket) { $scheduler->waitForWrite($socket, $task); } );}这些 syscall 只是在调度器中代理其各自的方法:<?php// resourceID => [socket, tasks]protected $waitingForRead = [];protected $waitingForWrite = [];public function waitForRead($socket, Task $task) {if (isset($this->waitingForRead[(int) $socket])) { $this->waitingForRead[(int) $socket][1][] = $task; } else { $this->waitingForRead[(int) $socket] = [$socket, [$task]]; }}public function waitForWrite($socket, Task $task) {if (isset($this->waitingForWrite[(int) $socket])) { $this->waitingForWrite[(int) $socket][1][] = $task; } else { $this->waitingForWrite[(int) $socket] = [$socket, [$task]]; }}waitingForRead 及 waitingForWrite 属性是两个承载等待的socket 及等待它们的任务的数组. 有趣的部分在于下面的方法,它将检查 socket 是否可用, 并重新安排各自任务:<?phpprotected function ioPoll($timeout) {$rSocks = []; foreach ($this->waitingForRead as list($socket)) { $rSocks[] = $socket; } $wSocks = []; foreach ($this->waitingForWrite as list($socket)) { $wSocks[] = $socket; } $eSocks = []; // dummy if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) { return; } foreach ($rSocks as $socket) { list(, $tasks) = $this->waitingForRead[(int) $socket]; unset($this->waitingForRead[(int) $socket]); foreach ($tasks as $task) { $this->schedule($task); } } foreach ($wSocks as $socket) { list(, $tasks) = $this->waitingForWrite[(int) $socket]; unset($this->waitingForWrite[(int) $socket]); foreach ($tasks as $task) { $this->schedule($task); } }}stream_select 函数接受承载读取、写入以及待检查的socket的数组(我们无需考虑最后一类). 数组将按引用传递, 函数只会保留那些状态改变了的数组元素. 我们可以遍历这些数组, 并重新安排与之相关的任务.为了正常地执行上面的轮询动作, 我们将在调度器里增加一个特殊的任务:<?phpprotected function ioPollTask() {while (true) { if ($this->taskQueue->isEmpty()) { $this->ioPoll(null); } else { $this->ioPoll(0); } yield; }}?>需要在某个地方注册这个任务, 例如, 你可以在run()方法的开始增加$this->newTask($this->ioPollTask()). 然后就像其他任务一样每执行完整任务循环一次就执行轮询操作一次(这么做一定不是最好的方法), ioPollTask将使用0秒的超时来调用ioPoll, 也就是stream_select将立即返回(而不是等待).只有任务队列为空时,我们才使用null超时,这意味着它一直等到某个套接口准备就绪.如果我们没有这么做,那么轮询任务将一而再, 再而三的循环运行, 直到有新的连接建立. 这将导致100%的CPU利用率. 相反, 让操作系统做这种等待会更有效.现在编写服务器就相对容易了:<?phpfunction server($port) {echo "Starting server at port $port...\n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); while (true) { yield waitForRead($socket); $clientSocket = stream_socket_accept($socket, 0); yield newTask(handleClient($clientSocket)); }}function handleClient($socket) {yield waitForRead($socket); $data = fread($socket, 8192); $msg = "Received following request:\n\n$data"; $msgLength = strlen($msg); $response = <<<resHTTP/1.1 200 OK\rContent-Type: text/plain\rContent-Length: $msgLength\rConnection: close\r\r$msgRES;yield waitForWrite($socket); fwrite($socket, $response); fclose($socket);}$scheduler = new Scheduler;$scheduler->newTask(server(8000));$scheduler->run();这段代码实现了接收localhost:8000上的连接, 然后返回发送来的内容作为HTTP响应. 当然它还能处理真正的复杂HTTP请求, 上面的代码片段只是演示了一般性的概念.你可以使用类似于ab -n 10000 -c 100 localhost:8000/这样命令来测试服务器. 这条命令将向服务器发送10000个请求, 并且其中100个请求将同时到达. 使用这样的数目, 我得到了处于中间的10毫秒的响应时间. 不过还有一个问题:有少数几个请求真正处理的很慢(如5秒), 这就是为什么总吞吐量只有2000请求/秒(如果是10毫秒的响应时间的话, 总的吞吐量应该更像是10000请求/秒)协程堆栈如果你试图用我们的调度系统建立更大的系统的话, 你将很快遇到问题:我们习惯了把代码分解为更小的函数, 然后调用它们. 然而, 如果使用了协程的话, 就不能这么做了. 例如,看下面代码:<?phpfunction echoTimes($msg, $max) {for ($i = 1; $i <= $max; ++$i) { echo "$msg iteration $i\n"; yield; }}function task() {echoTimes('foo', 10); // print foo ten times echo "---\n"; echoTimes('bar', 5); // print bar five times yield; // force it to be a coroutine}$scheduler = new Scheduler;$scheduler->newTask(task());$scheduler->run();这段代码试图把重复循环“输出n次“的代码嵌入到一个独立的协程里,然后从主任务里调用它. 然而它无法运行. 正如在这篇文章的开始所提到的, 调用生成器(或者协程)将没有真正地做任何事情, 它仅仅返回一个对象.这 也出现在上面的例子里:echoTimes调用除了放回一个(无用的)协程对象外不做任何事情.为了仍然允许这么做,我们需要在这个裸协程上写一个小小的封装.我们将调用它:“协程堆栈”. 因为它将管理嵌套的协程调用堆栈. 这将是通过生成协程来调用子协程成为可能:$retval = (yield someCoroutine($foo, $bar));使用yield,子协程也能再次返回值:yield retval("I'm a return value!");retval函数除了返回一个值的封装外没有做任何其他事情.这个封装将表示它是一个返回值.<?phpclass CoroutineReturnValue {protected $value; public function __construct($value) { $this->value = $value; } public function getValue() { return $this->value; }}function retval($value) {return new CoroutineReturnValue($value);}为了把协程转变为协程堆栈(它支持子调用),我们将不得不编写另外一个函数(很明显,它是另一个协程):<?phpfunction stackedCoroutine(Generator $gen) {$stack = new SplStack; for (;;) { $value = $gen->current(); if ($value instanceof Generator) { $stack->push($gen); $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty()) { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue() : NULL); continue; } $gen->send(yield $gen->key() => $value); }}这个函数在调用者和当前正在运行的子协程之间扮演着简单代理的角色.在$gen->send(yield $gen->key()=>$value);这行完成了代理功能.另外它检查返回值是否是生成器,万一是生成器的话,它将开始运行这个生成器,并把前一个协程压入堆栈里.一旦它获得了CoroutineReturnValue的话,它将再次请求堆栈弹出,然后继续执行前一个协程.为了使协程堆栈在任务里可用,任务构造器里的$this-coroutine =$coroutine;这行需要替代为$this->coroutine = StackedCoroutine($coroutine);.现在我们可以稍微改进上面web服务器例子:把wait+read(和wait+write和warit+accept)这样的动作分组为函数.为了分组相关的 功能,我将使用下面类:<?phpclass CoSocket {protected $socket; public function __construct($socket) { $this->socket = $socket; } public function accept() { yield waitForRead($this->socket); yield retval(new CoSocket(stream_socket_accept($this->socket, 0))); } public function read($size) { yield waitForRead($this->socket); yield retval(fread($this->socket, $size)); } public function write($string) { yield waitForWrite($this->socket); fwrite($this->socket, $string); } public function close() { @fclose($this->socket); }}现在服务器可以编写的稍微简洁点了:<?phpfunction server($port) {echo "Starting server at port $port...\n"; $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr); if (!$socket) throw new Exception($errStr, $errNo); stream_set_blocking($socket, 0); $socket = new CoSocket($socket); while (true) { yield newTask( handleClient(yield $socket->accept()) ); }}function handleClient($socket) {$data = (yield $socket->read(8192)); $msg = "Received following request:\n\n$data"; $msgLength = strlen($msg); $response = <<<resHTTP/1.1 200 OK\rContent-Type: text/plain\rContent-Length: $msgLength\rConnection: close\r\r$msgRES;yield $socket->write($response); yield $socket->close();}错误处理作为一个优秀的程序员, 相信你已经察觉到上面的例子缺少错误处理. 几乎所有的 socket 都是易出错的. 我没有这样做的原因一方面固然是因为错误处理的乏味(特别是 socket), 另一方面也在于它很容易使代码体积膨胀.不过, 我仍然想讲下常见的协程错误处理:协程允许使用 throw() 方法在其内部抛出一个错误.throw() 方法接受一个 Exception, 并将其抛出到协程的当前悬挂点, 看看下面代码:<?phpfunction gen() {echo "Foo\n"; try { yield; } catch (Exception $e) { echo "Exception: {$e->getMessage()}\n"; } echo "Bar\n";}$gen = gen();$gen->rewind(); // echos "Foo"$gen->throw(new Exception('Test')); // echos "Exception: Test" // and "Bar"这非常好, 有没有? 因为我们现在可以使用系统调用以及子协程调用异常抛出了.不过我们要对系统调用Scheduler::run() 方法做一些小调整:<?phpif ($retval instanceof SystemCall) {try { $retval($task, $this); } catch (Exception $e) { $task->setException($e); $this->schedule($task); } continue;}Task 类也要添加 throw 调用处理:<?phpclass Task {// ... protected $exception = null; public function setException($exception) { $this->exception = $exception; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } elseif ($this->exception) { $retval = $this->coroutine->throw($this->exception); $this->exception = null; return $retval; } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } // ...}现在, 我们已经可以在系统调用中使用异常抛出了!例如,要调用 killTask,让我们在传递 ID 不可用时抛出一个异常:<?phpfunction killTask($tid) {return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { if ($scheduler->killTask($tid)) { $scheduler->schedule($task); } else { throw new InvalidArgumentException('Invalid task ID!'); } } );}试试看:<?phpfunction task() {try { yield killTask(500); } catch (Exception $e) { echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n"; }}这些代码现在尚不能正常运作,因为 stackedCoroutine 函数无法正确处理异常.要修复需要做些调整:<?phpfunction stackedCoroutine(Generator $gen) {$stack = new SplStack; $exception = null; for (;;) { try { if ($exception) { $gen->throw($exception); $exception = null; continue; } $value = $gen->current(); if ($value instanceof Generator) { $stack->push($gen); $gen = $value; continue; } $isReturnValue = $value instanceof CoroutineReturnValue; if (!$gen->valid() || $isReturnValue) { if ($stack->isEmpty()) { return; } $gen = $stack->pop(); $gen->send($isReturnValue ? $value->getValue() : NULL); continue; } try { $sendValue = (yield $gen->key() => $value); } catch (Exception $e) { $gen->throw($e); continue; } $gen->send($sendValue); } catch (Exception $e) { if ($stack->isEmpty()) { throw $e; } $gen = $stack->pop(); $exception = $e; } }}结束语在这篇文章里,我使用多任务协作构建了一个任务调度器, 其中包括执行“系统调用”, 做非阻塞操作和处理错误. 所有这些里真正很酷的事情是任务的结果代码看起来完全同步, 甚至任务正在执行大量的异步操作的时候也是这样.如果你打算从套接口读取数据的话, 你将不需要传递某个回调函数或者注册一个事件侦听器. 相反, 你只要书写yield $socket->read(). 这儿大部分都是你常常也要编写的,只 在它的前面增加yield.当我第一次听到协程的时候, 我发现这个概念完全令人折服, 正是因为这个激励我在PHP中实现了它. 同时我发现协程真正非常的令人惊叹:在令人敬畏的代码和一大堆乱代码之间只有一线之隔, 我认为协程恰好处在这条线上, 不多不少. 不过, 要说使用上面所述的方法书写异步代码是否真的有益, 这个就见仁见智了.但, 不管咋样, 我认为这是一个有趣的话题, 而且我希望你也能找到它的乐趣. 欢迎评论:)
2023年08月07日
20 阅读
0 评论
0 点赞
1
...
125
126
127
...
157