package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/streadway/amqp"
)
// declareQueue declares the queue named queueName on ch and reports any
// error returned by the broker call. The queue is declared non-durable,
// not auto-deleted, non-exclusive, in blocking (wait) mode, and with no
// extra arguments.
func declareQueue(ch *amqp.Channel, queueName string) error {
	// Named flags keep the positional QueueDeclare call readable.
	const (
		durable    = false
		autoDelete = false
		exclusive  = false
		noWait     = false
	)

	if _, err := ch.QueueDeclare(queueName, durable, autoDelete, exclusive, noWait, nil); err != nil {
		return err
	}
	return nil
}
// producer publishes five text messages ("Message 1" .. "Message 5") to
// queueName through the default exchange, printing each one after it is
// sent and pausing one second after every publish. It calls wg.Done when
// it returns. A publish error terminates the whole process via log.Fatalf.
func producer(ch *amqp.Channel, wg *sync.WaitGroup, queueName string) {
	defer wg.Done()

	const total = 5
	for seq := 1; seq <= total; seq++ {
		text := fmt.Sprintf("Message %d", seq)
		payload := amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(text),
		}

		// Empty exchange name with the queue name as routing key;
		// mandatory and immediate are both off.
		if err := ch.Publish("", queueName, false, false, payload); err != nil {
			log.Fatalf("Failed to publish a message: %v", err)
		}

		fmt.Printf("Sent: %s\n", text)
		time.Sleep(time.Second)
	}
}
// consumer registers an auto-ack consumer on queueName (server-generated
// consumer tag, non-exclusive) and prints the body of every delivery it
// receives. It returns, calling wg.Done, only once the delivery channel
// is closed. A failure to register terminates the process via log.Fatalf.
func consumer(ch *amqp.Channel, wg *sync.WaitGroup, queueName string) {
	defer wg.Done()

	deliveries, err := ch.Consume(queueName, "", true, false, false, false, nil)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	for {
		delivery, open := <-deliveries
		if !open {
			// Delivery channel closed: nothing more will arrive.
			return
		}
		fmt.Printf("Received: %s\n", delivery.Body)
	}
}
// main connects to a local RabbitMQ broker, declares the demo queue, and
// runs one producer and one consumer goroutine against it.
//
// The consumer loops until its delivery channel is closed, so waiting on
// both goroutines with a single WaitGroup would block forever once the
// producer has finished. Instead, main waits for the producer, closes the
// AMQP channel to end the consumer's delivery stream, and only then waits
// for the consumer to return.
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}

	queueName := "your_queue_name"
	// Declare the queue before using it.
	if err := declareQueue(ch, queueName); err != nil {
		log.Fatalf("Failed to declare queue: %v", err)
	}

	// Separate WaitGroups so the two goroutines can be awaited independently.
	var producerWG, consumerWG sync.WaitGroup
	producerWG.Add(1)
	consumerWG.Add(1)
	go producer(ch, &producerWG, queueName)
	go consumer(ch, &consumerWG, queueName)

	// Wait until every message has been published. The producer sleeps
	// after each publish, which gives the consumer time to receive the
	// final message before the channel is torn down.
	producerWG.Wait()

	// Closing the AMQP channel closes the consumer's delivery channel,
	// which lets its receive loop terminate. Deliveries still in flight
	// at this point are dropped by the client library.
	if err := ch.Close(); err != nil {
		log.Printf("Failed to close channel: %v", err)
	}

	consumerWG.Wait()
}
运行结果
Sent: Message 1
Received: Message 1
Sent: Message 2
Received: Message 2
Sent: Message 3
Received: Message 3
Sent: Message 4
Received: Message 4
Sent: Message 5
Received: Message 5