更新时间:2023-10-15 14:55:22
我首先建议你阅读我的async
intro,它准确地解释了 await
将如何捕获上下文并使用它来恢复执行.简而言之,它将捕获当前的SynchronizationContext
(如果SynchronizationContext.Current
为null
,则捕获当前的TaskScheduler
).
I first recommend that you read my async
intro, which explains in precise terms how await
will capture a context and use that to resume execution. In short, it will capture the current SynchronizationContext
(or the current TaskScheduler
if SynchronizationContext.Current
is null
).
另一个重要的细节是 async
延续是用 TaskContinuationOptions.ExecuteSynchronously
安排的(正如@svick 在评论中指出的那样).我有一篇关于此的博文,但 AFAIK它在任何地方都没有正式记录.这个细节确实让编写async
生产者/消费者队列变得困难.
The other important detail is that async
continuations are scheduled with TaskContinuationOptions.ExecuteSynchronously
(as @svick pointed out in a comment). I have a blog post about this but AFAIK it is not officially documented anywhere. This detail does make writing an async
producer/consumer queue difficult.
await
不是切换回原始上下文"的原因(可能)是因为 RabbitMQ 线程没有 SynchronizationContext
或 TaskScheduler
- 因此,当您调用 TrySetResult
时直接执行延续,因为这些线程看起来就像常规线程池线程.
The reason await
isn't "switching back to the original context" is (probably) because the RabbitMQ threads don't have a SynchronizationContext
or TaskScheduler
- thus, the continuation is executed directly when you call TrySetResult
because those threads look just like regular thread pool threads.
顺便说一句,通读您的代码,我怀疑您使用读取器/写入器锁和并发队列不正确.如果没有看到整个代码,我无法确定,但这是我的印象.
BTW, reading through your code, I suspect your use of a reader/writer lock and concurrent queues are incorrect. I can't be sure without seeing the whole code, but that's my impression.
我强烈建议您使用现有的 async
队列并围绕它构建一个使用者(换句话说,让其他人来做最困难的部分 :).BufferBlock<T>
类型输入 TPL 数据流 可以充当 async
队列;如果您的平台上有可用的 Dataflow,那将是我的第一个建议.否则,我的 AsyncEx 库中有一个 AsyncProducerConsumerQueue
类型,或者你可以自己写(正如我在博客中描述的那样).
I strongly recommend you use an existing async
queue and build a consumer around that (in other words, let someone else do the hard part :). The BufferBlock<T>
type in TPL Dataflow can act as an async
queue; that would be my first recommendation if you have Dataflow available on your platform. Otherwise, I have an AsyncProducerConsumerQueue
type in my AsyncEx library, or you could write your own (as I describe on my blog).
这是一个使用 BufferBlock<T>
的示例:
Here's an example using BufferBlock<T>
:
private readonly BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs> _queue = new BufferBlock<RabbitMQ.Client.Events.BasicDeliverEventArgs>();
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
{
RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
_queue.Post(e);
}
public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
{
return _queue.ReceiveAsync(cancellationToken);
}
在此示例中,我将保留您的 DequeueAsync
API.但是,一旦您开始使用 TPL 数据流,请考虑在其他地方也使用它.当您需要这样的队列时,通常会发现代码的其他部分也将从数据流方法中受益.例如,您可以将 BufferBlock
链接到 ActionBlock
,而不是使用一堆方法调用 DequeueAsync
.
In this example, I'm keeping your DequeueAsync
API. However, once you start using TPL Dataflow, consider using it elsewhere as well. When you need a queue like this, it's common to find other parts of your code that would also benefit from a dataflow approach. E.g., instead of having a bunch of methods calling DequeueAsync
, you could link your BufferBlock
to an ActionBlock
.