且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

阻止任务在某个线程上运行

更新时间:2023-10-15 14:55:22

我首先建议你阅读我的async intro,它准确地解释了 await 将如何捕获上下文并使用它来恢复执行.简而言之,它将捕获当前的SynchronizationContext(如果SynchronizationContext.Currentnull,则捕获当前的TaskScheduler).

I first recommend that you read my async intro, which explains in precise terms how await will capture a context and use that to resume execution. In short, it will capture the current SynchronizationContext (or the current TaskScheduler if SynchronizationContext.Current is null).

另一个重要的细节是 async 延续是用 TaskContinuationOptions.ExecuteSynchronously 安排的(正如@svick 在评论中指出的那样).我有一篇关于此的博文,但 AFAIK它在任何地方都没有正式记录.这个细节确实让编写async生产者/消费者队列变得困难.

The other important detail is that async continuations are scheduled with TaskContinuationOptions.ExecuteSynchronously (as @svick pointed out in a comment). I have a blog post about this but AFAIK it is not officially documented anywhere. This detail does make writing an async producer/consumer queue difficult.

await 不是切换回原始上下文"的原因(可能)是因为 RabbitMQ 线程没有 SynchronizationContextTaskScheduler - 因此,当您调用 TrySetResult 时直接执行延续,因为这些线程看起来就像常规线程池线程.

The reason await isn't "switching back to the original context" is (probably) because the RabbitMQ threads don't have a SynchronizationContext or TaskScheduler - thus, the continuation is executed directly when you call TrySetResult because those threads look just like regular thread pool threads.

顺便说一句,通读您的代码,我怀疑您使用读取器/写入器锁和并发队列不正确.如果没有看到整个代码,我无法确定,但这是我的印象.

BTW, reading through your code, I suspect your use of a reader/writer lock and concurrent queues are incorrect. I can't be sure without seeing the whole code, but that's my impression.

我强烈建议您使用现有的 async 队列并围绕它构建一个使用者(换句话说,让其他人来做最困难的部分 :).BufferBlock<T> 类型输入 TPL 数据流 可以充当 async 队列;如果您的平台上有可用的 Dataflow,那将是我的第一个建议.否则,我的 AsyncEx 库中有一个 AsyncProducerConsumerQueue 类型,或者你可以自己写(正如我在博客中描述的那样).

I strongly recommend you use an existing async queue and build a consumer around that (in other words, let someone else do the hard part :). The BufferBlock<T> type in TPL Dataflow can act as an async queue; that would be my first recommendation if you have Dataflow available on your platform. Otherwise, I have an AsyncProducerConsumerQueue type in my AsyncEx library, or you could write your own (as I describe on my blog).

这是一个使用 BufferBlock<T> 的示例:

Here's an example using BufferBlock<T>:

private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
    RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
    _queue.Post(e);
}

public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
    return _queue.ReceiveAsync(cancellationToken);
}

在此示例中,我将保留您的 DequeueAsync API.但是,一旦您开始使用 TPL 数据流,请考虑在其他地方也使用它.当您需要这样的队列时,通常会发现代码的其他部分也将从数据流方法中受益.例如,您可以将 BufferBlock 链接到 ActionBlock,而不是使用一堆方法调用 DequeueAsync.

In this example, I'm keeping your DequeueAsync API. However, once you start using TPL Dataflow, consider using it elsewhere as well. When you need a queue like this, it's common to find other parts of your code that would also benefit from a dataflow approach. E.g., instead of having a bunch of methods calling DequeueAsync, you could link your BufferBlock to an ActionBlock.