返回

c#-如何为 Channels 实现 BlockingCollection.TakeFromAny 等价物?

发布时间:2022-06-19 23:44:37 284
# flask

我正在尝试实现一个异步方法,该方法采用一个ChannelReader<T>s 数组,并从任何具有可用项目的通道中获取一个值。它是一种与该方法具有类似功能的BlockingCollection<T>.TakeFromAny方法,具有以下签名:

public static int TakeFromAny(BlockingCollection[] collections, out T item,
    CancellationToken cancellationToken);

collections从中删除项目的数组。一async方法不能具有out参数,所以我尝试实现的API是:

public static Task<(T Item, int Index)> TakeFromAnyAsync(
    ChannelReader[] channelReaders,
    CancellationToken cancellationToken = default);

该方法应该异步读取一个项目,并返回消耗的项目以及数组TakeFromAnyAsync<T>中关联通道的索引。channelReaders如果所有通道都已完成(成功或有错误),或者在 期间全部完成await,则该方法应异步抛出ChannelClosedException.

我的问题是:如何实现该TakeFromAnyAsync<T>方法?实现看起来相当棘手。很明显,在任何情况下,该方法都不应从渠道中消耗超过一项。此外,它不应该留下一劳永逸的任务,或者让一次性资源闲置。该方法通常会在循环中调用,因此它也应该相当有效。它的复杂度不应该比 O(n) 差,其中n通道的数量。

为了深入了解此方法的有用之处,您可以查看select声明语言从旅行:

这个select语句允许goroutine等待多个通信操作。

select块,直到其中一个案例可以运行,然后执行该案例。如果有多个准备就绪,它会随机选择一个。

select {
case msg1 := <-c1:
    fmt.Println("received", msg1)
case msg2 := <-c2:
    fmt.Println("received", msg2)
}

在上述示例中,将从通道中获取一个值c1并分配给变量msg1,或将从通道中获取值c2并分配给变量msg2. 围棋select语句不限于从通道读取。它可以包括多个异构情况,如写入有界通道、等待计时器等。复制Go的全部功能select声明超出了这个问题的范围。

特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报
评论区(1)
按点赞数排序
用户头像