且构网

Merging dataflow results

Updated: 2022-11-02 23:22:59

A proper implementation of separate Splitter and Aggregator blocks would be too complex to implement and too cumbersome to use. So I came up with a simpler API that encapsulates two blocks: a master block and a detail block. Each block has its own processing options. The master block executes the splitting and aggregating actions, while the detail block executes the transformation of each detail. The only requirement regarding the two sets of options is that the CancellationToken must be the same for both. All other options (MaxDegreeOfParallelism, BoundedCapacity, EnsureOrdered, TaskScheduler, etc.) can be set independently for each block.

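// Requires: using System; using System.Collections.Generic;
// using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;
public static TransformBlock<TInput, TOutput>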
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, Task<IEnumerable<TDetail>>> split,
    Func<TDetail, Task<TDetailResult>> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    if (split == null) throw new ArgumentNullException(nameof(split));
    if (aggregate == null) throw new ArgumentNullException(nameof(aggregate));
    if (transformDetail == null)
        throw new ArgumentNullException(nameof(transformDetail));
    splitAggregateOptions = splitAggregateOptions ??
        new ExecutionDataflowBlockOptions();
    var cancellationToken = splitAggregateOptions.CancellationToken;
    transformDetailOptions = transformDetailOptions ??
        new ExecutionDataflowBlockOptions() { CancellationToken = cancellationToken };
    if (transformDetailOptions.CancellationToken != cancellationToken)
        throw new ArgumentException("Incompatible options: the CancellationToken " +
            "must be the same for both blocks.", nameof(transformDetailOptions));

    var detailTransformer = new ActionBlock<Task<Task<TDetailResult>>>(async task =>
    {
        try
        {
            task.RunSynchronously();
            await task.Unwrap().ConfigureAwait(false);
        }
        catch { } // Suppress exceptions (errors are propagated through the task)
    }, transformDetailOptions);

    return new TransformBlock<TInput, TOutput>(async item =>
    {
        IEnumerable<TDetail> details = await split(item); //continue on captured context
        TDetailResult[] detailResults = await Task.Run(async () =>
        {
            var tasks = new List<Task<TDetailResult>>();
            foreach (var detail in details)
            {
                var taskFactory = new Task<Task<TDetailResult>>(
                    () => transformDetail(detail), cancellationToken);
                var accepted = await detailTransformer.SendAsync(taskFactory,
                    cancellationToken).ConfigureAwait(false);
                if (!accepted)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    throw new InvalidOperationException("Unexpected detail rejection.");
                }
                var task = taskFactory.Unwrap();
                // Assume that the detailTransformer will never fail, and so the task
                // will eventually complete. Guarding against this unlikely scenario
                // with Task.WhenAny(task, detailTransformer.Completion) seems overkill.
                tasks.Add(task);
            }
            return await Task.WhenAll(tasks).ConfigureAwait(false);
        }); // continue on captured context
        return aggregate(item, detailResults);
    }, splitAggregateOptions);
}

// Overload with synchronous lambdas
public static TransformBlock<TInput, TOutput>
    CreateSplitterAggregatorBlock<TInput, TDetail, TDetailResult, TOutput>(
    Func<TInput, IEnumerable<TDetail>> split,
    Func<TDetail, TDetailResult> transformDetail,
    Func<TInput, TDetailResult[], TOutput> aggregate,
    ExecutionDataflowBlockOptions splitAggregateOptions = null,
    ExecutionDataflowBlockOptions transformDetailOptions = null)
{
    return CreateSplitterAggregatorBlock(
        item => Task.FromResult(split(item)),
        detail => Task.FromResult(transformDetail(detail)),
        aggregate, splitAggregateOptions, transformDetailOptions);
}
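
The core mechanism in the method above is that the detail block does not receive the details themselves. It receives cold (not yet started) tasks, starts them, and the producer awaits the unwrapped tasks to collect the results. Below is a minimal sketch of that mechanism in isolation. The names `worker` and `coldTask`, the delay, and the parallelism value are illustrative only, and the snippet assumes C# 9 or later (top-level statements) with the TPL Dataflow library available.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The block only starts the cold tasks it receives. Results and errors
// travel through the tasks themselves, not through the block's output.
var worker = new ActionBlock<Task<Task<int>>>(async task =>
{
    try
    {
        task.RunSynchronously(); // runs the delegate, which produces the inner task
        await task.Unwrap().ConfigureAwait(false); // occupy a slot until the work ends
    }
    catch { } // the producer observes any error through the unwrapped task
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

// A cold task: created with the constructor, so it has not started yet.
var coldTask = new Task<Task<int>>(async () =>
{
    await Task.Delay(100);
    return 21 * 2;
});

await worker.SendAsync(coldTask);     // hand the work to the block
int result = await coldTask.Unwrap(); // wait for the inner task's result
Console.WriteLine(result);            // prints 42

worker.Complete();
await worker.Completion;
```

Because the block awaits the inner task before returning, its MaxDegreeOfParallelism limits how many details are processed concurrently, regardless of how many inputs are being split at the same time.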

Below is a usage example of this block. The inputs are strings containing comma-separated numbers. Each string is split, then each number is doubled, and finally the doubled numbers of each input string are summed.

var processor = CreateSplitterAggregatorBlock<string, int, int, int>(split: str =>
{
    var parts = str.Split(',');
    return parts.Select(part => Int32.Parse(part));
}, transformDetail: number =>
{
    return number * 2;
}, aggregate: (str, numbersArray) =>
{
    var sum = numbersArray.Sum();
    Console.WriteLine($"[{str}] => {sum}");
    return sum;
});

processor.Post("1, 2, 3");
processor.Post("4, 5");
processor.Post("6, 7, 8, 9");
processor.Complete();
processor.LinkTo(DataflowBlock.NullTarget<int>());
processor.Completion.Wait();

Output:

[1, 2, 3] => 12
[4, 5] => 18
[6, 7, 8, 9] => 60
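
The example above relies on the default options for both blocks. As a sketch of how the two option sets might be configured independently while still sharing the required CancellationToken, something like the following could be used. The specific values (2, 10, 8) are arbitrary assumptions for illustration, and the snippet assumes C# 9 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

using var cts = new CancellationTokenSource();

// Master block: splits and aggregates, here limited to 2 inputs at a time.
var splitAggregateOptions = new ExecutionDataflowBlockOptions
{
    CancellationToken = cts.Token,
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 10
};

// Detail block: transforms individual details, with higher parallelism.
var transformDetailOptions = new ExecutionDataflowBlockOptions
{
    CancellationToken = cts.Token, // must be the same token as above
    MaxDegreeOfParallelism = 8
};

// This is the condition that CreateSplitterAggregatorBlock validates.
bool sameToken = splitAggregateOptions.CancellationToken
    == transformDetailOptions.CancellationToken;
Console.WriteLine(sameToken); // prints True

// Both objects would then be passed as the last two arguments:
// CreateSplitterAggregatorBlock(split, transformDetail, aggregate,
//     splitAggregateOptions, transformDetailOptions);
```

Note that with a BoundedCapacity set on the master block, Post may return false when the block is full, so SendAsync is usually the safer way to feed it.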