You need a way to respond to errors that can happen in your dataflow mesh.
您需要一种方法来响应数据流网格中可能发生的错误。
Solution 解决方法
If a delegate passed to a dataflow block throws an exception, then that block will enter a faulted state. When a block is in a faulted state, it will drop all of its data (and stop accepting new data). The block in the following code will never produce any output data; the first value raises an exception, and the second value is just dropped:
var block = new TransformBlock(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
block.Post(1);
block.Post(2);
To catch exceptions from a dataflow block, you should await its Completion property. The Completion property returns a Task that will complete when the block is completed, and if the block faults, the Completion task is also faulted:
try
{
var block = new TransformBlock(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
block.Post(1);
await block.Completion;
}
catch (InvalidOperationException)
{
// The exception is caught here.
}
When you propagate completion using the PropagateCompletion link option,errors are also propagated. However, the exception is passed to the next block wrapped in an AggregateException. The following example catches the exception from the end of a pipeline, so it would catch AggregateException if an exception was propagated from earlier blocks:
try
{
? ? var multiplyBlock = new TransformBlock<int, int>(item =>
? ? {
? ? ? ? if (item == 1)
? ? ? ? ? ? throw new InvalidOperationException("Blech.");
? ? ? ? return item * 2;
? ? });
? ? var subtractBlock = new TransformBlock<int, int>(item => item - 2);
? ? multiplyBlock.LinkTo(subtractBlock,
? ? new DataflowLinkOptions { PropagateCompletion = true });
? ? multiplyBlock.Post(1);
? ? await subtractBlock.Completion;
}
catch (AggregateException)
{
? ? // The exception is caught here.
}
When you propagate completion using the PropagateCompletion link option,errors are also propagated. However, the exception is passed to the next block wrapped in an AggregateException. Each block wraps incoming errors in an AggregateException, even if the incoming error is already an AggregateException. If an error occurs early in a pipeline and travels down several links before it’s observed, the original error will be wrapped in multiple layers of AggregateException. The AggregateException.Flatten method simplifies error handling in this scenario.The following example catches the exception from the end of a pipeline, so it would catch AggregateException if an exception was propagated from earlier blocks:
When you build your mesh (or pipeline), consider how errors should be handled. In simpler situations, it can be best to just propagate the errors and catch them once at the end. In more complex meshes, you may need to observe each block when the dataflow has completed.
Alternatively, if you want your blocks to remain viable in the face of exceptions, you can choose to treat exceptions as another kind of data and let them flow through your mesh along with your correctly processed data items. Using that pattern, you can keep your dataflow mesh operational, since the blocks themselves don’t fault and continue processing the next data item. See Recipe 14.6 for more details.