System.Threading.Channels 高性能异步队列

发布时间:2024年01月24日

引言

System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它提供一个异步数据集合,可用于生产者和消费者之前的数据异步传递。

它提供如下方法:

BoundedChannelOptions

Provides options that control the behavior of bounded Channel<T> instances.

提供通道的行为控制 有限通道

Channel

Provides static methods for creating channels.

提供创建通道的静态方法

Channel<T>

Provides a base class for channels that support reading and writing elements of type T.

泛型通道,写入和读取方类型都为 T

Channel<TWrite,TRead>

Provides a base class for channels that support reading elements of type TRead and writing elements of type TWrite.

泛型通道,分别指定写入和读取方的类型

ChannelClosedException

Exception thrown when a channel is used after it's been closed.

通道在关闭后被调用时会抛出此异常

ChannelOptions

Provides options that control the behavior of channel instances.

提供通道的行为控制

ChannelReader<T>

Provides a base class for reading from a channel.

通道读取方的基类

ChannelWriter<T>

Provides a base class for writing to a channel.

通道写入方的基类

UnboundedChannelOptions

Provides options that control the behavior of unbounded Channel

提供通道的行为控制 无限制通道

BoundedChannelFullMode

Specifies the behavior to use when writing to a bounded channel that is already full.

当通道容量达到最大时 控制通道的写入规则 有限通道

静态类 Channel

通道的创建方法由静态类Channel提供,只需要指定通道类型、控制条件和数据类型

CreateBounded<T>(BoundedChannelOptions)

Creates a channel with the specified maximum capacity.

创建具有指定配置的通道

CreateBounded<T>(BoundedChannelOptions, Action<T>)

Creates a channel subject to the provided options.

通道容量已满,再写入时会触发Action<T>

CreateBounded<T>(Int32)

Creates a channel with the specified maximum capacity.

创建具有指定容量的通道

CreateUnbounded<T>()

Creates an unbounded channel usable by any number of readers and writers concurrently.

创建无限制通道,可供任意数量的读取和写入方同时操作

CreateUnbounded<T>(UnboundedChannelOptions)

Creates an unbounded channel subject to the provided options.

创建具有指定配置的无限制通道

BoundedChannelFullMode

DropNewest1

删除并忽略通道中的最新项,以便为要写入的项留出空间。

DropOldest2

删除并忽略通道中的最旧项,以便为要写入的项留出空间。

DropWrite3

删除要写入的项。

Wait0

等待空间可用以便完成写入操作。

?ChannelOption

AllowSynchronousContinuations如果通道上执行的操作能以同步方式调用已订阅挂起异步操作的通知的延续,则为 true;如果应以异步方式调用所有延续,则为 false
SingleReader

如果通道中的读取器需要保证一次最多仅执行一个读取操作,则为 true;如果不需要此类约束,则为 false

SingleWriter

如果写入到通道的编写器需要保证一次最多仅执行一个写入操作,则为 true;如果不需要此类约束,则为 false

BoundedChannelOptions

BoundedChannelOptions(int Capcity)

初始化选项。

AllowSynchronousContinuations

如果通道上执行的操作能以同步方式调用已订阅挂起异步操作的通知的延续,则为 true;如果应以异步方式调用所有延续,则为 false

(继承自 ChannelOptions)
Capacity

获取或设置有限通道可能会存储的最大项数。

FullMode

获取或设置通道已满时由写入操作引起的行为。BoundedChannelFullMode枚举

SingleReader

如果通道中的读取器需要保证一次最多仅执行一个读取操作,则为 true;如果不需要此类约束,则为 false

(继承自 ChannelOptions)
SingleWriter

如果写入到通道的编写器需要保证一次最多仅执行一个写入操作,则为 true;如果不需要此类约束,则为 false

(继承自 ChannelOptions)

?UnBoundedChannelOptions

AllowSynchronousContinuations如果通道上执行的操作能以同步方式调用已订阅挂起异步操作的通知的延续,则为 true;如果应以异步方式调用所有延续,则为 false
SingleReader

如果通道中的读取器需要保证一次最多仅执行一个读取操作,则为 true;如果不需要此类约束,则为 false

SingleWriter

如果写入到通道的编写器需要保证一次最多仅执行一个写入操作,则为 true;如果不需要此类约束,则为 false

Writer

Complete(Exception)

Mark the channel as being complete, meaning no more items will be written to it.

标记通道即将关闭,意味着没有更多的数据需要写入

TryComplete(Exception)

Attempts to mark the channel as being completed, meaning no more data will be written to it.

尝试关闭通道,意味着没有更多的数据需要写入,成功返回 true,反之false

TryWrite(T)

Attempts to write the specified item to the channel.

尝试写入数据,成功返回 true ,反之 false

WaitToWriteAsync(CancellationToken)

Returns a ValueTask<TResult> that will complete when space is available to write an item.

将等待有可用空间写入项时完成写入并返回 true, 或在通道关闭后返回 false

WriteAsync(T, CancellationToken)

Asynchronously writes an item to the channel.

Reader

ReadAllAsync(CancellationToken)

Creates an IAsyncEnumerable<T> that enables reading all of the data from the channel.创建的异步可枚举项

ReadAsync(CancellationToken)

Asynchronously reads an item from the channel.

TryPeek(T)

Attempts to peek at an item from the channel.

TryRead(T)如果项已读取到,则为 true;否则为 false
WaitToReadAsync(CancellationToken)

它将在有数据可供读取时完成,并返回 true 结果;或将在因通道成功完成而不再有数据可供读取时完成,并返回 false 结果

示例

// 创建有限容量的channel 
var channel1 = Channel.CreateBounded<string>(100);

var option = new BoundedChannelOptions(100) 
{ 
    FullMode = BoundedChannelFullMode.Wait//容量满时,等待空位再写入 
};
// 创建指定配置的有限通道
var channel2 = Channel.CreateBounded<string>(option);

await channel.Writer.WriteAsync("hello");//写入
string res = await channel.Reader.ReadAsync();//读取

//自动循环读取,内部阻塞,当写入方调用Complete时,会自动退出循环
await foreach(var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine(item); 
}

//捕获到异常 退出
try
{
    while(!channel.Reader.Completion.IsCompleted)
    {
        var message = await channel.Reader.ReadAsync();
        Console.WriteLine(message);
    }
}
catch(ChannelClosedException)
{
    Console.WriteLine("channel closed");
}

有意思的是当写入方Writer调用Complete方法时,Reader.Completion.Completed并不会立即被标记,而是通道内没有数据时再被标记。

此外,当写入方Writer调用Complete方法时,await foreach会自动退出循环。但上述第二种读取方式会报错ChannelClosedException,需要Try Catch包起来。

.Net Framework 4.8 没有ReadAllAsync方法,ReadAkkAsync的实现其实是:

while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var item))
    {
        Console.WriteLine(item);
    }
}

可以用上述方式避免报错。

文章来源:https://blog.csdn.net/weixin_42253874/article/details/135827065
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。