Concurrency in C# Cookbook中文翻译第六章System.Reactive基础知识

发布时间:2024年01月17日
LINQ is a set of language features that enable developers to query sequences. The two most common LINQ providers are the built-in LINQ to Objects (which is based on IEnumerable<T>) and LINQ to Entities (based on IQueryable<T>). There are many other providers available, and most providers have the same general structure. Queries are lazily evaluated, and the sequences produce values as necessary. Conceptually, this is a pull model; during evaluation, value items are pulled from the query one at a time.
LINQ是一组语言特性,使开发人员能够查询序列。两个最常见的LINQ提供程序是内置的LINQ to Objects(基于IEnumerable<T>)和LINQ to Entities(基于IQueryable<T>)。还有许多其他提供程序可用,并且大多数提供程序具有相同的一般结构。查询是惰性求值的,序列根据需要生成值。从概念上讲,这是一个拉动模型;在求值期间,每次从查询中提取一个值项。
System.Reactive (Rx) treats events as sequences of data that arrive over time. As such, you can think of Rx as LINQ to Events (based on IObservable<T>). The main difference between observables and other LINQ providers is that Rx is a “push” model, meaning that the query defines how the program reacts as events arrive. Rx builds on top of LINQ, adding some powerful new operators as extension methods.
System.Reactive(Rx)将事件视为随时间到达的数据序列。因此,您可以将Rx视为事件的LINQ(基于IObservable<T>)。可观察对象和其他LINQ提供程序之间的主要区别在于Rx是一个“推送”模型,这意味着查询定义了程序在事件到达时的反应。Rx构建在LINQ之上,添加了一些强大的新操作符作为扩展方法。
This chapter looks at some of the more common Rx operations. Bear in mind that all of the LINQ operators are also available, so simple operations, such as filtering (Where) and projection (Select), work conceptually the same as they do with any other LINQ provider. We won’t cover these common LINQ operations here; we’ll focus on the new capabilities that Rx builds on top of LINQ, particularly those dealing with time.
本章介绍一些比较常见的Rx操作。请记住,所有的LINQ操作符都是可用的,所以简单的操作,比如过滤(Where)和投影(Select),在概念上与使用其他LINQ提供程序是一样的。我们不会在这里讨论这些常见的LINQ操作;我们将重点关注Rx在LINQ之上构建的新功能,特别是那些处理时间的功能。
To use System.Reactive, install the NuGet package for System.Reactive into your application.
使用System.Reactive,安装NuGet包System.Reactive到应用程序中。

6.1. Converting .NET Events

Problem 问题

You have an event that you need to treat as a System.Reactive input stream,producing some data via OnNext each time the event is raised.
您需要将一个事件视为一个系统。响应性输入流,每次触发事件时通过OnNext产生一些数据。

Solution 解决方案

The Observable class defines several event converters. Most .NET framework events are compatible with FromEventPattern, but if you have events that don’t follow the common pattern, you can use FromEvent instead.
Observable类定义了几个事件转换器。大多数.NET框架事件都与FromEventPattern兼容,但是如果你有不遵循通用模式的事件,你可以使用FromEvent来代替。
FromEventPattern works best if the event delegate type is EventHandler<T>. Many newer framework types use this event delegate type. For example, the Progress<T> type defines a ProgressChanged event, which is of type EventHandler<T>, so it can be easily wrapped with FromEventPattern:
如果事件委托类型是EventHandler<T>,则FromEventPattern工作得最好。许多较新的框架类型使用此事件委托类型。例如,Progress<T>定义了一个ProgressChanged事件,它的类型是EventHandler<T>,所以它可以很容易地用FromEventPattern包装:
var progress = new Progress(); 
IObservable> progressReports = Observable.FromEventPattern( 
	handler => progress.ProgressChanged += handler, 
	handler => progress.ProgressChanged -= handler); progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));
Note here that the data.EventArgs is strongly typed to be an int. The type argument to FromEventPattern (int in the previous example) is the same as the type T in EventHandler<T>. The two lambda arguments to FromEventPattern enable System.Reactive to subscribe and unsubscribe from the event.
注意这里的数据。EventArgs是强类型的int类型。FromEventPattern的类型参数(在前面的例子中是int)与EventHandler<T>中的类型T相同。FromEventPattern的两个lambda参数启用System.Reactive响应订阅和取消订阅事件。
The newer user interface frameworks use EventHandler<T>, and can easily be used with FromEventPattern, but older types often define a unique delegate type for each event. These can also be used with FromEventPattern, but it takes a bit more work. For example, the System.Timers.Timer type defines an Elapsed event, which is of type ElapsedEventHandler. You can wrap older events like this with FromEventPattern:
较新的用户界面框架使用EventHandler<T>,并且可以很容易地与FromEventPattern一起使用,但是旧的类型通常为每个事件定义唯一的委托类型。这些也可以与FromEventPattern一起使用,但它需要更多的工作。例如,System.Timers.Timer类型定义了一个Elapsed事件,其类型为ElapsedEventHandler。你可以像这样用FromEventPattern包装旧的事件:
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true }; IObservable> ticks = Observable.FromEventPattern( 
	handler => (s, a) => handler(s, a), 
	handler => timer.Elapsed += handler, 
	handler => timer.Elapsed -= handler); 

ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));
Note that in this example that data.EventArgs is still strongly typed. The type arguments to FromEventPattern are now the unique handler type and the derived EventArgs type. The first lambda argument to FromEventPattern is a converter from EventHandler<ElapsedEventArgs> to ElapsedEventHandler; the converter should do nothing more than pass along the event.
注意,在本例中,data.EventArgs仍然是强类型的。FromEventPattern的类型参数现在是唯一的处理程序类型和派生的EventArgs类型。fromventpattern的第一个lambda参数是EventHandler<ElapsedEventArgs> ElapsedEventHandler;转换器应该做的只是传递事件。
That syntax is definitely getting awkward. Here’s another option, which uses reflection:

这种语法肯定会变得很尴尬。这里有另一个选择,它使用反射:
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true }; IObservable> ticks = Observable.FromEventPattern(timer, nameof(Timer.Elapsed)); 
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + ((ElapsedEventArgs)data.EventArgs).SignalTime));
With this approach, the call to FromEventPattern is much easier. Note that there’s one drawback to this approach: the consumer doesn’t get strongly typed data. Because data.EventArgs is of type object, you have to cast it to ElapsedEventArgs yourself.
使用这种方法,调用FromEventPattern要容易得多。注意,这种方法有一个缺点:使用者不能获得强类型数据。因为data.EventArgs是object类型,你必须自己将它强制转换为ElapsedEventArgs。

Discussion 讨论

Events are a common source of data for System.Reactive streams. This recipe covers wrapping any events that conform to the standard event pattern (where the first argument is the sender and the second argument is the event arguments type). If you have unusual event types, you can still use the Observable.FromEvent method overloads to wrap them into an observable.
事件是公共的数据System.Reactive流。此配方涵盖了包装符合标准事件模式的任何事件(其中第一个参数是发送方,第二个参数是事件参数类型)。如果你有不寻常的事件类型,你仍然可以使用Observable.FromEvent方法重载,将它们封装到一个可观察对象中。
When events are wrapped into an observable, OnNext is called each time the event is raised. When you’re dealing with AsyncCompletedEventArgs, this can cause surprising behavior, because any exception is passed along as data (OnNext), not as an error (OnError). Consider this wrapper for WebClient.DownloadStringCompleted, for example:
当事件被包装到一个可观察对象中时,每次引发事件时都会调用OnNext。当你处理AsyncCompletedEventArgs时,这可能会导致令人惊讶的行为,因为任何异常都作为数据(OnNext)传递,而不是作为错误(OnError)传递。考虑WebClient的这个包装器。DownloadStringCompleted,例如:
var client = new WebClient(); 
IObservable> downloadedStrings = Observable.FromEventPattern(client, 
	nameof(WebClient.DownloadStringCompleted)); downloadedStrings.Subscribe( data => { var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs; 
	if (eventArgs.Error != null)
		Trace.WriteLine("OnNext: (Error) " + eventArgs.Error); 
	else 
		Trace.WriteLine("OnNext: " + eventArgs.Result); 
	}, 
	ex => Trace.WriteLine("OnError: " + ex.ToString()), 
	() => Trace.WriteLine("OnCompleted")); 
	client.DownloadStringAsync(new Uri("http://invalid.example.com/"));
When WebClient.DownloadStringAsync completes with an error, the event is raised with an exception in AsyncCompletedEventArgs.Error. Unfortunately, System.Reactive sees this as a data event, so if you then run the preceding code you will see OnNext: (Error) printed instead of OnError:.
当WebClient.DownloadStringAsync以错误完成,该事件将在AsyncCompletedEventArgs.Error中引发异常。不幸的是,System.Reactive认为这是一个数据事件,所以如果你运行前面的代码,你会看到OnNext: (Error)而不是OnError:。
Some event subscriptions and unsubscriptions must be done from a particular context. For example, events on many UI controls must be subscribed to from the UI thread. System.Reactive provides an operator that will control the context for subscribing and unsubscribing: SubscribeOn. The SubscribeOn operator isn’t necessary in most situations because most of the time a UI-based subscription is done from the UI thread.
某些事件订阅和取消订阅必须从特定上下文中完成。例如,许多UI控件上的事件必须从UI线程订阅。System.Reactive提供了一个操作符来控制订阅和取消订阅的上下文:SubscribeOn。在大多数情况下,SubscribeOn操作符是不必要的,因为大多数情况下,基于UI的订阅是从UI线程完成的。
SubscribeOn controls the context for the code that adds and removes the event handlers. Don’t confuse this with ObserveOn, which controls the context for the observable notifications (the delegates passed to Subscribe).
SubscribeOn控制添加和删除事件处理程序的代码的上下文。不要将它与ObserveOn混淆,后者控制可观察通知(传递给Subscribe的委托)的上下文。

6.2. Sending Notifications to a Context 向上下文发送通知

Problem 问题

System.Reactive does its best to be thread agnostic. So, it’ll raise notifications (e.g., OnNext) in whatever thread happens to be current. Each OnNext notification will happen sequentially, but not necessarily on the same thread. You often want these notifications raised in a particular context. For example, UI elements should only be manipulated from the UI thread that owns them, so if you’re updating a UI in response to a notification that is arriving on a threadpool thread, then you’ll need to move over to the UI thread.
System.Reactive尽量做到线程不可知。因此,它会在当前线程中引发通知(例如,OnNext)。每个OnNext通知将按顺序发生,但不一定在同一线程上。您经常希望在特定上下文中引发这些通知。例如,UI元素应该只从拥有它们的UI线程中进行操作,所以如果你正在更新UI以响应线程池线程上到达的通知,那么你需要转移到UI线程。

Solution 解决方案

System.Reactive provides the ObserveOn operator to move notifications to another scheduler.

Consider the following example, which uses the Interval operator to create OnNext notifications once a second:
System.Reactive提供了ObserveOn操作符来将通知移动到另一个调度器。

考虑下面的例子,它使用Interval操作符每秒创建一次OnNext通知:
private void Button_Click(object sender, RoutedEventArgs e) 
{ 
	Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}"); 
	Observable.Interval(TimeSpan.FromSeconds(1)) .Subscribe(x => 
		Trace.WriteLine( $"Interval {x} on thread 
		{Environment.CurrentManagedThreadId}")); }
On my machine, the output looks like the following

在我的机器上,输出如下所示
UI thread is 9 
Interval 0 on thread 10 
Interval 1 on thread 10 
Interval 2 on thread 11 
Interval 3 on thread 11 
Interval 4 on thread 10 
Interval 5 on thread 11 
Interval 6 on thread 11
Since Interval is based on a timer (without a specific thread), the notifications are raised on a threadpool thread, rather than the UI thread. If you need to update a UI element, you can pipe those notifications through ObserveOn and pass a synchronization context representing the UI thread:
由于Interval基于计时器(没有特定的线程),因此在线程池线程上引发通知,而不是在UI线程上。如果你需要更新一个UI元素,你可以通过ObserveOn传递这些通知,并传递一个表示UI线程的同步上下文:
private void Button_Click(object sender, RoutedEventArgs e) 
{ 
	SynchronizationContext uiContext = SynchronizationContext.Current; 
	Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}"); 
	Observable.Interval(TimeSpan.FromSeconds(1)) .ObserveOn(uiContext) 
		.Subscribe(x => Trace.WriteLine(
			$"Interval {x} on thread {Environment.CurrentManagedThreadId}")); 
}
Another common usage of ObserveOn is to move off the UI thread when necessary. Consider a situation where you need to do some CPU-intensive computation whenever the mouse moves. By default, all mouse moves are raised on the UI thread, so you can use ObserveOn to move those notifications to a threadpool thread, do the computation, and then move the result notifications back to the UI thread:
ObserveOn的另一个常见用法是在必要时移出UI线程。考虑这样一种情况:每当鼠标移动时,都需要执行一些cpu密集型计算。默认情况下,所有的鼠标移动都是在UI线程中引发的,所以你可以使用ObserveOn将这些通知移动到线程池线程中,进行计算,然后将结果通知移动回UI线程:
SynchronizationContext uiContext = SynchronizationContext.Current; 
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}"); 
Observable.FromEventPattern( 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(evt => evt.EventArgs.GetPosition(this)) 
	.ObserveOn(Scheduler.Default) 
	.Select(position => 
	{ 
		// Complex calculation Thread.Sleep(100); 
		var result = position.X + position.Y; 
		var thread = Environment.CurrentManagedThreadId; 
		Trace.WriteLine($"Calculated result {result} on thread {thread}"); 
		return result; 
	}) 
	.ObserveOn(uiContext) 
	.Subscribe(x => Trace.WriteLine( $"Result {x} on thread {Environment.CurrentManagedThreadId}"));
If you execute this sample, you’ll see the calculations done on a threadpool thread and the results printed on the UI thread. However, you’ll also notice that the calculations and results will lag behind the input; they’ll queue up because the mouse location updates more often than every 100 ms. System.Reactive has several techniques for handling this situation; one common one covered in Recipe 6.4 is throttling the input.
如果执行这个示例,您将看到在线程池线程上完成的计算和在UI线程上打印的结果。然而,你也会注意到计算和结果会滞后于输入;它们会排队,因为鼠标位置更新的频率比每100毫秒要高。系统。Reactive有几种技术来处理这种情况;菜谱6.4中介绍的一种常见方法是限制输入。

Discussion 讨论

ObserveOn actually moves notifications to a System.Reactive scheduler. This recipe covered the default (thread pool) scheduler and one way of creating a UI scheduler. The most common uses for the ObserveOn operator are moving on or off the UI thread, but schedulers are also useful in other scenarios. A more advanced scenario where schedulers are useful is faking the passage of time when unit testing, which you’ll find covered in Recipe 7.6.
ObserveOn实际上将通知移动到System.Reactive调度程序。本文介绍了默认(线程池)调度器和创建UI调度器的一种方法。ObserveOn操作符最常见的用途是移动UI线程,但调度器在其他场景中也很有用。在更高级的场景中,调度器很有用,这是在单元测试时计时,您将在Recipe 7.6中了解到。
ObserveOn controls the context for the observable notifications. This is not to be confused with SubscribeOn, which controls the context for the code that adds and removes the event handlers.
ObserveOn控制可观察对象通知的上下文。不要将其与SubscribeOn混淆,后者控制添加和删除事件处理程序的代码的上下文。

6.3.Grouping Event Data with Windows and Buffers用Windows和缓冲区分组事件数据

Problem 问题

You have a sequence of events, and you want to group the incoming events as they arrive. As an example, you need to react to pairs of inputs. As another example, you need to react to all inputs within a two-second window.
您有一个事件序列,并且希望在传入事件到达时对它们进行分组。例如,您需要对输入对做出反应。作为另一个例子,您需要在两秒钟的窗口内对所有输入作出反应。

Solution 解决方案

System.Reactive provides a pair of operators that group incoming sequences: Buffer and Window. Buffer will hold on to the incoming events until the group is complete, at which time it forwards them all at once as a collection of events. Window will logically group the incoming events but will pass them along as they arrive. The return type of Buffer is IObservable<IList<T>> (an event stream of collections); the return type of Window is IObservable<IObservable<T>> (an event stream of event streams).
System.Reactive提供了一对对传入序列进行分组的操作符:Buffer和Window。缓冲区将保留传入的事件,直到组完成,此时它将它们作为事件集合一起转发。窗口将对传入的事件进行逻辑分组,但在它们到达时将它们传递下去。Buffer的返回类型是IObservable<IList<T>> (集合的事件流);Window的返回类型是IObservable<IObservable<T>>(事件流的事件流)。
The following example uses the Interval operator to create OnNext notifications once a second and then buffers them two at a time:
下面的例子使用Interval操作符每秒创建一次OnNext通知,然后一次缓冲两个通知:
Observable.Interval(TimeSpan.FromSeconds(1)) 
	.Buffer(2) 
	.Subscribe(x => Trace.WriteLine( $"{DateTime.Now.Second}: Got {x[0]} and {x[1]}"));
On my machine, this code produces a pair of outputs every two seconds:
在我的机器上,这段代码每两秒钟产生一对输出:
13: Got 0 and 1 
15: Got 2 and 3 
17: Got 4 and 5 
19: Got 6 and 7 
21: Got 8 and 9
The following is a similar example of using Window to create groups of two events:
下面是一个使用窗口创建两个事件组的类似示例:
Observable.Interval(TimeSpan.FromSeconds(1)) 
	.Window(2) 
	.Subscribe(group => 
	{ 
		Trace.WriteLine($"{DateTime.Now.Second}: Starting new group"); 
		group.Subscribe( 
			x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x}"), 
			() => Trace.WriteLine($"{DateTime.Now.Second}: Ending group")); 
	});
On my machine, this Window example produces this output:
在我的机器上,这个窗口示例产生如下输出:
17: Starting new group 
18: Saw 0 
19: Saw 1 
19: Ending group 
19: Starting new group 
20: Saw 2 
21: Saw 3 
21: Ending group 
21: Starting new group 
22: Saw 4 
23: Saw 5 
23: Ending group 
23: Starting new group
These examples illustrate the difference between Buffer and Window. Buffer waits for all the events in its group and then publishes a single collection. Window groups events the same way, but publishes the events as they come in; Window immediately publishes an observable that will publish the events for that window.
这些例子说明了缓冲区和窗口之间的区别。Buffer等待其组中的所有事件,然后发布单个集合。窗口以相同的方式对事件进行分组,但在事件进入时发布事件;窗口立即发布一个可观察对象,该可观察对象将发布该窗口的事件。
Both Buffer and Window also work with time spans. The following code is an example where all mouse move events are collected in windows of one second:
Buffer和Window都可以处理时间跨度。下面的代码是一个例子,其中所有的鼠标移动事件收集在一秒钟的窗口:
private void Button_Click(object sender, RoutedEventArgs e) 
{
	Observable.FromEventPattern( 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
			.Buffer(TimeSpan.FromSeconds(1)) 
			.Subscribe(x => Trace.WriteLine( $"{DateTime.Now.Second}: Saw {x.Count} items.")); 
}
Depending on how you move the mouse, you should see output like the following:
根据您移动鼠标的方式,您应该看到如下输出:
49: Saw 93 items. 
50: Saw 98 items. 
51: Saw 39 items. 
52: Saw 0 items. 
53: Saw 4 items. 
54: Saw 0 items. 
55: Saw 58 items.

Discussion 讨论

Buffer and Window are some of the tools you have for taming input and shaping it the way you want it to look. Another useful technique is throttling, which you’ll learn about in Recipe 6.4.
缓冲器和窗口是一些工具,您可以用来驯服输入并将其塑造成您想要的样子。另一种有用的技术是节流,您将在菜谱6.4中了解它。
Both Buffer and Window have other overloads that can be used in more advanced scenarios. The overloads with skip and timeShift parameters enable you to create groups that overlap other groups or skip elements in between groups. There are also overloads that take delegates, which enable you to dynamically define the boundary of the groups.
Buffer和Window都有其他重载,可以在更高级的场景中使用。使用skip和timshift参数的重载使您能够创建与其他组重叠的组,或者在组之间跳过元素。还有接受委托的重载,这使您能够动态定义组的边界。

6.4. Taming Event Streams with Throttling and Sampling使用节流和采样来驯服事件流

Problem 问题

A common problem with writing reactive code is when the events come in too quickly. A fast-moving stream of events can overwhelm your program’s processing.
编写响应式代码的一个常见问题是事件来得太快。快速移动的事件流可能会压倒程序的处理能力。

Solution 解决方案

System.Reactive provides operators specifically for dealing with a flood of event data. The Throttle and Sample operators give us two different ways to tame fast input events.
System.Reactive提供了专门用于处理大量事件数据的操作符。Throttle和Sample操作符为我们提供了两种不同的方法来驯服快速输入事件。
The Throttle operator establishes a sliding timeout window. When an incoming event arrives, it resets the timeout window. When the timeout window expires, it publishes the last event value that arrived within the window.
节流操作器建立一个滑动超时窗口。当传入事件到达时,它会重置超时窗口。当超时窗口到期时,它将发布该窗口内到达的最后一个事件值。
The following example monitors mouse movements and uses Throttle to only report updates once the mouse has stayed still for a full second:
下面的例子监控鼠标的移动,并使用油门只报告更新,一旦鼠标已经保持静止了整整一秒钟:
private void Button_Click(object sender, RoutedEventArgs e) 
{
	Observable.FromEventPattern(
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(x => x.EventArgs.GetPosition(this)) 
	.Throttle(TimeSpan.FromSeconds(1)) 
	.Subscribe(x => Trace.WriteLine( $"{DateTime.Now.Second}: Saw {x.X + x.Y}")); 
}
The output varies considerably based on mouse movement, but one example run on my machine looked like this:
根据鼠标的移动,输出有很大的不同,但是在我的机器上运行的一个例子是这样的:
47: Saw 139 
49: Saw 137 
51: Saw 424 
56: Saw 226
Throttle is often used in situations such as autocomplete, when the user is typing text into a text box, and you don’t want to do the actual lookup until the user stops typing.
Throttle通常用于自动补全等情况,此时用户正在向文本框中输入文本,而您不希望在用户停止输入之前执行实际查找。
Sample takes a different approach to taming fast-moving sequences. Sample establishes a regular timeout period and publishes the most recent value within that window each time the timeout expires. If no values were received within the sample period, then no results are published for that period.
Sample采用不同的方法来驯服快速移动的序列。Sample建立一个常规的超时周期,并在每次超时到期时在该窗口内发布最新的值。如果在样本周期内没有收到任何值,则不会发布该周期的结果。
The following example captures mouse movements and samples them in onesecond intervals. Unlike the Throttle example, this Sample example doesn’t require you to hold the mouse still to see data:
下面的示例捕获鼠标移动并以一秒的间隔对其进行采样。与Throttle示例不同,这个示例示例不需要您按住鼠标不动即可查看数据:
private void Button_Click(object sender, RoutedEventArgs e) 
{ 
	Observable.FromEventPattern( 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(x => x.EventArgs.GetPosition(this)) 
	.Sample(TimeSpan.FromSeconds(1)) 
	.Subscribe(x => Trace.WriteLine( $"{DateTime.Now.Second}: Saw {x.X + x.Y}")); 
}
Here’s the output on my machine when I first left the mouse still for a few seconds and then continuously moved it:
以下是我的机器上的输出,当我第一次让鼠标静止几秒钟,然后不断移动它:
12: Saw 311 
17: Saw 254 
18: Saw 269 
19: Saw 342 
20: Saw 224 
21: Saw 277

Discussion 讨论

Throttling and sampling are essential tools for taming the flood of input. Don’t forget that you can also easily do filtering with the standard LINQ Where operator. You can think of the Throttle and Sample operators as similar to Where, only they filter on time windows instead of filtering on event data. All three of these operators help you tame fast-moving input streams in different ways.
节流和采样是控制大量输入的必要工具。不要忘记,您还可以使用标准的LINQ Where操作符轻松地进行过滤。您可以将Throttle和Sample操作符看作类似于Where,只是它们对时间窗口进行过滤,而不是对事件数据进行过滤。这三种操作符都可以帮助您以不同的方式控制快速移动的输入流。

6.5.Timeouts 超时

Problem 问题

You expect an event to arrive within a certain time and need to ensure that your program will respond in a timely fashion, even if the event doesn’t arrive. Most commonly, this kind of expected event is a single asynchronous operation (e.g.,expecting the response from a web service request).
您希望事件在一定时间内到达,并且需要确保您的程序能够及时响应,即使事件没有到达。最常见的是,这种期望事件是单个异步操作(例如,期望来自web服务请求的响应)。

Solution 解决方案

The Timeout operator establishes a sliding timeout window on its input stream. Whenever a new event arrives, the timeout window is reset. If the timeout expires without seeing an event in that window, the Timeout operator will end the stream with an OnError notification containing a TimeoutException.
Timeout操作符在其输入流上建立一个滑动超时窗口。每当有新事件到达时,超时窗口将被重置。如果超时到期而未在该窗口中看到事件,则timeout操作符将使用包含TimeoutException的OnError通知结束流。
The following example issues a web request for the example domain and applies a timeout of one second. To get the web request started, the code uses ToObservable to convert a Task<T> to an IObservable<T> (see Recipe 8.6):
下面的示例为示例域发出一个web请求,并应用1秒的超时。为了启动web请求,代码使用ToObservable来转换Task<T>到一个IObservable<T>(参见配方8.6):
void GetWithTimeout(HttpClient client) 
{ 
	client.GetStringAsync("http://www.example.com/")
		.ToObservable() 
		.Timeout(TimeSpan.FromSeconds(1)) 
		.Subscribe( 
			x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.Length}"), 
			ex => Trace.WriteLine(ex)); 
}
Timeout is ideal for asynchronous operations, such as web requests, but it can be applied to any event stream. The following example applies Timeout to mouse movements, which are easier to play around with:
超时是异步操作(如web请求)的理想选择,但它也可以应用于任何事件流。下面的例子将Timeout应用于鼠标移动,这更容易玩:
private void Button_Click(object sender, RoutedEventArgs e) 
{
	Observable.FromEventPattern( 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(x => x.EventArgs.GetPosition(this)) 
	.Timeout(TimeSpan.FromSeconds(1)) 
	.Subscribe( 
		x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X + x.Y}"), 
		ex => Trace.WriteLine(ex)); 
}
On my machine, I moved the mouse a bit and then kept it still for a second, and got these results:
在我的机器上,我移动了一下鼠标,然后让它静止一秒钟,得到了这些结果:
16: Saw 180 
16: Saw 178 
16: Saw 177 
16: Saw 176 
System.TimeoutException: The operation has timed out.
Note that once the TimeoutException is sent to OnError, the stream is finished. No more mouse movements come through. You may not want exactly this behavior, so the Timeout operator has overloads that substitute a second stream when the timeout occurs instead of ending the stream with an exception.
注意,一旦TimeoutException被发送到OnError,流就结束了。没有更多的鼠标移动。您可能不希望完全发生这种行为,因此Timeout操作符具有重载,在超时发生时替换第二个流,而不是以异常结束流。
The code in the following example observes mouse movements until there’s a timeout. After the timeout, the code observes mouse clicks:
以下示例中的代码观察鼠标移动,直到出现超时。超时后,代码观察鼠标点击:
private void Button_Click(object sender, RoutedEventArgs e) 
{ 
	IObservable clicks = Observable.FromEventPattern( 
		handler => (s, a) => handler(s, a), 
		handler => MouseDown += handler, 
		handler => MouseDown -= handler) 
	.Select(x => x.EventArgs.GetPosition(this));
	
	Observable.FromEventPattern( 
	handler => (s, a) => handler(s, a),
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(x => x.EventArgs.GetPosition(this)) 
	.Timeout(TimeSpan.FromSeconds(1), clicks) 
	.Subscribe(
		x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X}, {x.Y}"), 
		ex => Trace.WriteLine(ex)); 
}
On my machine, I moved the mouse a bit, then held it still for a second, and then clicked a couple of different points. The following outputs shows the mouse movements quickly moving through until the timeout, and then the two clicks:
在我的电脑上,我移动了一下鼠标,然后让它静止一秒钟,然后点击了几个不同的点。下面的输出显示了鼠标快速移动通过,直到超时,然后两次点击:
49: Saw 95,39 
49: Saw 94,39 
49: Saw 94,38 
49: Saw 94,37 
53: Saw 130,141 
55: Saw 469,4

Discussion 讨论

Timeout is an essential operator in nontrivial applications because you always want your program to be responsive even if the rest of the world isn’t. It’s particularly useful when you have asynchronous operations, but it can be applied to any event stream. Note that the underlying operation is not actually canceled; in the case of a timeout, the operation will continue executing until it succeeds or fails.
超时是重要应用程序中必不可少的操作符,因为您总是希望您的程序能够响应,即使其他程序没有响应。当您有异步操作时,它特别有用,但它可以应用于任何事件流。注意,底层操作实际上并没有被取消;在超时的情况下,操作将继续执行,直到成功或失败。
文章来源:https://blog.csdn.net/a13407142317/article/details/135616601
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。