Deprecated
: htmlspecialchars(): Passing null to parameter #1 ($string) of type string is deprecated in
/www/wwwroot/testblog.58heshihu.com/var/Widget/Archive.php
on line
1057
首页
关于
Search
1
给你10个市场数据调研报告的免费下载网站!以后竞品数据就从这里找!
182 阅读
2
php接口优化 使用curl_multi_init批量请求
144 阅读
3
《从菜鸟到大师之路 ElasticSearch 篇》
107 阅读
4
2024年备考系统架构设计师
104 阅读
5
PHP 文件I/O
92 阅读
php
thinkphp
laravel
工具
开源
mysql
数据结构
总结
思维逻辑
令人感动的创富故事
读书笔记
前端
vue
js
css
书籍
开源之旅
架构
消息队列
docker
教程
代码片段
redis
服务器
nginx
linux
科普
java
c
ElasticSearch
测试
php进阶
php基础
登录
Search
标签搜索
php函数
php语法
性能优化
安全
错误和异常处理
问题
vue
Composer
Session
缓存
框架
Swoole
api
并发
异步
正则表达式
php-fpm
mysql 索引
开发规范
协程
dafenqi
累计撰写
786
篇文章
累计收到
28
条评论
首页
栏目
php
thinkphp
laravel
工具
开源
mysql
数据结构
总结
思维逻辑
令人感动的创富故事
读书笔记
前端
vue
js
css
书籍
开源之旅
架构
消息队列
docker
教程
代码片段
副业
redis
服务器
nginx
linux
科普
java
c
ElasticSearch
测试
php进阶
php基础
页面
关于
搜索到
5
篇与
的结果
2023-08-09
PHP 使用数组模拟 队列(Queue)
异步并发的服务器里经常使用队列实现生产者消费者模型,解决并发排队问题。 PHP的SPL标准库中提供了 SplQueue 扩展内置的队列数据结构。另外 PHP的数组也提供了array_pop和array_shift可以使用数组模拟队列数据结构 。SplQueue$queue = new SplQueue; //入队 $queue->push($data); //出队 $data = $queue->shift(); //查询队列中的排队数量 $n = count($queue); Array模拟队列 $queue = array(); //入队 $queue[] = $data; //出队 $data = array_shift($queue); //查询队列中的排队数量 $n = count($queue);性能对比虽然使用Array可以实现队列,但实际上性能会非常差。在一个大并发的服务器程序上,建议使用SplQueue作为队列数据结构。100万条数据随机入队、出队,使用SplQueue仅用2312.345ms即可完成,而使用Array模拟的队列的程序根本无法完成测试,CPU一直持续在100%。降低数据条数到1万条后(100倍),也需要260ms才能完成测试。SplQueue$splq = new SplQueue; for($i = 0; $i < 1000000; $i++) { $data = "hello $i\n"; $splq->push($data); if ($i % 100 == 99 and count($splq) > 100) { $popN = rand(10, 99); for ($j = 0; $j < $popN; $j++) { $splq->shift(); } } } $popN = count($splq); for ($j = 0; $j < $popN; $j++) { $splq->pop(); } Array队列 $arrq = array(); for($i = 0; $i <1000000; $i++) { $data = "hello $i\n"; $arrq[] = $data; if ($i % 100 == 99 and count($arrq) > 100) { $popN = rand(10, 99); for ($j = 0; $j < $popN; $j++) { array_shift($arrq); } } } $popN = count($arrq); for ($j = 0; $j < $popN; $j++) { array_shift($arrq); }
2023年08月09日
13 阅读
0 评论
0 点赞
2023-08-08
定时任务与队列
1.出队,入队。2异步tast,3.消息队列。4.Job | 任务 : 一个Job就是一个需要在后台完成的任务,比如本文举例的发送邮件,就可以抽象为一个Job。在Resque中一个Job就是一个Class。Queue | 队列 : 也就是上文的消息队列,在Resque中,队列则是由Redis实现的。Resque还提供了一个简单的队列管理器,可以实现将Job插入/取出队列等功能。Worker | 执行者 : 负责从队列中取出Job并执行,可以以守护进程的方式运行在后台。三 :队列总体设计1:需要队列程序,提供加入队列接口和取队列接口等2:需要存储队列,文件或者数据库3:需要定时程序取出队列并执行4:其它扩展功能:优先级,日志,定时等
2023年08月08日
18 阅读
0 评论
0 点赞
2023-08-08
php 进程通信系列 (二)消息队列
php 进程通信系列 (二)消息队列
2023年08月08日
19 阅读
0 评论
0 点赞
2023-08-08
PHP高级编程之消息队列原理与实现方法详解
PHP高级编程之消息队列原理与实现方法详解这篇文章主要介绍了PHP高级编程之消息队列原理与实现方法,结合实例形式详细分析了PHP消息队列相关概念、原理、使用场景及相关操作注意事项,需要的朋友可以参考下本文实例讲述了PHP高级编程之消息队列原理与实现方法。分享给大家供大家参考,具体如下:什么是消息队列消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式为什么使用消息队列消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读出。通过消息队列,应用程序可独立地执行,它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。什么场合使用消息队列你首先需要弄清楚,消息队列与远程过程调用的区别,在很多读者咨询我的时候,我发现他们需要的是RPC(远程过程调用),而不是消息队列。消息队列有同步或异步实现方式,通常我们采用异步方式使用消息队列,远程过程调用多采用同步方式。MQ与RPC有什么不同? MQ通常传递无规则协议,这个协议由用户定义并且实现存储转发;而RPC通常是专用协议,调用过程返回结果。什么时候使用消息队列同步需求,远程过程调用(PRC)更适合你。异步需求,消息队列更适合你。目前很多消息队列软件同时支持RPC功能,很多RPC系统也能异步调用。消息队列用来实现下列需求① 存储转发② 分布式事务③ 发布订阅④ 基于内容的路由⑤ 点对点连接谁负责处理消息队列通常的做法,如果小的项目团队可以有一个人实现,包括消息的推送,接收处理。如果大型团队,通常是定义好消息协议,然后各自开发各自的部分,例如一个团队负责写推送协议部分,另一个团队负责写接收与处理部分。那么为什么我们不讲消息队列框架化呢?框架化有几个好处:① 开发者不用学习消息队列接口② 开发者不需要关心消息推送与接收③ 开发者通过统一的API推送消息④ 开发者的重点是实现业务逻辑功能怎么实现消息队列框架下面是作者开发的一个SOA框架,该框架提供了三种接口,分别是SOAP,RESTful,AMQP(RabbitMQ),理解了该框架思想,你很容易进一步扩展,例如增加XML-RPC, ZeroMQ等等支持。https://github.com/netkiller/SOA本文只讲消息队列框架部分。6.1. 守护进程消息队列框架是本地应用程序(命令行程序),我们为了让他在后台运行,需要实现守护进程。https://github.com/netkiller/SOA/blob/master/bin/rabbitmq.php每个实例处理一组队列,实例化需要提供三个参数,$queueName = '队列名', $exchangeName = '交换名', $routeKey = '路由'1$daemon = new \framework\RabbitDaemon($queueName = 'email', $exchangeName = 'email', $routeKey = 'email');守护进程需要使用root用户运行,运行后会切换到普通用户,同时创建进程ID文件,以便进程停止的时候使用。守护进程核心代码https://github.com/netkiller/SOA/blob/master/system/rabbitdaemon.class.php6.2. 消息队列协议消息协议是一个数组,将数组序列化或者转为JSON推送到消息队列服务器,这里使用json格式的协议。12345678$msg = array('Namespace'=>'namespace', "Class"=>"Email", "Method"=>"smtp", "Param" => array( $mail, $subject, $message, null ));序列化后的协议1{"Namespace":"single","Class":"Email","Method":"smtp","Param":["netkiller@msn.com","Hello"," TestHelloWorld",null]}使用json格式是考虑到通用性,这样推送端可以使用任何语言。如果不考虑兼容,建议使用二进制序列化,例如msgpack效率更好。6.3. 消息队列处理消息队列处理核心代码https://github.com/netkiller/SOA/blob/master/system/rabbitmq.class.php所以消息的处理在下面一段代码中进行12345678910$this->queue->consume(function($envelope, $queue) {$speed = microtime(true); $msg = $envelope->getBody(); $result = $this->loader($msg); $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 //$this->logging->info(''.$msg.' '.$result) $this->logging->debug('Protocol: '.$msg.' '); $this->logging->debug('Result: '. $result.' '); $this->logging->debug('Time: '. (microtime(true) - $speed) .'');});public function loader($msg = null) 负责拆解协议,然后载入对应的类文件,传递参数,运行方法,反馈结果。Time 可以输出程序运行所花费的时间,对于后期优化十分有用。提示loader() 可以进一步优化,使用多线程每次调用loader将任务提交到线程池中,这样便可以多线程处理消息队列。6.4. 测试测试代码 https://github.com/netkiller/SOA/blob/master/test/queue/email.php123456789101112131415161718192021222324252627282930313233<?php$queueName = 'example';$exchangeName = 'email';$routeKey = 'email';$mail = $argv[1];$subject = $argv[2];$message = empty($argv[3]) ? 'Hello World!' : ' '.$argv[3];$connection = new AMQPConnection(array('host' => '192.168.4.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest' ));$connection->connect() or die("Cannot connect to the broker!\n");$channel = new AMQPChannel($connection);$exchange = new AMQPExchange($channel);$exchange->setName($exchangeName);$queue = new AMQPQueue($channel);$queue->setName($queueName);$queue->setFlags(AMQP_DURABLE);$queue->declareQueue();$msg = array('Namespace'=>'namespace', "Class"=>"Email", "Method"=>"smtp", "Param" => array( $mail, $subject, $message, null ));$exchange->publish(json_encode($msg), $routeKey);printf("[x] Sent %s \r\n", json_encode($msg));$connection->disconnect();这里只给出了少量测试与演示程序,如有疑问请到渎者群,或者公众号询问。多线程上面消息队列 核心代码如下12345$this->queue->consume(function($envelope, $queue) {$msg = $envelope->getBody(); $result = $this->loader($msg); $queue->ack($envelope->getDeliveryTag());});这段代码生产环境使用了半年,发现效率比较低。有些业务场入队非常快,但处理起来所花的时间就比较长,容易出现队列堆积现象。增加多线程可能更有效利用硬件资源,提高业务处理能力。代码如下123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117<?phpnamespace framework;require_once( __DIR__.'/autoload.class.php' );class RabbitThread extends \Threaded {private $queue; public $classspath; protected $msg; public function __construct($queue, $logging, $msg) { $this->classspath = __DIR__.'/../queue'; $this->msg = $msg; $this->logging = $logging; $this->queue = $queue; } public function run() { $speed = microtime(true); $result = $this->loader($this->msg); $this->logging->debug('Result: '. $result.' '); $this->logging->debug('Time: '. (microtime(true) - $speed) .''); } // private public function loader($msg = null){ $protocol = json_decode($msg,true); $namespace = $protocol['Namespace']; $class = $protocol['Class']; $method = $protocol['Method']; $param = $protocol['Param']; $result = null; $classspath = $this->classspath.'/'.$this->queue.'/'.$namespace.'/'.strtolower($class) . '.class.php'; if( is_file($classspath) ){ require_once($classspath); //$class = ucfirst(substr($request_uri, strrpos($request_uri, '/')+1)); if (class_exists($class)) { if(method_exists($class, $method)){ $obj = new $class; if (!$param){ $tmp = $obj->$method(); $result = json_encode($tmp); $this->logging->info($class.'->'.$method.'()'); }else{ $tmp = call_user_func_array(array($obj, $method), $param); $result = (json_encode($tmp)); $this->logging->info($class.'->'.$method.'("'.implode('","', $param).'")'); } }else{ $this->logging->error('Object '. $class. '->' . $method. ' is not exist.'); } }else{ $msg = sprintf("Object is not exist. (%s)", $class); $this->logging->error($msg); } }else{ $msg = sprintf("Cannot loading interface! (%s)", $classspath); $this->logging->error($msg); } return $result; }}class RabbitMQ {const loop = 10; protected $queue; protected $pool; public function __construct($queueName = '', $exchangeName = '', $routeKey = '') { $this->config = new \framework\Config('rabbitmq.ini'); $this->logfile = __DIR__.'/../log/rabbitmq.%s.log'; $this->logqueue = __DIR__.'/../log/queue.%s.log'; $this->logging = new \framework\log\Logging($this->logfile, $debug=true);//.H:i:s $this->queueName = $queueName; $this->exchangeName = $exchangeName; $this->routeKey = $routeKey; $this->pool = new \Pool($this->config->get('pool')['thread']); } public function main(){ $connection = new \AMQPConnection($this->config->get('rabbitmq')); try { $connection->connect(); if (!$connection->isConnected()) { $this->logging->exception("Cannot connect to the broker!".PHP_EOL); } $this->channel = new \AMQPChannel($connection); $this->exchange = new \AMQPExchange($this->channel); $this->exchange->setName($this->exchangeName); $this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $this->exchange->setFlags(AMQP_DURABLE); //持久�? $this->exchange->declareExchange(); $this->queue = new \AMQPQueue($this->channel); $this->queue->setName($this->queueName); $this->queue->setFlags(AMQP_DURABLE); //持久�? $this->queue->declareQueue(); $this->queue->bind($this->exchangeName, $this->routeKey); $this->queue->consume(function($envelope, $queue) { $msg = $envelope->getBody(); $this->logging->debug('Protocol: '.$msg.' '); //$result = $this->loader($msg); $this->pool->submit(new RabbitThread($this->queueName,new \framework\log\Logging($this->logqueue, $debug=true), $msg)); $queue->ack($envelope->getDeliveryTag()); }); $this->channel->qos(0,1); } catch(\AMQPConnectionException $e){ $this->logging->exception($e->__toString()); } catch(\Exception $e){ $this->logging->exception($e->__toString()); $connection->disconnect(); $this->pool->shutdown(); } } private function fault($tag, $msg){ $this->logging->exception($msg); throw new \Exception($tag.': '.$msg); } public function __destruct() { }}
2023年08月08日
11 阅读
0 评论
0 点赞
2023-08-08
PHP实现队列及队列原理
队列是一种线性表,按照先进先出的原则进行的:PHP实现队列:第一个元素作为队头,最后一个元素作为队尾<?php /** * 队列就是这么简单 * * @link */ $array = array('PHP', 'JAVA'); array_push($array, 'PYTHON'); //入队列 array_shift($array); //出队列什么是双端队列(或双向队列)Deque,全名double-ended queue?即元素可以在队列的任意一段入队或出队,如果我们把这些方法叫做insertLeft()和insertRight(),以及removeLeft()和removeRight()。如果严格禁止调用insertLeft()和removeLeft()方法(或禁用右段的操作),双端队列功能就和栈一样。禁止调用insertLeft()和removeRight()(或相反的另一对方法),它的功能就和队列一样了。双端队列与栈或队列相比,是一种多用途的数据结构。PHP实现双端队列<?php class Deque { public $queue = array(); /**(尾部)入队 **/ public function addLast($value) { return array_push($this->queue,$value); } /**(尾部)出队**/ public function removeLast() { return array_pop($this->queue); } /**(头部)入队**/ public function addFirst($value) { return array_unshift($this->queue,$value); } /**(头部)出队**/ public function removeFirst() { return array_shift($this->queue); } /**清空队列**/ public function makeEmpty() { unset($this->queue); } /**获取列头**/ public function getFirst() { return reset($this->queue); } /** 获取列尾 **/ public function getLast() { return end($this->queue); } /** 获取长度 **/ public function getLength() { return count($this->queue); } }队列的用途:队列可以很好地异步处理数据传送和存储,当你频繁地向数据库中插入数据、频繁地向搜索引擎提交数据,就可采取队列来异步插入。另外,还可以将较慢的处理逻辑、有并发数量限制的处理逻辑,通过消息队列放在后台处理,例如FLV视频转换、发送手机短信、发送电子邮件等。
2023年08月08日
17 阅读
0 评论
0 点赞