[每周一更]-(第37期):PHP常见的操作消息队列

发布时间:2023年12月22日

在这里插入图片描述

在 PHP 中,常见的消息队列包括:

  • RabbitMQ:RabbitMQ 是一个功能强大的开源消息队列系统,被广泛应用于分布式系统的消息传递和异步处理。在 PHP 中,可以使用 AMQP 扩展来连接和操作 RabbitMQ。使用 RabbitMQ,可以实现任务分发、异步处理、发布/订阅等场景。
  • Kafka:Kafka 是另一个流行的消息队列系统,也被广泛应用于分布式系统的消息传递和异步处理。在 PHP 中,可以使用 rdkafka 扩展来连接和操作 Kafka。使用 Kafka,可以实现大规模数据处理、实时数据流处理等场景。
  • Redis:Redis 也可以作为一个消息队列使用,通过 Redis 的 Pub/Sub 功能实现消息的发布和订阅。在 PHP 中,可以使用 Predis 或 PhpRedis 扩展来连接和操作 Redis。使用 Redis,可以实现简单的消息发布/订阅场景、实时聊天应用等。
  • Beanstalkd:Beanstalkd 是一个轻量级的消息队列系统,也被广泛应用于异步任务处理等场景。在 PHP 中,可以使用 Pheanstalk 扩展来连接和操作 Beanstalkd。使用 Beanstalkd,可以实现任务队列、异步任务处理、延时任务等场景。
  • ActiveMQ:ActiveMQ 是一个基于 JMS(Java 消息服务)规范的消息队列系统,但同时也支持多种协议和语言。在 PHP 中,可以使用 Stomp 扩展来连接和操作 ActiveMQ。使用 ActiveMQ,可以实现分布式系统的消息传递、异步处理等场景。

这些消息队列都有各自的优点和适用场景,需要根据具体的业务需求选择合适的消息队列系统。

以下是PHP消息队列的一些使用场景:

  • 订单处理:当用户下单后,可以把订单信息放入消息队列中,然后异步地进行订单处理。这样可以减轻主服务器的负载,提高系统的性能和可靠性。
  • 日志处理:将网站的访问日志放入消息队列中,异步地进行日志处理和分析。这样可以降低日志处理对网站访问的影响,并且可以提高日志处理的效率。
  • 实时通知:将需要实时通知的消息放入消息队列中,例如用户注册、订单支付等。然后通过订阅者来接收消息,并及时发送通知。
  • 异步任务处理:将需要异步处理的任务放入消息队列中,例如生成缩略图、发送邮件等。这样可以降低主服务器的负载,并提高任务处理的效率。

以下是几个 PHP 消息队列的常见操作示例:

1、 PHP 操作RabbitMQ 的示例

使用了PhpAmqpLib库,该库提供了在PHP中操作RabbitMQ消息队列的API。首先,我们创建了连接对象,然后使用 c o n n e c t i o n ? > c h a n n e l ( ) 方法创建了一个通道。在通道上使用 connection->channel()方法创建了一个通道。在通道上使用 connection?>channel()方法创建了一个通道。在通道上使用channel->queue_declare()方法声明了队列,如果队列不存在则会被创建。接下来,我们使用AMQPMessage类创建了一个消息对象,将消息内容传递给它。最后,使用$channel->basic_publish()方法将消息发送到队列中。
当我们运行上述代码时,消息将被发送到队列中,并输出“[x] Sent Hello World!”。我们可以使用相同的连接对象和通道对象来接收消息。

发送消息:
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

接收消息:
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

2、PHP 操作Kafka 的示例

发送消息:
require_once __DIR__ . '/vendor/autoload.php';

use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\ProducerTopic;

$conf = new Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$producer = new Producer($conf);

$topic = $producer->newTopic('test');
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Hello, Kafka!');

$producer->poll(0);
$producer->flush(10000);

echo 'Message sent to Kafka' . PHP_EOL;

接收消息:
require_once __DIR__ . '/vendor/autoload.php';

use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use RdKafka\Message;

$conf = new Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$consumer = new Consumer($conf);
$consumer->subscribe(['test']);

echo "Waiting for messages...\n";

while (true) {
	$message = $consumer->consume(120 * 1000);

	switch ($message->err) {
		case RD_KAFKA_RESP_ERR_NO_ERROR:
			echo "Received message: " . $message->payload . PHP_EOL;
			break;
		case RD_KAFKA_RESP_ERR__PARTITION_EOF:
			echo "No more messages; will wait for more\n";
			break;
		case RD_KAFKA_RESP_ERR__TIMED_OUT:
			echo "Timed out\n";
			break;
		default:
			throw new \Exception($message->errstr(), $message->err);
			break;
	}
}

3、PHP 操作Redis 的示例

连接到Redis
$redis = new Redis();

$redis->connect('127.0.0.1', 6379);

设置和获取值
$redis->set('key', 'value');
$value = $redis->get('key');

存储和获取哈希表
$redis->hSet('hash', 'field1', 'value1');
$value = $redis->hGet('hash', 'field1');

存储和获取列表
$redis->lPush('list', 'value1');
$value = $redis->rPop('list');

存储和获取集合
$redis->sAdd('set', 'value1');
$value = $redis->sMembers('set');

存储和获取有序集合
$redis->zAdd('sorted_set', 1, 'value1');
$value = $redis->zRange('sorted_set', 0, -1);

订阅和发布消息
$redis->subscribe(['channel'], function($redis, $channel, $message) {
echo "Received message $message from channel $channel\n";
});

$redis->publish('channel', 'message');

这些示例只是Redis的基本操作,还有其他更高级的功能,如事务、管道和Lua脚本。

4、PHP 操作Beanstalkd 的示例

下面是 PHP 操作 Beanstalkd 的示例:
安装 beanstalkd-php 库
composer require pda/pheanstalk

生产者示例
<?php
require_once __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('127.0.0.1'); // 连接 Beanstalkd 服务器
$jobData = ['id' => 1, 'name' => 'test']; // 定义要发送的消息

// 将消息加入队列
$pheanstalk->useTube('test-tube')->put(json_encode($jobData));

echo 'Job has been created successfully.' . PHP_EOL;


消费者示例
<?php
require_once __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk = new Pheanstalk('127.0.0.1'); // 连接 Beanstalkd 服务器
$pheanstalk->watch('test-tube'); // 监听队列

while ($job = $pheanstalk->reserve()) { // 循环获取队列中的消息
    $data = json_decode($job->getData(), true); // 解析消息
    echo sprintf("Job id: %d, name: %s" . PHP_EOL, $data['id'], $data['name']);
    $pheanstalk->delete($job); // 处理完消息后将其从队列中删除
}

以上示例演示了如何使用 beanstalkd-php 库连接 Beanstalkd 服务器,并在生产者中将消息加入队列,在消费者中循环获取队列中的消息,并在处理完消息后将其从队列中删除。
Regenerate response

5、PHP 操作ActiveMQ 的示例

操作ActiveMQ需要使用ActiveMQ的PHP客户端库,官方提供了一个叫做Stomp的PHP客户端库。下面是使用Stomp客户端库操作ActiveMQ的示例代码:

安装依赖库
使用 PHP 操作 ActiveMQ,需要安装 stomp 扩展库。可以通过以下命令安装:
pecl install stomp

连接 ActiveMQ
$stomp = new Stomp('tcp://localhost:61613');
$stomp->connect('username', 'password');

发送消息
$stomp->send('/queue/test', 'Hello, ActiveMQ!');

订阅消息
$stomp->subscribe('/queue/test');
while ($frame = $stomp->read()) {
    echo $frame->body . PHP_EOL;
    $stomp->ack($frame);
}

这里订阅 /queue/test 队列,当队列中有消息时,会打印消息内容并确认消息已经被消费。
断开连接
$stomp->disconnect();

这里是 PHP 操作 ActiveMQ 的一个简单示例,可以根据自己的需求进行修改。

文章来源:https://blog.csdn.net/hmx224_2014/article/details/135154805
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。