且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

带节流的异步任务队列,支持多线程

更新时间:2021-12-14 22:15:44

所以我们将从一个更简单的问题的解决方案开始,即创建一个最多同时处理 N 个任务的队列,而不是限制到 N 个任务每秒启动,并以此为基础:

So we'll start out with a solution to a simpler problem, that of creating a queue that process up to N tasks concurrently, rather than throttling to N tasks started per second, and build on that:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

我们还将使用以下辅助方法将 TaskCompletionSource 的结果与 `Task:

We'll also use the following helper methods to match the result of a TaskCompletionSource to a `Task:

public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
{
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(t.Result);
                break;
        }

    });
}

public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
{
    Match(tcs, task.ContinueWith(t => default(T)));
}

现在对于我们的实际解决方案,我们可以做的是每次需要执行节流操作时,我们创建一个 TaskCompletionSource,然后进入我们的 TaskQueue 并添加一个item 启动任务,将 TCS 与其结果匹配,不等待,然后将任务队列延迟 1 秒.然后任务队列将不允许任务启动,直到过去一秒内不再有 N 个任务启动,而操作本身的结果与 create Task 相同:

Now for our actual solution what we can do is each time we need to perform a throttled operation we create a TaskCompletionSource, and then go into our TaskQueue and add an item that starts the task, matches the TCS to its result, doesn't await it, and then delays the task queue for 1 second. The task queue will then not allow a task to start until there are no longer N tasks started in the past second, while the result of the operation itself is the same as the create Task:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    public Task Enqueue<T>(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}