且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

异步在后台工作器中等待中介程序死锁-如何检测调用自身的线程

更新时间:2021-07-17 09:37:28

感谢对于斯蒂芬(Stephen)的回答和彼得(Peter)的评论,当您说谢谢的时候,确实确实让人眼花clear乱。

Thanks for Stephen for the answer and Peter for the comments, it indeed blindingly clear when stated thank you,


只有一个代码循环(不是特定线程;请参阅下文),由
负责处理队列。在处理每个命令时,
会等待该命令的处理程序。

There is one code loop (not a specific thread; see below) that is responsible for processing the queue. As it processes each command, it awaits that command's handler.

有一个命令处理程序正在等待另一个命令进行处理。
但是,这将无法工作,因为将不会再处​​理
进一步的命令;在此
完成之前,代码循环不会使下一个命令出队。

There is a command handler that awaits another command to be handled. However, this cannot work because no further commands will be processed; the code loop will not dequeue the next command until this one completes.

考虑到上述情况,我发现了一个

With the above in mind I have found a way to handle without any threading hacks (detecting stack/re-entrance depth etc) or schedulers.

在下面的示例中,我将注入到处理程序中而不是循环调用类,但不执行任何排队的另一种命令处理程序分派器,而是直接在线程内处理。

In the example below I "inject" into the handler not the looping calling class, but a different type of command handler dispatcher which does not do any queuing, it instead processes directly within the thread.

下面的内容是在线程循环内调用的,那么就没有相互依赖关系了。

The below is called from within the thread loop, then there is no inter-dependency:

public class DispatchInCallingThread: ICommandBus
{
    public async Task<object> Send(object request, CancellationToken cancellationToken)
    {
        // simplified DI container magic to static invocation
        if (request is BoringCommand boringCommand)
        {
            var handler = new BoringCommandHandler();
            return await handler.Handle(boringCommand, cancellationToken);
        }
        else if (request is LockMeGoodCommand lockMeGoodCommand)
        {
            var handler = new LockMeGoodCommandHandler(this);
            return await handler.Handle(lockMeGoodCommand, cancellationToken);
        }
        else
            throw new Exception("cough furball");
    }

    public void Start(CancellationToken cancellationToken) { }

    public Task StopAsync() { return Task.CompletedTask; }
}

在后台线程中,这是注入到实例化的命令处理程序中:

And within the background thread, this is the injection into the instantiated command handler:

else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
{
    var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
    var result = await handler.Handle(lockMeGoodCommand, item.Ct);
    item.Tcs.SetResult(result);
}

现在代码将永久运行(将需要为取消令牌实施正确的关闭逻辑

Now the code runs forever (will need to implement proper shutdown logic for cancellation token source being set):

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace TestDeadlock
{
    class BoringCommand { }
    class LockMeGoodCommand { }    

    class BoringCommandHandler
    {
        public Task<int> Handle(BoringCommand command, CancellationToken cancellationToken)
        {
            Console.WriteLine(command.GetType().Name);         
            return Task.FromResult(1);
        }
    }

    class LockMeGoodCommandHandler
    {
        private readonly ICommandBus commandBus;

        public LockMeGoodCommandHandler(ICommandBus commandBus) => this.commandBus = commandBus;

        public async Task<int> Handle(LockMeGoodCommand command, CancellationToken cancellationToken)
        {            
            Console.WriteLine(command.GetType().Name);
            var otherResult =  await this.commandBus.Send(new BoringCommand(), cancellationToken);
            var otherResult2 = await this.commandBus.Send(new BoringCommand(), cancellationToken);
            return 3;
        }
    }

    public interface ICommandBus
    {
        Task<object> Send(object request, CancellationToken cancellationToken);
        void Start(CancellationToken cancellationToken);
        Task StopAsync();
    }

    public class DispatchOnBackgroundThread : ICommandBus
    {
        private readonly Channel<CommandItem> queue = Channel.CreateUnbounded<CommandItem>();
        private Task worker = null;
        private readonly DispatchInCallingThread dispatchInCallingThread = new DispatchInCallingThread();

        class CommandItem
        {
            public object Command { get; set; }
            public CancellationToken Ct { get; set; }
            public TaskCompletionSource<object> Tcs { get; set; }
        }

        public Task<object> Send(object command, CancellationToken cancellationToken)
        {
            var item = new CommandItem() { Command = command, Tcs = new TaskCompletionSource<object>(), Ct = cancellationToken };
            this.queue.Writer.WriteAsync(item, cancellationToken); // just write and immediatly return the cts
            return item.Tcs.Task;            
        }

        public void Start(CancellationToken cancellationToken)
        {
            var scheduler = new ConcurrentExclusiveSchedulerPair();

            this.worker = Task.Factory.StartNew(async () =>
            {
                CommandItem item = null;
                try
                {                
                    while (cancellationToken.IsCancellationRequested == false)
                    {
                        item = await this.queue.Reader.ReadAsync(cancellationToken);

                        // simplified DI container magic to static invocation
                        if (item.Command is BoringCommand boringCommand)
                        {
                            var handler = new BoringCommandHandler();
                            var result = handler.Handle(boringCommand, item.Ct);
                            item.Tcs.SetResult(result);

                        }
                        else if (item.Command is LockMeGoodCommand lockMeGoodCommand)
                        {
                            var handler = new LockMeGoodCommandHandler(this.dispatchInCallingThread);
                            var result = await handler.Handle(lockMeGoodCommand, item.Ct);
                            item.Tcs.SetResult(result);
                        }
                        else
                            throw new Exception("unknown");
                    }
                }
                catch (TaskCanceledException)
                {
                    if (item != null)
                        item.Tcs.SetCanceled();
                }
                Console.WriteLine("exit background thread");
            })
            .Unwrap();  

        }

        public async Task StopAsync()
        {
            this.queue.Writer.Complete();
            await this.worker;
        }
    }

    public class DispatchInCallingThread: ICommandBus
    {
        public async Task<object> Send(object request, CancellationToken cancellationToken)
        {
            // simplified DI container magic to static invocation
            if (request is BoringCommand boringCommand)
            {
                var handler = new BoringCommandHandler();
                return await handler.Handle(boringCommand, cancellationToken);
            }
            else if (request is LockMeGoodCommand lockMeGoodCommand)
            {
                var handler = new LockMeGoodCommandHandler(this);
                return await handler.Handle(lockMeGoodCommand, cancellationToken);
            }
            else
                throw new Exception("unknown");
        }

        public void Start(CancellationToken cancellationToken) { }
        public Task StopAsync() { return Task.CompletedTask; }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            await TestDispatchOnBackgroundThread();
        }

        static async Task TestDispatchOnBackgroundThread()
        {
            var cts = new CancellationTokenSource();

            Console.CancelKeyPress += delegate {
                Console.WriteLine("setting cts.Cancel()");
                cts.Cancel();
            };

            var threadStrategy = new DispatchOnBackgroundThread();
            threadStrategy.Start(cts.Token);

            while (cts.IsCancellationRequested == false)
            {
                Console.WriteLine("***************** sending new batch ****************");                
                var result1 = await threadStrategy.Send(new BoringCommand(), cts.Token);
                var result3 = await threadStrategy.Send(new LockMeGoodCommand(), cts.Token);
                Thread.Sleep(1000);
            }
            await threadStrategy.StopAsync();
        }
    }
}

有关详细信息,请参见 https://***.com/a/61791817/915839 可以通过依赖项注入实现工作线程内的线程内分派

For further info, the actual implementation with dependency injection is here https://***.com/a/61791817/915839 which was able to dynamically switch to in-thread dispatch within the worker thread