更新时间:2023-01-19 08:58:03
我遇到了类似的情况,我从 Kafka 消费消息,不得不限制消费,因为我的消费服务依赖于有自己的第三方 API约束.
I had a similar situation where I was consuming messages from Kafka and had to throttle the consumption because my consumer service was dependent on a third party API which had its own constraints.
我使用了 async/queue
以及一个名为 asyncTimedCargo
的 async/cargo
包装器来进行批处理.货物从 kafka-consumer 获取所有消息,并在达到大小限制 batch_config.batch_size
或超时 batch_config.batch_timeout
时将其发送到队列.async/queue
提供了 saturated
和 未饱和
回调,如果您的队列任务工作人员很忙,您可以使用它们来停止消耗.这将阻止货物填满,您的应用程序不会耗尽内存.消耗将在不饱和时恢复.
I used async/queue
along with a wrapper of async/cargo
called asyncTimedCargo
for batching purpose.
The cargo gets all the messages from the kafka-consumer and sends it to queue upon reaching a size limit batch_config.batch_size
or timeout batch_config.batch_timeout
.
async/queue
provides saturated
and unsaturated
callbacks which you can use to stop the consumption if your queue task workers are busy. This would stop the cargo from filling up and your app would not run out of memory. The consumption would resume upon unsaturation.
//cargo-service.js
module.exports = function(key){
return new asyncTimedCargo(function(tasks, callback) {
var length = tasks.length;
var postBody = [];
for(var i=0;i<length;i++){
var message ={};
var task = JSON.parse(tasks[i].value);
message = task;
postBody.push(message);
}
var postJson = {
"json": {"request":postBody}
};
sms_queue.push(postJson);
callback();
}, batch_config.batch_size, batch_config.batch_timeout)
};
//kafka-consumer.js
cargo = cargo-service()
consumer.on('message', function (message) {
if(message && message.value && utils.isValidJsonString(message.value)) {
var msgObject = JSON.parse(message.value);
cargo.push(message);
}
else {
logger.error('Invalid JSON Message');
}
});
// sms-queue.js
var sms_queue = queue(
retryable({
times: queue_config.num_retries,
errorFilter: function (err) {
logger.info("inside retry");
console.log(err);
if (err) {
return true;
}
else {
return false;
}
}
}, function (task, callback) {
// your worker task for queue
callback()
}), queue_config.queue_worker_threads);
sms_queue.saturated = function() {
consumer.pause();
logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function() {
consumer.resume();
logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
};