且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

当使用ConcurrentQueue时,尝试在并行循环时出队

更新时间:2023-02-16 16:00:16

100%肯定你试图归档这里。你试图去除所有项目,直到没有剩下的东西?

Well I'm not 100% sure what you try to archive here. Are you trying to just dequeue all items until nothing is left? Or just dequeue lots of items in one go?

第一个大概意外的行为开始于这个语句:

The first probably unexpected behavior starts with this statement:

 theQueue.AsParallel()

对于ConcurrentQueue, '快照' - Enumerator。所以当你迭代一个并发堆栈,你只需要遍历快照,没有'活'队列。

For a ConcurrentQueue, you get a 'Snapshot'-Enumerator. So when you iterate over a concurrent stack, you only iterate over the snapshot, no the 'live' queue.

一般来说,我认为这不是一个好主意迭代

In general I think it's not a good idea to iterate over something you're changing during the iteration.

因此,另一种解决方案如下所示:

So another solution would look like this:

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });

这样可以避免在迭代时操作。然而,在该语句之后,队列仍然可以包含在for循环期间添加的数据。

This avoids manipulation something while iterating over it. However, after that statement, the queue can still contain data, which was added during the for-loop.

为了获得队列空的时间,你可能需要一个多一点工作。这是一个非常丑的解决方案。当队列仍有项目时,创建新任务。每个任务开始从队列中出队,只要它可以。最后,我们等待所有任务结束。为了限制并行性,我们从不创建超过20个任务。

To get the queue empty for moment in time you probably need a little more work. Here's an really ugly solution. While the queue has still items, create new tasks. Each task start do dequeue from the queue as long as it can. At the end, we wait for all tasks to end. To limit the parallelism, we never create more than 20-tasks.

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);