System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它提供一个异步数据集合,可用于生产者和消费者之前的数据异步传递。
它提供如下方法:
BoundedChannelOptions | Provides options that control the behavior of bounded Channel<T> instances. 提供通道的行为控制 有限通道 |
Channel | Provides static methods for creating channels. 提供创建通道的静态方法 |
Channel<T> | Provides a base class for channels that support reading and writing elements of type 泛型通道,写入和读取方类型都为 T |
Channel<TWrite,TRead> | Provides a base class for channels that support reading elements of type 泛型通道,分别指定写入和读取方的类型 |
ChannelClosedException | Exception thrown when a channel is used after it's been closed. 通道在关闭后被调用时会抛出此异常 |
ChannelOptions | Provides options that control the behavior of channel instances. 提供通道的行为控制 |
ChannelReader<T> | Provides a base class for reading from a channel. 通道读取方的基类 |
ChannelWriter<T> | Provides a base class for writing to a channel. 通道写入方的基类 |
UnboundedChannelOptions | Provides options that control the behavior of unbounded Channel 提供通道的行为控制 无限制通道 |
BoundedChannelFullMode | Specifies the behavior to use when writing to a bounded channel that is already full. 当通道容量达到最大时 控制通道的写入规则 有限通道 |
通道的创建方法由静态类Channel提供,只需要指定通道类型、控制条件和数据类型
CreateBounded<T>(BoundedChannelOptions) | Creates a channel with the specified maximum capacity. 创建具有指定配置的通道 |
CreateBounded<T>(BoundedChannelOptions, Action<T>) | Creates a channel subject to the provided options. 通道容量已满,再写入时会触发Action<T> |
CreateBounded<T>(Int32) | Creates a channel with the specified maximum capacity. 创建具有指定容量的通道 |
CreateUnbounded<T>() | Creates an unbounded channel usable by any number of readers and writers concurrently. 创建无限制通道,可供任意数量的读取和写入方同时操作 |
CreateUnbounded<T>(UnboundedChannelOptions) | Creates an unbounded channel subject to the provided options. 创建具有指定配置的无限制通道 |
DropNewest | 1 | 删除并忽略通道中的最新项,以便为要写入的项留出空间。 |
DropOldest | 2 | 删除并忽略通道中的最旧项,以便为要写入的项留出空间。 |
DropWrite | 3 | 删除要写入的项。 |
Wait | 0 | 等待空间可用以便完成写入操作。 |
AllowSynchronousContinuations | 如果通道上执行的操作能以同步方式调用已订阅挂起异步操作的通知的延续,则为 true ;如果应以异步方式调用所有延续,则为 false 。 |
SingleReader | 如果通道中的读取器需要保证一次最多仅执行一个读取操作,则为 |
SingleWriter | 如果写入到通道的编写器需要保证一次最多仅执行一个写入操作,则为 |
BoundedChannelOptions(int Capcity) | 初始化选项。 |
AllowSynchronousContinuations | 如果通道上执行的操作能以同步方式调用已订阅挂起异步操作的通知的延续,则为 |
Capacity | 获取或设置有限通道可能会存储的最大项数。 |
FullMode | 获取或设置通道已满时由写入操作引起的行为。BoundedChannelFullMode枚举 |
SingleReader | 如果通道中的读取器需要保证一次最多仅执行一个读取操作,则为 |
SingleWriter | 如果写入到通道的编写器需要保证一次最多仅执行一个写入操作,则为 |
AllowSynchronousContinuations | 如果通道上执行的操作能以同步方式调用已订阅挂起异步操作的通知的延续,则为 true ;如果应以异步方式调用所有延续,则为 false 。 |
SingleReader | 如果通道中的读取器需要保证一次最多仅执行一个读取操作,则为 |
SingleWriter | 如果写入到通道的编写器需要保证一次最多仅执行一个写入操作,则为 |
Complete(Exception) | Mark the channel as being complete, meaning no more items will be written to it. 标记通道即将关闭,意味着没有更多的数据需要写入 |
TryComplete(Exception) | Attempts to mark the channel as being completed, meaning no more data will be written to it. 尝试关闭通道,意味着没有更多的数据需要写入,成功返回 true,反之false |
TryWrite(T) | Attempts to write the specified item to the channel. 尝试写入数据,成功返回 true ,反之 false |
WaitToWriteAsync(CancellationToken) | Returns a ValueTask<TResult> that will complete when space is available to write an item. 将等待有可用空间写入项时完成写入并返回 |
WriteAsync(T, CancellationToken) | Asynchronously writes an item to the channel. |
ReadAllAsync(CancellationToken) | Creates an IAsyncEnumerable<T> that enables reading all of the data from the channel.创建的异步可枚举项 |
ReadAsync(CancellationToken) | Asynchronously reads an item from the channel. |
TryPeek(T) | Attempts to peek at an item from the channel. |
TryRead(T) | 如果项已读取到,则为 true ;否则为 false |
WaitToReadAsync(CancellationToken) | 它将在有数据可供读取时完成,并返回 |
// 创建有限容量的channel
var channel1 = Channel.CreateBounded<string>(100);
var option = new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait//容量满时,等待空位再写入
};
// 创建指定配置的有限通道
var channel2 = Channel.CreateBounded<string>(option);
await channel.Writer.WriteAsync("hello");//写入
string res = await channel.Reader.ReadAsync();//读取
//自动循环读取,内部阻塞,当写入方调用Complete时,会自动退出循环
await foreach(var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}
//捕获到异常 退出
try
{
while(!channel.Reader.Completion.IsCompleted)
{
var message = await channel.Reader.ReadAsync();
Console.WriteLine(message);
}
}
catch(ChannelClosedException)
{
Console.WriteLine("channel closed");
}
有意思的是当写入方Writer调用Complete方法时,Reader.Completion.Completed并不会立即被标记,而是通道内没有数据时再被标记。
此外,当写入方Writer调用Complete方法时,await foreach会自动退出循环。但上述第二种读取方式会报错ChannelClosedException,需要Try Catch包起来。
.Net Framework 4.8 没有ReadAllAsync方法,ReadAkkAsync的实现其实是:
while (await channel.Reader.WaitToReadAsync())
{
while (channel.Reader.TryRead(out var item))
{
Console.WriteLine(item);
}
}
可以用上述方式避免报错。