且构网


Can I throttle consumption for a kafka-node consumer?

Updated: 2023-01-19 08:58:03

I had a similar situation: I was consuming messages from Kafka and had to throttle consumption because my consumer service depended on a third-party API that had its own rate constraints.

I used async/queue together with asyncTimedCargo, a wrapper around async/cargo, for batching. The cargo receives every message from the Kafka consumer and sends them to the queue as one batch when either the size limit batch_config.batch_size or the timeout batch_config.batch_timeout is reached. async/queue provides saturated and unsaturated callbacks, which you can use to pause consumption while all of the queue's workers are busy. This stops the cargo from filling up, so your app does not run out of memory. Consumption resumes once the queue becomes unsaturated again.
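To make the "flush on size or timeout" behaviour concrete, here is a minimal, dependency-free sketch of a timed cargo. It is not the asyncTimedCargo package itself; `createTimedCargo`, `batchSize` and `timeoutMs` are hypothetical names chosen for illustration, assuming the wrapper flushes whichever limit is hit first.

```javascript
// Hypothetical stand-in for a "timed cargo": buffer items and hand them to
// `worker` as one batch when `batchSize` items have accumulated or
// `timeoutMs` has passed since the first buffered item, whichever is first.
function createTimedCargo(worker, batchSize, timeoutMs) {
  let buffer = [];
  let timer = null;

  function flush() {
    if (timer !== null) {
      clearTimeout(timer); // cancel the pending timeout flush
      timer = null;
    }
    if (buffer.length === 0) return;
    const batch = buffer;
    buffer = [];
    worker(batch); // deliver the whole batch at once
  }

  return {
    push(item) {
      buffer.push(item);
      if (buffer.length >= batchSize) {
        flush(); // size limit reached: flush immediately
      } else if (timer === null) {
        timer = setTimeout(flush, timeoutMs); // start the timeout window
      }
    },
    length() {
      return buffer.length; // items waiting for the next flush
    },
  };
}

// Usage: batches of at most 3 messages, otherwise flushed after 50 ms.
const batches = [];
const cargo = createTimedCargo((batch) => batches.push(batch), 3, 50);
['a', 'b', 'c', 'd'].forEach((m) => cargo.push(m));
// batches is now [['a', 'b', 'c']]; 'd' is flushed about 50 ms later.
```

The timer starts with the first buffered item rather than on a fixed interval, so a quiet topic never delivers an empty batch and a single straggler waits at most `timeoutMs`.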

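//cargo-service.js
// Assumptions: asyncTimedCargo comes from the async-timed-cargo package (or your
// own wrapper around async/cargo), and batch_config lives in a local config module.
var asyncTimedCargo = require('async-timed-cargo');
var batch_config = require('./config').batch_config;
var sms_queue = require('./sms-queue');

module.exports = function () {
    // Collect Kafka messages and flush them as one batch when either
    // batch_config.batch_size messages have arrived or
    // batch_config.batch_timeout has elapsed.
    return new asyncTimedCargo(function (tasks, callback) {
        // Each task is a kafka-node message whose value is a JSON string.
        var postBody = tasks.map(function (task) {
            return JSON.parse(task.value);
        });
        // Hand the whole batch to the worker queue as a single task.
        sms_queue.push({ json: { request: postBody } });
        callback();
    }, batch_config.batch_size, batch_config.batch_timeout);
};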

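//kafka-consumer.js
// `consumer` is your kafka-node Consumer; `utils` and `logger` are your own helpers.
// (The original `cargo = cargo-service()` parses as `cargo - service()` and throws.)
var cargo = require('./cargo-service')();

consumer.on('message', function (message) {
    // Only batch messages whose value is valid JSON; the cargo worker parses it.
    if (message && message.value && utils.isValidJsonString(message.value)) {
        cargo.push(message);
    }
    else {
        logger.error('Invalid JSON Message');
    }
});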

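// sms-queue.js
// Assumptions: queue_config lives in a local config module, and `consumer`
// (the kafka-node consumer) and `logger` are in scope here.
var async = require('async');
var queue_config = require('./config').queue_config;

// queue_config.queue_worker_threads tasks run concurrently; each failing task
// is retried up to queue_config.num_retries times.
var sms_queue = async.queue(async.retryable({
    times: queue_config.num_retries,
    // Called only when the task failed; returning true keeps retrying.
    errorFilter: function (err) {
        logger.info('inside retry');
        logger.error(err);
        return Boolean(err);
    }
}, function (task, callback) {
    // your worker task for the queue (e.g. call the third-party API with task.json)
    callback();
}), queue_config.queue_worker_threads);

// Back-pressure hooks (async v2 style; in async v3, register them with
// sms_queue.saturated(fn) and sms_queue.unsaturated(fn) instead).
sms_queue.saturated = function () {
    consumer.pause(); // every worker is busy: stop fetching from Kafka
    logger.warn('Queue saturated, consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function () {
    consumer.resume(); // workers are free again: resume fetching
    logger.info('Queue unsaturated, consumption resumed: ' + sms_queue.running());
};

module.exports = sms_queue;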
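The pause/resume wiring above can be exercised without Kafka or async. The sketch below is a simplified, hypothetical stand-in for async/queue (`createQueue`, `onSaturated` and `onUnsaturated` are illustrative names), driving a fake consumer that only exposes `pause()` and `resume()` like the kafka-node consumer. async/queue also applies a `buffer` threshold before calling `unsaturated`; this sketch simply fires it as soon as one worker frees up.

```javascript
// Simplified stand-in for async/queue's saturated/unsaturated hooks:
// `concurrency` workers; onSaturated() fires when all of them are busy,
// onUnsaturated() fires when one of them finishes.
function createQueue(worker, concurrency, hooks) {
  const pending = [];
  let running = 0;
  let saturated = false;

  function next() {
    while (running < concurrency && pending.length > 0) {
      const task = pending.shift();
      running++;
      if (running === concurrency && !saturated) {
        saturated = true;
        hooks.onSaturated(); // every worker busy: apply back-pressure
      }
      worker(task, () => {
        running--;
        if (saturated && running < concurrency) {
          saturated = false;
          hooks.onUnsaturated(); // a worker freed up: release back-pressure
        }
        next();
      });
    }
  }

  return {
    push(task) {
      pending.push(task);
      next();
    },
    running: () => running,
  };
}

// Fake consumer with the same pause()/resume() shape as a kafka-node consumer.
const consumer = {
  paused: false,
  pause() { this.paused = true; },
  resume() { this.paused = false; },
};

const inFlight = []; // done callbacks of batches still "calling the API"
const q = createQueue((task, done) => inFlight.push(done), 2, {
  onSaturated: () => consumer.pause(),
  onUnsaturated: () => consumer.resume(),
});

q.push('batch-1');
q.push('batch-2'); // both workers busy
console.log(consumer.paused); // true
inFlight.shift()(); // one batch finishes
console.log(consumer.paused); // false
```

Because the consumer is paused rather than messages being dropped, Kafka simply holds the backlog until the workers catch up.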