对于遇到速率限制的应用程序来说也是一个挑战,因为它需要“放慢速度”或暂停。这是一个典型的场景:
为了在速率限制内有效运行,应用程序通常采用以下策略:
由于队列能够系统地处理任务,因此可以作为出色的“助手”或工具来帮助服务管理速率限制。然而,虽然它提供了显着的好处,但它并不是用于此目的的独立解决方案。
在构建健壮的架构时,用于与受速率限制的外部 API 交互的服务或应用程序通常会异步处理任务。该服务通常由从队列派生的任务启动。当服务遇到速率限制时,它可以轻松地将作业返回到主队列或将其分配到指定用于延迟任务的单独队列,并在特定的等待时间(例如 X 秒)后重新访问它。
这种对队列系统的依赖是非常有利的,主要是因为它的临时性质和排序。然而,仅靠队列并不能完全解决速率限制问题;它需要额外的功能或服务本身的帮助才能有效地处理这些限制。
const amqp = require('amqplib');
const axios = require('axios');
// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {
try {
const response = await axios.get(url);
console.log('API Response:', response.data);
} catch (error) {
console.error('API Error:', error.message);
}
}
// Connect to RabbitMQ server
async function connect() {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'rateLimitedQueue';
channel.assertQueue(queue, { durable: true });
// Consume messages from the queue
channel.consume(queue, async msg => {
const { url, delayInSeconds } = JSON.parse(msg.content.toString());
// Simulating rate limitation
await new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));
await makeAPICall(url); // Make the API call
channel.ack(msg); // Acknowledge message processing completion
});
} catch (error) {
console.error('RabbitMQ Connection Error:', error.message);
}
}
// Function to send a message to the queue
async function addToQueue(url, delayInSeconds) {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'rateLimitedQueue';
channel.assertQueue(queue, { durable: true });
const message = JSON.stringify({ url, delayInSeconds });
channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
console.log('Task added to the queue');
} catch (error) {
console.error('RabbitMQ Error:', error.message);
}
}
// Usage example
addToQueue('https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds
// Start the consumer
connect();
const { Kafka } = require('kafkajs');
const axios = require('axios');
// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {
try {
const response = await axios.get(url);
console.log('API Response:', response.data);
} catch (error) {
console.error('API Error:', error.message);
}
}
// Kafka configuration
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // Replace with your Kafka broker address
});
// Create a Kafka producer
const producer = kafka.producer();
// Connect to Kafka and send messages
async function produceToKafka(topic, message) {
await producer.connect();
await producer.send({
topic,
messages: [{ value: message }],
});
await producer.disconnect();
}
// Create a Kafka consumer
const consumer = kafka.consumer({ groupId: 'my-group' });
// Consume messages from Kafka topic
async function consumeFromKafka(topic) {
await consumer.connect();
await consumer.subscribe({ topic });
await consumer.run({
eachMessage: async ({ message }) => {
const { url, delayInSeconds } = JSON.parse(message.value.toString());
// Simulating rate limitation
await new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));
await makeAPICall(url); // Make the API call
},
});
}
// Usage example - Sending messages to Kafka topic
async function addToKafka(topic, url, delayInSeconds) {
const message = JSON.stringify({ url, delayInSeconds });
await produceToKafka(topic, message);
console.log('Message added to Kafka topic');
}
// Start consuming messages from Kafka topic
const kafkaTopic = 'rateLimitedTopic';
consumeFromKafka(kafkaTopic);
// Usage example - Adding messages to Kafka topic
addToKafka('rateLimitedTopic', 'https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds
这两种方法都是合法的,但它们需要您的服务包含“睡眠”机制。
借助 Memphis,您可以使用专门为此目的而设计的称为“延迟消息”的简单功能,将延迟从客户端转移到队列。当您的消费者应用程序需要额外的处理时间时,延迟消息允许您将收到的消息发送回代理。
孟菲斯实施的独特之处在于消费者能够独立且原子地控制这种延迟。
在站内,未消费消息的计数不会影响延迟消息的消费。例如,如果需要 60 秒的延迟,它会精确配置该特定消息的不可见时间。
maxMsgDeliveries
尚未达到其限制,消费者将激活message.delay(delayInMilliseconds)
,绕过消息。代理不会立即重新处理同一消息,而是将其保留指定的持续时间。delayInMilliseconds
通过,代理将停止主要消息流并将延迟的消息重新引入循环。const { memphis } = require('memphis-dev');
// Function to make API requests, simulating rate limitations
async function makeAPICall(message)
{
try {
const response = await axios.get(message.getDataAsJson()['url']);
console.log('API Response:', response.data);
message.ack();
} catch (error) {
console.error('API Error:', error.message);
console.log("Delaying message for 1 minute");
message.delay(60000);
}
}
(async function () {
let memphisConnection;
try {
memphisConnection = await memphis.connect({
host: '<broker-hostname>',
username: '<application-type username>',
password: '<password>'
});
const consumer = await memphisConnection.consumer({
stationName: '<station-name>',
consumerName: '<consumer-name>',
consumerGroup: ''
});
consumer.setContext({ key: "value" });
consumer.on('message', (message, context) => {
await makeAPICall(url, message);
});
consumer.on('error', (error) => { });
} catch (ex) {
console.log(ex);
if (memphisConnection) memphisConnection.close();
}
})();
了解并遵守速率限制对于使用 API 的应用程序开发人员至关重要。它涉及管理请求频率、达到限制时处理错误、实施退避策略以防止 API 服务器过载以及利用 API 提供的速率限制信息来优化应用程序性能,现在您也知道如何使用队列来做到这一点!
作者:Idan?Asulin
更多技术干货请关注公号【云原生数据库】
squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。
irds.cn,多数据库管理平台(私有云)。