Skip to content

Commit

Permalink
Add 'Abort' feature
Browse files Browse the repository at this point in the history
  • Loading branch information
litenova committed Apr 16, 2024
1 parent fb89a05 commit 4218437
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 6 deletions.
3 changes: 3 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## v.0.24.2
- Allow aborting the execution of handlers by calling `Abort` on the execution context.

## v.0.24.1
- Add `Tags` to `IExecutionContext`.

Expand Down
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<LangVersion>latest</LangVersion>
<Authors>A. Shafie</Authors>
<PackageTags>mediator;cqrs</PackageTags>
<VersionPrefix>0.24.1</VersionPrefix>
<VersionPrefix>0.24.2</VersionPrefix>
<PackageIcon>icon.png</PackageIcon>
<Description>LiteBus is an easy-to-use and ambitious in-process mediator providing the foundation to implement Command Query Separation (CQS). It is implemented with minimal reflection and instead utilizes covariance and contravariance to provide its core functionality.</Description>
<PackageProjectUrl>https://github.com/litenova/LiteBus</PackageProjectUrl>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,15 @@ public interface IExecutionContext
/// Gets the collection of specified tags used to filter message handlers (i.e., pre, main and post) during mediation.
/// </summary>
IReadOnlyCollection<string> Tags { get; }

/// <summary>
/// The result of the message mediation.
/// </summary>
object? MessageResult { get; set; }

/// <summary>
/// Aborts the execution of the current mediation execution.
/// </summary>
/// <remarks>The messsage result is required if message has specific result and the execution is aborted in pre-handler phase.</remarks>
void Abort(object? messageResult = null);
}
19 changes: 19 additions & 0 deletions src/LiteBus.Messaging.Abstractions/Exceptions/ExecutionAborted.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;

namespace LiteBus.Messaging.Abstractions;

/// <summary>
/// Represents errors that occur when an execution process is aborted.
/// This exception is typically thrown in scenarios where a process is halted
/// due to failed validations or other conditions that prevent continuation of execution.
/// </summary>
[Serializable]
public class LiteBusExecutionAbortedException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="LiteBusExecutionAbortedException"/> class.
/// </summary>
public LiteBusExecutionAbortedException() : base("LiteBus Execution was aborted.")
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ public async Task<TMessageResult> Mediate(TMessage message, IMessageDependencies

await messageDependencies.RunAsyncPostHandlers(message, messageResult);
}
catch (Exception e)
catch (LiteBusExecutionAbortedException)
{
if (executionContext.MessageResult is null)
{
throw new InvalidOperationException(
$"A Message result of type '{typeof(TMessageResult).Name}' is required when the execution is aborted as this message has a specific result.");
}

return await Task.FromResult((TMessageResult) executionContext.MessageResult);
}
catch (Exception e) when (e is not LiteBusExecutionAbortedException)
{
await messageDependencies.RunAsyncErrorHandlers(message, messageResult, ExceptionDispatchInfo.Capture(e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ public async Task Mediate(TMessage message, IMessageDependencies messageDependen

await messageDependencies.RunAsyncPostHandlers(message, messageResult);
}
catch (Exception e)
catch (LiteBusExecutionAbortedException)
{
return;
}
catch (Exception e) when (e is not LiteBusExecutionAbortedException)
{
await messageDependencies.RunAsyncErrorHandlers(message, messageResult, ExceptionDispatchInfo.Capture(e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public async IAsyncEnumerable<TMessageResult> Mediate(TMessage message, IMessage

messageResultAsyncEnumerable = (IAsyncEnumerable<TMessageResult>) handler!.Handle(message);
}
catch (Exception exception)
catch (LiteBusExecutionAbortedException)
{
yield break;
}
catch (Exception exception) when (exception is not LiteBusExecutionAbortedException)
{
await messageDependencies.RunAsyncErrorHandlers(message, messageResultAsyncEnumerable, ExceptionDispatchInfo.Capture(exception));
}
Expand All @@ -62,7 +66,11 @@ public async IAsyncEnumerable<TMessageResult> Mediate(TMessage message, IMessage

item = hasResult ? messageResultAsyncEnumerator.Current : default;
}
catch (Exception exception)
catch (LiteBusExecutionAbortedException)
{
yield break;
}
catch (Exception exception) when (exception is not LiteBusExecutionAbortedException)
{
await messageDependencies.RunAsyncErrorHandlers(message, messageResultAsyncEnumerable, ExceptionDispatchInfo.Capture(exception));
}
Expand All @@ -79,7 +87,11 @@ public async IAsyncEnumerable<TMessageResult> Mediate(TMessage message, IMessage
AmbientExecutionContext.Current = executionContext;
await messageDependencies.RunAsyncPostHandlers(message, messageResultAsyncEnumerable);
}
catch (Exception exception)
catch (LiteBusExecutionAbortedException)
{
yield break;
}
catch (Exception exception) when (exception is not LiteBusExecutionAbortedException)
{
AmbientExecutionContext.Current = executionContext;
await messageDependencies.RunAsyncErrorHandlers(message, messageResultAsyncEnumerable, ExceptionDispatchInfo.Capture(exception));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,12 @@ public ExecutionContext(CancellationToken cancellationToken, IEnumerable<string>
public IDictionary<object, object?> Items { get; } = new Dictionary<object, object?>();

public IReadOnlyCollection<string> Tags { get; }

public object? MessageResult { get; set; }

public void Abort(object? messageResult = null)
{
MessageResult = messageResult;
throw new LiteBusExecutionAbortedException();
}
}
26 changes: 26 additions & 0 deletions tests/LiteBus.Commands.UnitTests/CommandModuleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,32 @@ public async Task mediating_a_command_with_exception_in_pre_handler_goes_through
command.ExecutedTypes[4].Should().Be<ProblematicCommandErrorHandler2>();
}

[Fact]
public async Task mediating_a_command_that_is_aborted_in_pre_handler_goes_through_correct_handlers()
{
// Arrange
var serviceProvider = new ServiceCollection()
.AddLiteBus(configuration => { configuration.AddCommandModule(builder => { builder.RegisterFromAssembly(typeof(CreateProductCommand).Assembly); }); })
.BuildServiceProvider();

var commandMediator = serviceProvider.GetRequiredService<ICommandMediator>();

var command = new CreateProductCommand
{
AbortInPreHandler = true
};

// Act
var commandResult = await commandMediator.SendAsync(command);

// Assert
commandResult.CorrelationId.Should().Be(Guid.Empty);
command.ExecutedTypes.Should().HaveCount(2);

command.ExecutedTypes[0].Should().Be<GlobalCommandPreHandler>();
command.ExecutedTypes[1].Should().Be<CreateProductCommandHandlerPreHandler>();
}

[Fact]
public async Task mediating_a_command_with_exception_in_post_global_handler_goes_through_error_handlers()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public sealed class CreateProductCommand : IAuditableCommand, ICommand<CreatePro
public List<Type> ExecutedTypes { get; } = new();

public Guid CorrelationId { get; } = Guid.NewGuid();

public bool AbortInPreHandler { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using LiteBus.Commands.Abstractions;
using LiteBus.Messaging.Abstractions;

namespace LiteBus.Commands.UnitTests.UseCases.CreateProduct;

Expand All @@ -7,6 +8,12 @@ public sealed class CreateProductCommandHandlerPreHandler : ICommandPreHandler<C
public Task PreHandleAsync(CreateProductCommand message, CancellationToken cancellationToken = default)
{
message.ExecutedTypes.Add(GetType());

if (message.AbortInPreHandler)
{
AmbientExecutionContext.Current!.Abort(new CreateProductCommandResult { CorrelationId = Guid.Empty });
}

return Task.CompletedTask;
}
}
25 changes: 25 additions & 0 deletions tests/LiteBus.Queries.UnitTests/QueryModuleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,29 @@ public async Task mediating_the_an_query_with_both_all_available_tags_will_fail_
// Assert
await act.Should().ThrowAsync<MultipleHandlerFoundException>();
}

[Fact]
public async Task mediating_a_stream_query_that_is_aborted_in_pre_handler_goes_through_correct_handlers()
{
// Arrange
var serviceProvider = new ServiceCollection()
.AddLiteBus(configuration => { configuration.AddQueryModule(builder => { builder.RegisterFromAssembly(typeof(GetProductQuery).Assembly); }); })
.BuildServiceProvider();

var queryMediator = serviceProvider.GetRequiredService<IQueryMediator>();
var query = new StreamProductsQuery
{
AbortInPreHandler = true
};

// Act
var queryResult = await queryMediator.StreamAsync(query).ToListAsync();

// Assert
queryResult.Should().BeEmpty();
query.ExecutedTypes.Should().HaveCount(2);

query.ExecutedTypes[0].Should().Be<GlobalQueryPreHandler>();
query.ExecutedTypes[1].Should().Be<StreamProductsQueryHandlerPreHandler>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public sealed class StreamProductsQuery : IAuditableQuery, IStreamQuery<StreamPr
public List<Type> ExecutedTypes { get; } = new();

public Guid CorrelationId { get; } = Guid.NewGuid();

public bool AbortInPreHandler { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using LiteBus.Messaging.Abstractions;
using LiteBus.Queries.Abstractions;

namespace LiteBus.Queries.UnitTests.UseCases.StreamProducts;
Expand All @@ -7,6 +8,12 @@ public sealed class StreamProductsQueryHandlerPreHandler : IQueryPreHandler<Stre
public Task PreHandleAsync(StreamProductsQuery message, CancellationToken cancellationToken = default)
{
message.ExecutedTypes.Add(GetType());

if (message.AbortInPreHandler)
{
AmbientExecutionContext.Current!.Abort();
}

return Task.CompletedTask;
}
}

0 comments on commit 4218437

Please sign in to comment.