这些消息队列都有各自的优点和适用场景,需要根据具体的业务需求选择合适的消息队列系统。
使用了PhpAmqpLib库,该库提供了在PHP中操作RabbitMQ消息队列的API。首先,我们创建了连接对象,然后使用
c
o
n
n
e
c
t
i
o
n
?
>
c
h
a
n
n
e
l
(
)
方法创建了一个通道。在通道上使用
connection->channel()方法创建了一个通道。在通道上使用
connection?>channel()方法创建了一个通道。在通道上使用channel->queue_declare()方法声明了队列,如果队列不存在则会被创建。接下来,我们使用AMQPMessage类创建了一个消息对象,将消息内容传递给它。最后,使用$channel->basic_publish()方法将消息发送到队列中。
当我们运行上述代码时,消息将被发送到队列中,并输出“[x] Sent Hello World!”。我们可以使用相同的连接对象和通道对象来接收消息。
发送消息:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
接收消息:
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
发送消息:
require_once __DIR__ . '/vendor/autoload.php';
use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
$conf = new Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$producer = new Producer($conf);
$topic = $producer->newTopic('test');
$topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Hello, Kafka!');
$producer->poll(0);
$producer->flush(10000);
echo 'Message sent to Kafka' . PHP_EOL;
接收消息:
require_once __DIR__ . '/vendor/autoload.php';
use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\ConsumerTopic;
use RdKafka\Message;
$conf = new Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$consumer = new Consumer($conf);
$consumer->subscribe(['test']);
echo "Waiting for messages...\n";
while (true) {
$message = $consumer->consume(120 * 1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
echo "Received message: " . $message->payload . PHP_EOL;
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
连接到Redis
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
设置和获取值
$redis->set('key', 'value');
$value = $redis->get('key');
存储和获取哈希表
$redis->hSet('hash', 'field1', 'value1');
$value = $redis->hGet('hash', 'field1');
存储和获取列表
$redis->lPush('list', 'value1');
$value = $redis->rPop('list');
存储和获取集合
$redis->sAdd('set', 'value1');
$value = $redis->sMembers('set');
存储和获取有序集合
$redis->zAdd('sorted_set', 1, 'value1');
$value = $redis->zRange('sorted_set', 0, -1);
订阅和发布消息
$redis->subscribe(['channel'], function($redis, $channel, $message) {
echo "Received message $message from channel $channel\n";
});
$redis->publish('channel', 'message');
这些示例只是Redis的基本操作,还有其他更高级的功能,如事务、管道和Lua脚本。
下面是 PHP 操作 Beanstalkd 的示例:
安装 beanstalkd-php 库
composer require pda/pheanstalk
生产者示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1'); // 连接 Beanstalkd 服务器
$jobData = ['id' => 1, 'name' => 'test']; // 定义要发送的消息
// 将消息加入队列
$pheanstalk->useTube('test-tube')->put(json_encode($jobData));
echo 'Job has been created successfully.' . PHP_EOL;
消费者示例
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1'); // 连接 Beanstalkd 服务器
$pheanstalk->watch('test-tube'); // 监听队列
while ($job = $pheanstalk->reserve()) { // 循环获取队列中的消息
$data = json_decode($job->getData(), true); // 解析消息
echo sprintf("Job id: %d, name: %s" . PHP_EOL, $data['id'], $data['name']);
$pheanstalk->delete($job); // 处理完消息后将其从队列中删除
}
以上示例演示了如何使用 beanstalkd-php 库连接 Beanstalkd 服务器,并在生产者中将消息加入队列,在消费者中循环获取队列中的消息,并在处理完消息后将其从队列中删除。
Regenerate response
操作ActiveMQ需要使用ActiveMQ的PHP客户端库,官方提供了一个叫做Stomp的PHP客户端库。下面是使用Stomp客户端库操作ActiveMQ的示例代码:
安装依赖库
使用 PHP 操作 ActiveMQ,需要安装 stomp 扩展库。可以通过以下命令安装:
pecl install stomp
连接 ActiveMQ
$stomp = new Stomp('tcp://localhost:61613');
$stomp->connect('username', 'password');
发送消息
$stomp->send('/queue/test', 'Hello, ActiveMQ!');
订阅消息
$stomp->subscribe('/queue/test');
while ($frame = $stomp->read()) {
echo $frame->body . PHP_EOL;
$stomp->ack($frame);
}
这里订阅 /queue/test 队列,当队列中有消息时,会打印消息内容并确认消息已经被消费。
断开连接
$stomp->disconnect();
这里是 PHP 操作 ActiveMQ 的一个简单示例,可以根据自己的需求进行修改。