Concurrency in C# Cookbook中文翻译第六章System.Reactive基础知识

LINQ is a set of language features that enable developers to query sequences. The two most common LINQ providers are the built-in LINQ to Objects (which is based on IEnumerable<T>) and LINQ to Entities (based on IQueryable<T>). There are many other providers available, and most providers have the same general structure. Queries are lazily evaluated, and the sequences produce values as necessary. Conceptually, this is a pull model; during evaluation, value items are pulled from the query one at a time.
LINQ是一组语言特性,使开发人员能够查询序列。两个最常见的LINQ提供程序是内置的LINQ to Objects(基于IEnumerable<T>)和LINQ to Entities(基于IQueryable<T>)。还有许多其他提供程序可用,并且大多数提供程序具有相同的一般结构。查询是惰性求值的,序列根据需要生成值。从概念上讲,这是一个拉动模型;在求值期间,每次从查询中提取一个值项。
System.Reactive (Rx) treats events as sequences of data that arrive over time. As such, you can think of Rx as LINQ to Events (based on IObservable<T>). The main difference between observables and other LINQ providers is that Rx is a “push” model, meaning that the query defines how the program reacts as events arrive. Rx builds on top of LINQ, adding some powerful new operators as extension methods.
This chapter looks at some of the more common Rx operations. Bear in mind that all of the LINQ operators are also available, so simple operations, such as filtering (Where) and projection (Select), work conceptually the same as they do with any other LINQ provider. We won’t cover these common LINQ operations here; we’ll focus on the new capabilities that Rx builds on top of LINQ, particularly those dealing with time.
To use System.Reactive, install the NuGet package for System.Reactive into your application.

6.1. Converting .NET Events

Problem 问题

You have an event that you need to treat as a System.Reactive input stream,producing some data via OnNext each time the event is raised.

Solution 解决方案

The Observable class defines several event converters. Most .NET framework events are compatible with FromEventPattern, but if you have events that don’t follow the common pattern, you can use FromEvent instead.
FromEventPattern works best if the event delegate type is EventHandler<T>. Many newer framework types use this event delegate type. For example, the Progress<T> type defines a ProgressChanged event, which is of type EventHandler<T>, so it can be easily wrapped with FromEventPattern:
var progress = new Progress(); 
IObservable> progressReports = Observable.FromEventPattern( 
	handler => progress.ProgressChanged += handler, 
	handler => progress.ProgressChanged -= handler); progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));
Note here that the data.EventArgs is strongly typed to be an int. The type argument to FromEventPattern (int in the previous example) is the same as the type T in EventHandler<T>. The two lambda arguments to FromEventPattern enable System.Reactive to subscribe and unsubscribe from the event.
The newer user interface frameworks use EventHandler<T>, and can easily be used with FromEventPattern, but older types often define a unique delegate type for each event. These can also be used with FromEventPattern, but it takes a bit more work. For example, the System.Timers.Timer type defines an Elapsed event, which is of type ElapsedEventHandler. You can wrap older events like this with FromEventPattern:
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true }; IObservable> ticks = Observable.FromEventPattern( 
	handler => (s, a) => handler(s, a), 
	handler => timer.Elapsed += handler, 
	handler => timer.Elapsed -= handler); 

ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));
Note that in this example that data.EventArgs is still strongly typed. The type arguments to FromEventPattern are now the unique handler type and the derived EventArgs type. The first lambda argument to FromEventPattern is a converter from EventHandler<ElapsedEventArgs> to ElapsedEventHandler; the converter should do nothing more than pass along the event.
注意,在本例中,data.EventArgs仍然是强类型的。FromEventPattern的类型参数现在是唯一的处理程序类型和派生的EventArgs类型。fromventpattern的第一个lambda参数是EventHandler<ElapsedEventArgs> ElapsedEventHandler;转换器应该做的只是传递事件。
That syntax is definitely getting awkward. Here’s another option, which uses reflection:

var timer = new System.Timers.Timer(interval: 1000) { Enabled = true }; IObservable> ticks = Observable.FromEventPattern(timer, nameof(Timer.Elapsed)); 
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + ((ElapsedEventArgs)data.EventArgs).SignalTime));
With this approach, the call to FromEventPattern is much easier. Note that there’s one drawback to this approach: the consumer doesn’t get strongly typed data. Because data.EventArgs is of type object, you have to cast it to ElapsedEventArgs yourself.

Discussion 讨论

Events are a common source of data for System.Reactive streams. This recipe covers wrapping any events that conform to the standard event pattern (where the first argument is the sender and the second argument is the event arguments type). If you have unusual event types, you can still use the Observable.FromEvent method overloads to wrap them into an observable.
When events are wrapped into an observable, OnNext is called each time the event is raised. When you’re dealing with AsyncCompletedEventArgs, this can cause surprising behavior, because any exception is passed along as data (OnNext), not as an error (OnError). Consider this wrapper for WebClient.DownloadStringCompleted, for example:
var client = new WebClient(); 
IObservable> downloadedStrings = Observable.FromEventPattern(client, 
	nameof(WebClient.DownloadStringCompleted)); downloadedStrings.Subscribe( data => { var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs; 
	if (eventArgs.Error != null)
		Trace.WriteLine("OnNext: (Error) " + eventArgs.Error); 
		Trace.WriteLine("OnNext: " + eventArgs.Result); 
	ex => Trace.WriteLine("OnError: " + ex.ToString()), 
	() => Trace.WriteLine("OnCompleted")); 
	client.DownloadStringAsync(new Uri(""));
When WebClient.DownloadStringAsync completes with an error, the event is raised with an exception in AsyncCompletedEventArgs.Error. Unfortunately, System.Reactive sees this as a data event, so if you then run the preceding code you will see OnNext: (Error) printed instead of OnError:.
当WebClient.DownloadStringAsync以错误完成,该事件将在AsyncCompletedEventArgs.Error中引发异常。不幸的是,System.Reactive认为这是一个数据事件,所以如果你运行前面的代码,你会看到OnNext: (Error)而不是OnError:。
Some event subscriptions and unsubscriptions must be done from a particular context. For example, events on many UI controls must be subscribed to from the UI thread. System.Reactive provides an operator that will control the context for subscribing and unsubscribing: SubscribeOn. The SubscribeOn operator isn’t necessary in most situations because most of the time a UI-based subscription is done from the UI thread.
SubscribeOn controls the context for the code that adds and removes the event handlers. Don’t confuse this with ObserveOn, which controls the context for the observable notifications (the delegates passed to Subscribe).

6.2. Sending Notifications to a Context 向上下文发送通知

Problem 问题

System.Reactive does its best to be thread agnostic. So, it’ll raise notifications (e.g., OnNext) in whatever thread happens to be current. Each OnNext notification will happen sequentially, but not necessarily on the same thread. You often want these notifications raised in a particular context. For example, UI elements should only be manipulated from the UI thread that owns them, so if you’re updating a UI in response to a notification that is arriving on a threadpool thread, then you’ll need to move over to the UI thread.

Solution 解决方案

System.Reactive provides the ObserveOn operator to move notifications to another scheduler.

Consider the following example, which uses the Interval operator to create OnNext notifications once a second:

private void Button_Click(object sender, RoutedEventArgs e) 
	Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}"); 
	Observable.Interval(TimeSpan.FromSeconds(1)) .Subscribe(x => 
		Trace.WriteLine( $"Interval {x} on thread 
		{Environment.CurrentManagedThreadId}")); }
On my machine, the output looks like the following

UI thread is 9 
Interval 0 on thread 10 
Interval 1 on thread 10 
Interval 2 on thread 11 
Interval 3 on thread 11 
Interval 4 on thread 10 
Interval 5 on thread 11 
Interval 6 on thread 11
Since Interval is based on a timer (without a specific thread), the notifications are raised on a threadpool thread, rather than the UI thread. If you need to update a UI element, you can pipe those notifications through ObserveOn and pass a synchronization context representing the UI thread:
private void Button_Click(object sender, RoutedEventArgs e) 
	SynchronizationContext uiContext = SynchronizationContext.Current; 
	Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}"); 
	Observable.Interval(TimeSpan.FromSeconds(1)) .ObserveOn(uiContext) 
		.Subscribe(x => Trace.WriteLine(
			$"Interval {x} on thread {Environment.CurrentManagedThreadId}")); 
Another common usage of ObserveOn is to move off the UI thread when necessary. Consider a situation where you need to do some CPU-intensive computation whenever the mouse moves. By default, all mouse moves are raised on the UI thread, so you can use ObserveOn to move those notifications to a threadpool thread, do the computation, and then move the result notifications back to the UI thread:
SynchronizationContext uiContext = SynchronizationContext.Current; 
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}"); 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(evt => evt.EventArgs.GetPosition(this)) 
	.Select(position => 
		// Complex calculation Thread.Sleep(100); 
		var result = position.X + position.Y; 
		var thread = Environment.CurrentManagedThreadId; 
		Trace.WriteLine($"Calculated result {result} on thread {thread}"); 
		return result; 
	.Subscribe(x => Trace.WriteLine( $"Result {x} on thread {Environment.CurrentManagedThreadId}"));
If you execute this sample, you’ll see the calculations done on a threadpool thread and the results printed on the UI thread. However, you’ll also notice that the calculations and results will lag behind the input; they’ll queue up because the mouse location updates more often than every 100 ms. System.Reactive has several techniques for handling this situation; one common one covered in Recipe 6.4 is throttling the input.

Discussion 讨论

ObserveOn actually moves notifications to a System.Reactive scheduler. This recipe covered the default (thread pool) scheduler and one way of creating a UI scheduler. The most common uses for the ObserveOn operator are moving on or off the UI thread, but schedulers are also useful in other scenarios. A more advanced scenario where schedulers are useful is faking the passage of time when unit testing, which you’ll find covered in Recipe 7.6.
ObserveOn实际上将通知移动到System.Reactive调度程序。本文介绍了默认(线程池)调度器和创建UI调度器的一种方法。ObserveOn操作符最常见的用途是移动UI线程,但调度器在其他场景中也很有用。在更高级的场景中,调度器很有用,这是在单元测试时计时,您将在Recipe 7.6中了解到。
ObserveOn controls the context for the observable notifications. This is not to be confused with SubscribeOn, which controls the context for the code that adds and removes the event handlers.

6.3.Grouping Event Data with Windows and Buffers用Windows和缓冲区分组事件数据

Problem 问题

You have a sequence of events, and you want to group the incoming events as they arrive. As an example, you need to react to pairs of inputs. As another example, you need to react to all inputs within a two-second window.

Solution 解决方案

System.Reactive provides a pair of operators that group incoming sequences: Buffer and Window. Buffer will hold on to the incoming events until the group is complete, at which time it forwards them all at once as a collection of events. Window will logically group the incoming events but will pass them along as they arrive. The return type of Buffer is IObservable<IList<T>> (an event stream of collections); the return type of Window is IObservable<IObservable<T>> (an event stream of event streams).
System.Reactive提供了一对对传入序列进行分组的操作符:Buffer和Window。缓冲区将保留传入的事件,直到组完成,此时它将它们作为事件集合一起转发。窗口将对传入的事件进行逻辑分组,但在它们到达时将它们传递下去。Buffer的返回类型是IObservable<IList<T>> (集合的事件流);Window的返回类型是IObservable<IObservable<T>>(事件流的事件流)。
The following example uses the Interval operator to create OnNext notifications once a second and then buffers them two at a time:
	.Subscribe(x => Trace.WriteLine( $"{DateTime.Now.Second}: Got {x[0]} and {x[1]}"));
On my machine, this code produces a pair of outputs every two seconds:
13: Got 0 and 1 
15: Got 2 and 3 
17: Got 4 and 5 
19: Got 6 and 7 
21: Got 8 and 9
The following is a similar example of using Window to create groups of two events:
	.Subscribe(group => 
		Trace.WriteLine($"{DateTime.Now.Second}: Starting new group"); 
			x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x}"), 
			() => Trace.WriteLine($"{DateTime.Now.Second}: Ending group")); 
On my machine, this Window example produces this output:
17: Starting new group 
18: Saw 0 
19: Saw 1 
19: Ending group 
19: Starting new group 
20: Saw 2 
21: Saw 3 
21: Ending group 
21: Starting new group 
22: Saw 4 
23: Saw 5 
23: Ending group 
23: Starting new group
These examples illustrate the difference between Buffer and Window. Buffer waits for all the events in its group and then publishes a single collection. Window groups events the same way, but publishes the events as they come in; Window immediately publishes an observable that will publish the events for that window.
Both Buffer and Window also work with time spans. The following code is an example where all mouse move events are collected in windows of one second:
private void Button_Click(object sender, RoutedEventArgs e) 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
			.Subscribe(x => Trace.WriteLine( $"{DateTime.Now.Second}: Saw {x.Count} items.")); 
Depending on how you move the mouse, you should see output like the following:
49: Saw 93 items. 
50: Saw 98 items. 
51: Saw 39 items. 
52: Saw 0 items. 
53: Saw 4 items. 
54: Saw 0 items. 
55: Saw 58 items.

Discussion 讨论

Buffer and Window are some of the tools you have for taming input and shaping it the way you want it to look. Another useful technique is throttling, which you’ll learn about in Recipe 6.4.
Both Buffer and Window have other overloads that can be used in more advanced scenarios. The overloads with skip and timeShift parameters enable you to create groups that overlap other groups or skip elements in between groups. There are also overloads that take delegates, which enable you to dynamically define the boundary of the groups.

6.4. Taming Event Streams with Throttling and Sampling使用节流和采样来驯服事件流

Problem 问题

A common problem with writing reactive code is when the events come in too quickly. A fast-moving stream of events can overwhelm your program’s processing.

Solution 解决方案

System.Reactive provides operators specifically for dealing with a flood of event data. The Throttle and Sample operators give us two different ways to tame fast input events.
The Throttle operator establishes a sliding timeout window. When an incoming event arrives, it resets the timeout window. When the timeout window expires, it publishes the last event value that arrived within the window.
The following example monitors mouse movements and uses Throttle to only report updates once the mouse has stayed still for a full second:
private void Button_Click(object sender, RoutedEventArgs e) 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(x => x.EventArgs.GetPosition(this)) 
	.Subscribe(x => Trace.WriteLine( $"{DateTime.Now.Second}: Saw {x.X + x.Y}")); 
The output varies considerably based on mouse movement, but one example run on my machine looked like this:
47: Saw 139 
49: Saw 137 
51: Saw 424 
56: Saw 226
Throttle is often used in situations such as autocomplete, when the user is typing text into a text box, and you don’t want to do the actual lookup until the user stops typing.
Sample takes a different approach to taming fast-moving sequences. Sample establishes a regular timeout period and publishes the most recent value within that window each time the timeout expires. If no values were received within the sample period, then no results are published for that period.
The following example captures mouse movements and samples them in onesecond intervals. Unlike the Throttle example, this Sample example doesn’t require you to hold the mouse still to see data:
private void Button_Click(object sender, RoutedEventArgs e) 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(x => x.EventArgs.GetPosition(this)) 
	.Subscribe(x => Trace.WriteLine( $"{DateTime.Now.Second}: Saw {x.X + x.Y}")); 
Here’s the output on my machine when I first left the mouse still for a few seconds and then continuously moved it:
12: Saw 311 
17: Saw 254 
18: Saw 269 
19: Saw 342 
20: Saw 224 
21: Saw 277

Discussion 讨论

Throttling and sampling are essential tools for taming the flood of input. Don’t forget that you can also easily do filtering with the standard LINQ Where operator. You can think of the Throttle and Sample operators as similar to Where, only they filter on time windows instead of filtering on event data. All three of these operators help you tame fast-moving input streams in different ways.
节流和采样是控制大量输入的必要工具。不要忘记,您还可以使用标准的LINQ Where操作符轻松地进行过滤。您可以将Throttle和Sample操作符看作类似于Where,只是它们对时间窗口进行过滤,而不是对事件数据进行过滤。这三种操作符都可以帮助您以不同的方式控制快速移动的输入流。

6.5.Timeouts 超时

Problem 问题

You expect an event to arrive within a certain time and need to ensure that your program will respond in a timely fashion, even if the event doesn’t arrive. Most commonly, this kind of expected event is a single asynchronous operation (e.g.,expecting the response from a web service request).

Solution 解决方案

The Timeout operator establishes a sliding timeout window on its input stream. Whenever a new event arrives, the timeout window is reset. If the timeout expires without seeing an event in that window, the Timeout operator will end the stream with an OnError notification containing a TimeoutException.
The following example issues a web request for the example domain and applies a timeout of one second. To get the web request started, the code uses ToObservable to convert a Task<T> to an IObservable<T> (see Recipe 8.6):
void GetWithTimeout(HttpClient client) 
			x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.Length}"), 
			ex => Trace.WriteLine(ex)); 
Timeout is ideal for asynchronous operations, such as web requests, but it can be applied to any event stream. The following example applies Timeout to mouse movements, which are easier to play around with:
private void Button_Click(object sender, RoutedEventArgs e) 
		handler => (s, a) => handler(s, a), 
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(x => x.EventArgs.GetPosition(this)) 
		x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X + x.Y}"), 
		ex => Trace.WriteLine(ex)); 
On my machine, I moved the mouse a bit and then kept it still for a second, and got these results:
16: Saw 180 
16: Saw 178 
16: Saw 177 
16: Saw 176 
System.TimeoutException: The operation has timed out.
Note that once the TimeoutException is sent to OnError, the stream is finished. No more mouse movements come through. You may not want exactly this behavior, so the Timeout operator has overloads that substitute a second stream when the timeout occurs instead of ending the stream with an exception.
The code in the following example observes mouse movements until there’s a timeout. After the timeout, the code observes mouse clicks:
private void Button_Click(object sender, RoutedEventArgs e) 
	IObservable clicks = Observable.FromEventPattern( 
		handler => (s, a) => handler(s, a), 
		handler => MouseDown += handler, 
		handler => MouseDown -= handler) 
	.Select(x => x.EventArgs.GetPosition(this));
	handler => (s, a) => handler(s, a),
		handler => MouseMove += handler, 
		handler => MouseMove -= handler) 
	.Select(x => x.EventArgs.GetPosition(this)) 
	.Timeout(TimeSpan.FromSeconds(1), clicks) 
		x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X}, {x.Y}"), 
		ex => Trace.WriteLine(ex)); 
On my machine, I moved the mouse a bit, then held it still for a second, and then clicked a couple of different points. The following outputs shows the mouse movements quickly moving through until the timeout, and then the two clicks:
49: Saw 95,39 
49: Saw 94,39 
49: Saw 94,38 
49: Saw 94,37 
53: Saw 130,141 
55: Saw 469,4

Discussion 讨论

Timeout is an essential operator in nontrivial applications because you always want your program to be responsive even if the rest of the world isn’t. It’s particularly useful when you have asynchronous operations, but it can be applied to any event stream. Note that the underlying operation is not actually canceled; in the case of a timeout, the operation will continue executing until it succeeds or fails.