PHP消息队列RabbitMQ介绍与示例

dafenqi
2023-12-29 / 0 评论 / 8 阅读 / 正在检测是否收录...

PHP消息队列RabbitMQ介绍与示例

消息队列是一种异步通讯协议,它可以在分布式架构中实现解耦和异步通信。在Web开发中,消息队列通常用于处理异步任务、负载均衡和事件驱动的应用程序设计。在本文中,我们将介绍如何使用PHP和RabbitMQ实现消息队列的开发及其详细代码。

RabbitMQ是一个消息队列中间件,它在消息处理过程中具有高度的可靠性和可扩展性。在RabbitMQ中,消息生产者将消息发布到交换机,然后消息被路由到相应的队列中。消费者从队列中读取消息并处理它们。下面是使用PHP和RabbitMQ实现消息队列的详细步骤及代码。

环境要求

在开始之前,请确保您已经安装并配置了以下环境。

  1. PHP 5.4或以上版本。
  2. RabbitMQ服务。
  3. AMQP扩展。

RabbitMQ的安装和配置

在安装RabbitMQ之前,您需要确保您的服务器上已安装Erlang。要同时安装Erlang和RabbitMQ,请按照以下步骤进行操作。

在CentOS中,使用以下命令安装Erlang。

sudo yum install erlang
在Ubuntu中,使用以下命令安装Erlang。

sudo apt-get install erlang
使用以下命令下载RabbitMQ。

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.7/rabbitmq-server-generic-unix-3.7.7.tar.xz
将解压后的文件移动到/usr/local/rabbitmq目录下。

sudo mv rabbitmq_server-3.7.7/ /usr/local/rabbitmq/
设置环境变量并启动RabbitMQ服务。

export PATH=/usr/local/rabbitmq/sbin:$PATH
rabbitmq-server -detached
检查RabbitMQ是否正在运行。

sudo rabbitmqctl status
应该看到如下输出。

{mem_limit,268435456}
{disk_free_limit,50000000}
{pid,6062}
{running_applications,[{rabbit,"RabbitMQ","3.7.7"},{os_mon,"CPO CXC 138 46","2.4"},{sasl,"SASL CXC 138 11","3.2"},{stdlib,"ERTS CXC 138 10","2.8"},{kernel,"ERTS CXC 138 10","5.4"}]}
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]}
{vm_memory_high_watermark,0.4}
{vm_memory_limit,1073741824}
{hostname,"localhost"}
{memory,{total,37862280},{connection_readers,0},{connection_writers,0},{connection_channels,0},{connection_other,0},{queue_procs,0},{queue_slave_procs,0},{plugins,0},{other_proc,7288784},{mnesia,0},{mgmt_db,0},{msg_index,0},{other_ets,7431024}}
{alarms,[{rabbitmq_management,"management plugin status is ok",ok}]}
{vm_args,[{extra_applications,[kernel,stdlib]},{kernel,[{inet_default_connect_options,[{nodelay,true}]},{start_purge_timer,false},{inet_dist_listen_min,25672},{inet_dist_listen_max,25672}]},{rabbit,[{loopback_users,[<<"guest">>]}]}]}
RabbitMQ的配置文件可以在/etc/rabbitmq目录中找到。

生产者代码

创建一个名为“producer.php”的文件并添加以下内容。

<?php
require_once DIR . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$message = new AMQPMessage('Hello World!');
$channel->basic_publish($message, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
将以上代码保存为“producer.php”文件。

运行以下命令以运行生产者脚本。

php producer.php
您应该会看到以下输出。

[x] Sent 'Hello World!'
消费者代码

创建一个名为“consumer.php”的文件并添加以下内容。

<?php
require_once DIR . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
$channel->wait();
}

$channel->close();
$connection->close();

将以上代码保存为“consumer.php”文件。

运行以下命令以运行消费者脚本。

php consumer.php
您应该会看到以下输出。

[*] Waiting for messages. To exit press CTRL+C
发生在“生产者”代码的情况是它将消息发送到名为“hello”的队列中。在这个过程中发生的第一件事是我们建立了一个到RabbitMQ服务器的连接。然后我们声明了一个名为“hello”的队列。接下来创建了一个包含字符串“Hello World!”的AMQPMessage。最后我们通过调用basic_publish()函数来发布消息。

"生产者"将向RabbitMQ服务器发送消息,但如果没有“消费者”来读取消息,那么它将保持在队列中。这是因为RabbitMQ采用了一种类似于“推”的方式来传递消息,而不是像HTTP协议那样的“拉”方式。这就是我们需要也实现一个“消费者”的原因。我们创建了一个名为“hello”的队列,并等待“生产者”发送消息。最后我们使用basic_consume()方法订阅了这个队列,每当消息到达时调用$callback()函数来处理它。

总结

在本文中,我们讨论了如何使用PHP和RabbitMQ开发消息队列。我们先介绍了RabbitMQ的基本概念,然后介绍了如何为生产者和消费者编写代码。在我们的示例代码中,我们实现了一个简单的“Hello World!”消息队列,但是使用相同的原则可以扩展到更复杂的应用程序。

0

Deprecated: strtolower(): Passing null to parameter #1 ($string) of type string is deprecated in /www/wwwroot/testblog.58heshihu.com/var/Widget/Archive.php on line 1032

评论 (0)

取消