How do you use async/await in a NodeJS 12 Lambda function to stream a JSON file read from S3 into PostgreSQL?

Updated: 2022-11-07 07:58:50

TL;DR:

  • Use an async iterator to pull from the end of your stream pipeline! (A minimal sketch of the pattern follows right after this list.)
  • Do not use async functions in any of your stream code!
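
Concretely, the pattern boils down to something like this minimal sketch (consume, someReadable, and handleItem are illustrative names only, not part of the real code further down):

const JSONStream = require('JSONStream');

// someReadable: a raw byte stream of JSON, e.g. an S3 read stream
// handleItem: any async function, e.g. a db call
async function consume(someReadable, handleItem) {
    // Keep the stream code itself free of async functions: just pipe transforms together...
    const objects = someReadable.pipe(JSONStream.parse('*'));
    // ...then pull from the readable end with an async iterator and await the async work here.
    for await (const obj of objects) {
        await handleItem(obj);
    }
}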

Details:

The secret to life's mystery regarding async/await and streams appears to be wrapped up in Async Iterators!

In short, I piped some streams together, and at the very end I created an async iterator to pull items out of the end so that I could call the db asynchronously. The only thing ChunkStream does for me is batch up to 1,000 items to call the db with, rather than calling it for every single item. I'm new to queues, so there may already be a better way of doing that.
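
As an aside to that last point, one possible "better way" (an untested sketch that relies only on Node's built-in async generators; batchItems, source, and batchSize are illustrative names that do not appear in the real code below) would be to batch with an async generator instead of a Transform subclass:

// Collect items from any async-iterable source (such as the JSONStream parser)
// into arrays of at most batchSize items.
async function* batchItems(source, batchSize) {
    let buffer = [];
    for await (const item of source) {
        buffer.push(item);
        if (buffer.length >= batchSize) {
            yield buffer;
            buffer = [];
        }
    }
    if (buffer.length > 0) yield buffer; // flush whatever is left
}

// Usage would look roughly like:
//   for await (const chunk of batchItems(parser, 1000)) { await processChunk(chunk); }

Either way, the important part is the same: the async db work happens outside the stream internals.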

// ...
const AWS = require('aws-sdk');
const s3 = new AWS.S3();
const JSONbigint = require('json-bigint');
JSON.parse = JSONbigint.parse; // Let there be proper bigint handling!
JSON.stringify = JSONbigint.stringify;
const stream = require('stream');
const JSONStream = require('JSONStream');

exports.handler = async (event, context) => {
    // ...
    let bucket, key;
    try {
        bucket = event.Records[0].s3.bucket.name;
        key = event.Records[0].s3.object.key;
        console.log(`Fetching S3 file: Bucket: ${bucket}, Key: ${key}`);
        const parser = JSONStream.parse('*'); // Converts file to JSON objects
        let chunkStream = new ChunkStream(1000); // Give the db a chunk of work instead of one item at a time
        let endStream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream().pipe(parser).pipe(chunkStream);
        
        let totalProcessed = 0;
        async function processChunk(chunk) {
            let chunkString = JSON.stringify(chunk);
            console.log(`Upserting ${chunk.length} items (starting with index ${totalProcessed}) to the db.`);
            await updateDb(chunkString, pool, 1000); // updateDb and pool are part of missing code
            totalProcessed += chunk.length;
        }
        
        // Async iterator
        for await (const batch of endStream) {
            // console.log(`Processing batch (${batch.length})`, batch);
            await processChunk(batch);
        }
    } catch (ex) {
        context.fail("stream S3 file failed");
        throw ex;
    }
};

class ChunkStream extends stream.Transform {
    constructor(maxItems, options = {}) {
        options.objectMode = true;
        super(options);
        this.maxItems = maxItems;
        this.batch = [];
    }
    _transform(item, enc, cb) {
        this.batch.push(item);
        if (this.batch.length >= this.maxItems) {
            // console.log(`ChunkStream: Chunk ready (${this.batch.length} items)`);
            this.push(this.batch);
            // console.log('_transform - Restarting the batch');
            this.batch = [];
        }
        cb();
    }
    _flush(cb) {
        // console.log(`ChunkStream: Flushing stream (${this.batch.length} items)`);
        if (this.batch.length > 0) {
            this.push(this.batch);
            this.batch = [];
        }
        cb();
    }
}
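
If you want to sanity-check ChunkStream on its own, a quick sketch like the following (assuming Node 12.3+ for stream.Readable.from, and that it runs in the same module as the class above) shows the batching behavior:

// Push 2,500 objects through ChunkStream and confirm they come out as 1000 + 1000 + 500.
async function testChunkStream() {
    const items = Array.from({ length: 2500 }, (_, i) => ({ id: i }));
    const source = stream.Readable.from(items);
    const chunked = source.pipe(new ChunkStream(1000));
    for await (const batch of chunked) {
        console.log(`Got a batch of ${batch.length} items`);
    }
}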