Table of Contents
The Scatter-Gather pattern: send a request to multiple recipients, and aggregate the results into a single response.
This pattern helps to limit the coupling between the consumer and the recipients in integration scenarios, and provides standard error-handling and timeout capabilities.
Use a RecipientsCollection
to register the eligible recipients:
var collection = new RecipientsCollection();
collection.Add<Foo>(); // type
collection.Add(new Bar()); // instance
collection.Add((int n) => n.ToString()); // delegate
Use an Aggregator
for sending the requests to all the available recipients that can support the desired request/response types, and for aggregating the results:
var aggregator = new Aggregator(collection);
// Send a request to all the recipients
// capable of accepting and int.
// The results are then combined in the response:
AggregatedResponse<object> objects = await aggregator.Send(42);
// The following overload can be used when
// the return type is either known or binding.
// Only the recipients that accept an int and
// return a string will be invoked:
AggregatedResponse<string> strings = await aggregator.Send<string>(42);
Inspect the AggregatedResponse
containing the results of the scatter-gather operation, grouped by completed, faulted and incomplete:
var response = await aggregator.Send<string>(42, timeout: TimeSpan.FromSeconds(5));
var completed = response.Completed[0];
// (_, _, string result, TimeSpan duration) = completed;
var faulted = response.Faulted[0];
// (_, _, Exception? exception, TimeSpan duration) = faulted;
var incomplete = response.Incomplete[0];
// (string? recipientName, Type? recipientType) = incomplete;
Embrace duck typing
No binding contracts are used (e.g. IRecipient
).
A recipient is invoked if it defines a method matching the request:
class Foo
{
public string ThisIsInvoked(int n) => n.ToString();
}
class Bar
{
public int ThisIsInvokedToo(int n) => n * 2;
}
// Invoke every recipient that accepts an int.
_ = await aggregator.Send(42);
The recipients compete in order to provide the best, or the fastest, response to the request. The consumer will then pick the best value from the aggregated response.
sample: get an item's best price from a collection of suppliers:
Different operations are computed concurrently, and their results combined or used together. The result types could be different.
sample: get a user's data from different services, and then compose into a model:
The Aggregator
exposes async-only methods for sending requests.
Even if the consumer requested only results of type TResponse
, a recipient that returns Task<TResponse>
, or ValueTask<TResponse>
, or any task-like type, will still be invoked and its result awaited:
class Foo { public int Echo(int n) => n; }
class Bar { public Task<int> EchoAsync(int n) => Task.FromResult(n); }
// Nothing changes!
var response = await aggregator.Send(42);
// [ 42, 42 ]
Sometimes, a recipient can have two or more methods conflicting, given a request type:
class Foo
{
public int Double(int n) => n * 2;
public long Triple(int n) => n * 3L;
}
In this case, the aggregator will be able to invoke the recipient only if the return type of the conflicting methods is different, and it's explicitely defined by the consumer:
// The recipient won't be used.
_ = await aggregator.Send(42);
// Method "Triple" will be invoked.
var response = await aggregator.Send<long>(42);
class Foo { public int Double(int n) => n * 2; }
class Bar { public long Square(int n) => n * 1L * n; }
var response = await aggregator.Send(42);
// [ 84, 1764L ]
class Foo { public string Stringify(int n) => n.ToString(); }
class Bar { public long Longify(int n) => n * 1L; }
var onlyStrings = await aggregator.Send<string>(42);
// [ "42" ]
class Foo { public string Stringify(int n) => n.ToString(); }
class Bar
{
public async Task<long> Longify(int n)
{
await Task.Yield();
return n * 1L;
}
}
var response = await aggregator.Send(42);
// [ "42", 42L ]
class Foo
{
public string Todo(string s) =>
throw new NotImplementedException("TODO");
}
var response = await aggregator.Send("Don't Panic");
var (recipientType, exception) = response.Faulted[0];
// ( typeof(Foo), NotImplementedException("TODO") )
class Foo
{
public Task<int> Block(int n)
{
var tcs = new TaskCompletionSource<int>();
return tcs.Task; // It will never complete.
}
}
var response = await aggregator.Send(42, timeout: TimeSpan.FromSeconds(5));
Type recipientType = response.Incomplete[0];
// typeof(Foo)
// Register typeless recipients using delegates:
collection.Add((int n) => n.ToString());
collection.Add((int n) => n * 2);
// Use them as normal recipients:
var aggregator = new Aggregator(collection);
var responseOfInt = await aggregator.Send(42);
var resultsOfInt = responseOfInt.AsResultsList(); // 84, "42"
For more, take a look at the samples project in solution.