Skip to content

Commit

Permalink
Implement dispatch channels (elsa-workflows#4949)
Browse files Browse the repository at this point in the history
* Add DispatchWorkflowOptions and update MassTransit configuration

Introduced a new class `DispatchWorkflowOptions` to provide workflow dispatch options. Updated MassTransit configuration to include NET6.0 and NET7.0 support, and ensure correct MassTransit version usage per target framework version.

* Add support for configurable MassTransit message dispatching

Implemented a feature which allows for configurable MassTransit message dispatching. Added support for specifying channels and message brokers. Message dispatching code was massively refactored and relevant endpoints and response models were updated to support new message dispatching features.

* Add IEndpointChannelFormatter interface and implementation

An interface for formatting channel queue names, 'IEndpointChannelFormatter', has been added, along with its default implementation 'DefaultEndpointChannelFormatter'. The code that uses hardcoded queue name formatting has been modified to use the new formatter instead, making it more configurable and reusable. The implementation of the formatter uses the 'Humanizer' library to kebab-case the channel names.

* Add channel dispatch option to workflow activities

Introduced `WorkflowDispatcherChannelOptionsProvider` to provide dropdown channel options for workflow dispatch-related activities. Updated `DispatchWorkflow` and `BulkDispatchWorkflows` activities to include a new dropdown input for specifying a dispatch channel. Also, included the selected channel name in the `DispatchWorkflowOptions` during workflow dispatching process.

* Update MassTransit configurations and remove unused code

The MassTransit setup in Elsa.MassTransit module has been simplified by removing conditional code for different .NET versions. Additionally, unused parameter '__X_Channel' in 'DispatchWorkflowDefinition' was removed. Lastly, the MassTransit broker in Elsa.Server.Web was switched from RabbitMq to AzureServiceBus.

* Refactor dispatch workflow classes and methods

Simplified the class, method and variable names related to workflow dispatching in the Elsa.Workflows.Runtime module. For example, the 'WorkflowDispatcherChannelDescriptor' class was renamed to 'DispatcherChannel'. This refactoring was performed to make code more readable and maintainable by removing redundant wording in the naming convention.

* Update GitHub Workflow to support feature and issue branches

The workflow changes add support for feature and issue branches. Now, it extracts the branch name and verifies the commit exists in the given branch rather than just 'main'. The versioning scheme is also modified to include the branch name and not just the run number.

* Add 'bug/*' to triggering branches in packages workflow

The 'bug/*' pattern was missing from the triggers that initiate the GitHub actions within our packages workflow. This update includes any branch with a 'bug/' prefix to the list, allowing bug-related branches to start jobs in our CI/CD pipeline.

* Remove 'issue/*' and 'bug/*' branches from packages workflow

The 'issue/*' and 'bug/*' branches have been removed from the GitHub action workflow for packages. This change was made to simplify the workflow and optimize the triggering of package building.

* Update GitHub workflow to handle main branch versioning

This commit modifies the GitHub workflow script to accommodate changes when the branch name is "main." If the branch name is "main", a preview version is used. It also updates script execution to print the branch name for easier debugging and verifies commit existence on the correct branch instead of dispatch channels.

* Enclose branch names in quotes in packages.yml

The update modifies the branch names in the packages.yml GitHub Actions workflow file. The change consists of enclosing the branch names 'main' and 'feature/*' in single quotes, ensuring compatibility and preventing potential string interpretation issues.

* Update GitHub workflows package configuration

The workflows package configuration has been updated to specifically watch for changes on 'feature/dispatch-channels' rather than on all feature branches. This change will prevent unnecessary builds on less relevant feature branches.

* Update trigger branches in packages workflow

The triggering branches in the packages workflow have been updated. Previously, only changes in the 'main' and 'feature/dispatch-channels' would trigger the workflow, now any 'feature/*' branch will. This will cause more frequent and comprehensive testing.

* Add 'patch/*' to workflow trigger branches

This update adds 'patch/*' to the list of branches in .github/workflows/packages.yml that can trigger the workflow. It will allow the workflow to be initiated not just for main and feature branches, but also for patches.

* Add 'preview/*' to workflow triggers

This commit adds a new trigger for the GitHub Actions workflow. It now also responds to push events on 'preview/*' branches, allowing for automated testing and building of these preview branches.

* Update branch name extraction in GitHub Actions

The extraction of the branch name has been slightly modified in the packages.yml GitHub workflow file. This alteration ensures the correct branch name is obtained for further processing within the workflow without any discrepancy.

* Update branch name extraction in packages.yml

Corrected the syntax for extracting the branch name within the packages.yml github workflow file. Added an extra line to print out the ref which might be useful for debugging.

* Update branch name extraction in GitHub workflow

The commit simplifies the way the branch name is being extracted from the GitHub ref in the packages.yml workflow file. The new method employs straightforward string manipulation, making it easier to understand and debug in case of potential issues.

* Add extraction of branch name in workflow

Added a new line in the GitHub workflow file (.github/workflows/packages.yml) to extract the last part after the final slash from the branch name. This enhancement allows cleaner naming conventions, especially in cases where branches are named feature/issue-123, as it will only retain 'issue-123'.

* Update package naming in Github workflow

The Github workflow has been updated to handle package naming more effectively. Previously, the branch name was used directly for package versioning. Now, the last part of the branch name is extracted and used as the package prefix. If the branch name is "main", the package prefix is set to "preview".

* Move and add environment variable assignments

The placement of the assignment for BRANCH_NAME environment variable was moved for better readability. Additionally, the PACKAGE_PREFIX environment variable was also added. These environment variables are crucial for subsequent steps in the GitHub workflow.

* Add workflow dispatch validation and response handling

Removed several specific dispatch response classes and consolidated all types of dispatch responses into a single DispatchWorkflowResponse class. Added a new ValidatingWorkflowDispatcher service to validate dispatch requests before they're sent. Updated several classes to work with these changes, including the BackgroundWorkflowDispatcher, MassTransitWorkflowDispatcher, and the API endpoint class.

* Handle dispatch workflow failures with exceptions

The DispatchWorkflow and BulkDispatchWorkflows activities now throw a FaultException when the dispatch operations fail. Previously, these operations were not checking for success and could fail silently. Now, an unsuccessful dispatch response results in a FaultException with an error message from the dispatch response.
  • Loading branch information
sfmskywalker committed Feb 16, 2024
1 parent ff7c5dc commit ef3431c
Show file tree
Hide file tree
Showing 50 changed files with 558 additions and 190 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http:https://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http:https://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=enums/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
10 changes: 10 additions & 0 deletions src/bundles/Elsa.Server.Web/Enums/MassTransitBroker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Elsa.Server.Web;

/// <summary>
/// Represents the type of messaging broker used in MassTransit.
/// </summary>
public enum MassTransitBroker
{
AzureServiceBus,
RabbitMq
}
11 changes: 6 additions & 5 deletions src/bundles/Elsa.Server.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Elsa.MongoDb.Modules.Identity;
using Elsa.MongoDb.Modules.Management;
using Elsa.MongoDb.Modules.Runtime;
using Elsa.Server.Web;
using Elsa.Workflows.Management.Compression;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Options;
Expand All @@ -27,10 +28,9 @@
const bool useProtoActor = false;
const bool useHangfire = false;
const bool useQuartz = true;
const bool useMassTransit = false;
const bool useMassTransitAzureServiceBus = true;
const bool useMassTransitRabbitMq = false;
const bool useMassTransit = true;
const bool useZipCompression = true;
const MassTransitBroker useMassTransitBroker = MassTransitBroker.AzureServiceBus;

var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;
Expand Down Expand Up @@ -142,6 +142,7 @@
runtime.UseMassTransitDispatcher();
}
runtime.WorkflowInboxCleanupOptions = options => configuration.GetSection("Runtime:WorkflowInboxCleanup").Bind(options);
runtime.WorkflowDispatcherOptions = options => configuration.GetSection("Runtime:WorkflowDispatcher").Bind(options);
})
.UseEnvironments(environments => environments.EnvironmentsOptions = options => configuration.GetSection("Environments").Bind(options))
.UseScheduling(scheduling =>
Expand Down Expand Up @@ -210,7 +211,7 @@
{
elsa.UseMassTransit(massTransit =>
{
if (useMassTransitAzureServiceBus)
if (useMassTransitBroker == MassTransitBroker.AzureServiceBus)
{
massTransit.UseAzureServiceBus(azureServiceBusConnectionString, serviceBusFeature => serviceBusFeature.ConfigureServiceBus = bus =>
{
Expand All @@ -222,7 +223,7 @@
});
}
if (useMassTransitRabbitMq)
if (useMassTransitBroker == MassTransitBroker.RabbitMq)
{
massTransit.UseRabbitMq(rabbitMqConnectionString, rabbit => rabbit.ConfigureServiceBus = bus =>
{
Expand Down
13 changes: 13 additions & 0 deletions src/bundles/Elsa.Server.Web/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@
"WorkflowInboxCleanup": {
"SweepInterval": "00:00:10:00",
"BatchSize": 1000
},
"WorkflowDispatcher": {
"Channels": [
{
"Name": "Low"
},
{
"Name": "Medium"
},
{
"Name": "High"
}
]
}
},
"Scripting": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public async Task DispatchAsync(IEnumerable<RunAlterationsResult> results, Cance
/// <inheritdoc />
public async Task DispatchAsync(RunAlterationsResult result, CancellationToken cancellationToken = default)
{
await _workflowDispatcher.DispatchAsync(new DispatchWorkflowInstanceRequest(result.WorkflowInstanceId), cancellationToken);
await _workflowDispatcher.DispatchAsync(new DispatchWorkflowInstanceRequest(result.WorkflowInstanceId), cancellationToken: cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public override async Task HandleAsync(Request request, CancellationToken cancel
allResults.AddRange(results);

// Schedule updated workflow.
await _workflowDispatcher.DispatchAsync(new DispatchWorkflowInstanceRequest(workflowInstance.Id), cancellationToken);
await _workflowDispatcher.DispatchAsync(new DispatchWorkflowInstanceRequest(workflowInstance.Id), cancellationToken: cancellationToken);
}

// Write response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ public async Task HandleAsync(AlterationJobCompleted notification, CancellationT
return;

// Resume workflow instance.
await _workflowDispatcher.DispatchAsync(new DispatchWorkflowInstanceRequest(job.WorkflowInstanceId), cancellationToken);
await _workflowDispatcher.DispatchAsync(new DispatchWorkflowInstanceRequest(job.WorkflowInstanceId), cancellationToken: cancellationToken);
}
}
2 changes: 1 addition & 1 deletion src/modules/Elsa.Hangfire/Jobs/ResumeWorkflowJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public ResumeWorkflowJob(IWorkflowDispatcher workflowDispatcher)
/// <param name="name">The name of the job.</param>
/// <param name="request">The workflow request.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public async Task ExecuteAsync(string name, DispatchWorkflowInstanceRequest request, CancellationToken cancellationToken) => await _workflowDispatcher.DispatchAsync(request, cancellationToken);
public async Task ExecuteAsync(string name, DispatchWorkflowInstanceRequest request, CancellationToken cancellationToken) => await _workflowDispatcher.DispatchAsync(request, cancellationToken: cancellationToken);
}
2 changes: 1 addition & 1 deletion src/modules/Elsa.Hangfire/Jobs/RunWorkflowJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public RunWorkflowJob(IWorkflowDispatcher workflowDispatcher)
/// <param name="name">The name of the job.</param>
/// <param name="request">The workflow request.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public async Task ExecuteAsync(string name, DispatchWorkflowDefinitionRequest request, CancellationToken cancellationToken) => await _workflowDispatcher.DispatchAsync(request, cancellationToken);
public async Task ExecuteAsync(string name, DispatchWorkflowDefinitionRequest request, CancellationToken cancellationToken) => await _workflowDispatcher.DispatchAsync(request, cancellationToken: cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using Elsa.Features.Abstractions;
using Elsa.Features.Attributes;
using Elsa.Features.Services;
using Elsa.MassTransit.Extensions;
using Elsa.MassTransit.Features;
using Elsa.MassTransit.Messages;
using Elsa.Workflows.Runtime.Activities;
using MassTransit;

namespace Elsa.MassTransit.AzureServiceBus.Features;
Expand Down Expand Up @@ -33,14 +36,15 @@ public override void Configure()
{
configure.AddServiceBusMessageScheduler();
configure.UsingAzureServiceBus((context, serviceBus) =>
configure.UsingAzureServiceBus((context, configurator) =>
{
if (ConnectionString != null)
serviceBus.Host(ConnectionString);
configurator.Host(ConnectionString);
serviceBus.UseServiceBusMessageScheduler();
ConfigureServiceBus?.Invoke(serviceBus);
serviceBus.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter("Elsa", false));
configurator.UseServiceBusMessageScheduler();
configurator.SetupWorkflowDispatcherEndpoints(context);
ConfigureServiceBus?.Invoke(configurator);
configurator.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter("Elsa", false));
});
};
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Elsa.Features.Abstractions;
using Elsa.Features.Attributes;
using Elsa.Features.Services;
using Elsa.MassTransit.Consumers;
using Elsa.MassTransit.Extensions;
using Elsa.MassTransit.Features;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -21,9 +23,8 @@ public RabbitMqServiceBusFeature(IModule module) : base(module)
/// A RabbitMQ connection string.
public string? ConnectionString { get; set; }

/// Configures the RabbitMQ transport options.
public Action<RabbitMqTransportOptions>? TransportOptions { get; set; }

/// <summary>
/// Configures the RabbitMQ bus.
/// </summary>
Expand All @@ -34,14 +35,15 @@ public override void Configure()
{
Module.Configure<MassTransitFeature>().BusConfigurator = configure =>
{
configure.UsingRabbitMq((context, serviceBus) =>
configure.UsingRabbitMq((context, configurator) =>
{
if (!string.IsNullOrEmpty(ConnectionString))
serviceBus.Host(ConnectionString);
configurator.Host(ConnectionString);
ConfigureServiceBus?.Invoke(serviceBus);
ConfigureServiceBus?.Invoke(configurator);
serviceBus.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter("Elsa", false));
configurator.SetupWorkflowDispatcherEndpoints(context);
configurator.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter("Elsa", false));
});
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.MassTransit/Activities/MessageReceived.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Runtime.CompilerServices;
using Elsa.Expressions.Models;
using Elsa.Extensions;
using Elsa.MassTransit.Implementations;
using Elsa.MassTransit.Services;
using Elsa.Workflows;

namespace Elsa.MassTransit.Activities;
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.MassTransit/Activities/PublishMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Runtime.CompilerServices;
using Elsa.Expressions.Helpers;
using Elsa.Extensions;
using Elsa.MassTransit.Implementations;
using Elsa.MassTransit.Services;
using Elsa.Workflows;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Models;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using Elsa.MassTransit.Messages;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Options;
using JetBrains.Annotations;
using MassTransit;

namespace Elsa.MassTransit.Consumers;

/// <summary>
/// A consumer of various dispatch message types to asynchronously execute workflows.
/// </summary>
[UsedImplicitly]
public class DispatchWorkflowRequestConsumer :
IConsumer<DispatchWorkflowDefinition>,
IConsumer<DispatchWorkflowInstance>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public async Task Consume(ConsumeContext<T> context)
CorrelationId = correlationId,
Input = input
};
await _workflowRuntime.DispatchAsync(request, cancellationToken);
await _workflowRuntime.DispatchAsync(request, cancellationToken: cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Elsa.MassTransit.Contracts;

/// <summary>
/// Represents a contract for formatting channel queue names.
/// </summary>
public interface IEndpointChannelFormatter
{
/// <summary>
/// Formats the queue name based on the provided channel name.
/// </summary>
/// <param name="channelName">The channel name to be formatted.</param>
/// <returns>The formatted queue name.</returns>
string FormatEndpointName(string? channelName = default);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Elsa.Features.Services;
using Elsa.MassTransit.Features;
using Elsa.MassTransit.Implementations;
using Elsa.MassTransit.Models;
using Elsa.MassTransit.Services;
using MassTransit;

// ReSharper disable once CheckNamespace
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using Elsa.MassTransit.Consumers;
using Elsa.MassTransit.Contracts;
using Elsa.Workflows.Runtime.Options;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

namespace Elsa.MassTransit.Extensions;

/// <summary>
/// Provides extension methods for configuring service bus channel endpoints.
/// </summary>
public static class ServiceBusChannelsConfigurationExtensions
{
/// <summary>
/// Sets up channel endpoints for receiving workflow dispatch requests.
/// </summary>
/// <param name="configurator">The MassTransit receive configurator.</param>
/// <param name="context">The MassTransit bus registration context.</param>
public static void SetupWorkflowDispatcherEndpoints(this IReceiveConfigurator configurator, IBusRegistrationContext context)
{
var dispatcherOptions = context.GetRequiredService<IOptions<WorkflowDispatcherOptions>>();
var formatter = context.GetRequiredService<IEndpointChannelFormatter>();
var channelDescriptors = dispatcherOptions.Value.Channels;
var defaultEndpointName = formatter.FormatEndpointName();

var endpointNames = new List<string>
{
defaultEndpointName
};

endpointNames.AddRange(channelDescriptors.Select(x => formatter.FormatEndpointName(x.Name)));

foreach (string endpointName in endpointNames)
{
configurator.ReceiveEndpoint(endpointName, endpoint =>
{
endpoint.ConfigureConsumer<DispatchWorkflowRequestConsumer>(context);
});
}
}
}
22 changes: 14 additions & 8 deletions src/modules/Elsa.MassTransit/Features/MassTransitFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
using Elsa.Features.Abstractions;
using Elsa.Features.Services;
using Elsa.MassTransit.Consumers;
using Elsa.MassTransit.Implementations;
using Elsa.MassTransit.Extensions;
using Elsa.MassTransit.Models;
using Elsa.MassTransit.Options;
using Elsa.MassTransit.Services;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Management.Models;
using Elsa.Workflows.Management.Options;
Expand Down Expand Up @@ -49,20 +50,23 @@ public override void Apply()
Services.Configure<MassTransitWorkflowDispatcherOptions>(x => { });
Services.AddActivityProvider<MassTransitActivityTypeProvider>();

var busConfigurator = BusConfigurator ??= configure =>
void Configurator(IBusRegistrationConfigurator configure)
{
configure.UsingInMemory((context, configurator) =>
{
configurator.SetupWorkflowDispatcherEndpoints(context);
configurator.ConfigureEndpoints(context);
if (PrefetchCount != null)
configurator.PrefetchCount = PrefetchCount.Value;
if (PrefetchCount != null) configurator.PrefetchCount = PrefetchCount.Value;
});
};
}

var busConfigurator = BusConfigurator ??= Configurator;
AddMassTransit(busConfigurator);

// Add collected message types to options.
Services.Configure<MassTransitActivityOptions>(options => options.MessageTypes = new HashSet<Type>(messageTypes));
Services.Configure<MassTransitActivityOptions>(
options => options.MessageTypes = new HashSet<Type>(messageTypes));

// Add collected message types as available variable types.
Services.Configure<ManagementOptions>(options =>
Expand All @@ -79,7 +83,8 @@ public override void Apply()
});

// Configure message serializer.
SystemTextJsonMessageSerializer.Options.Converters.Add(new TypeJsonConverter(WellKnownTypeRegistry.CreateDefault()));
SystemTextJsonMessageSerializer.Options.Converters.Add(
new TypeJsonConverter(WellKnownTypeRegistry.CreateDefault()));
}

/// <summary>
Expand All @@ -89,7 +94,8 @@ private void AddMassTransit(Action<IBusRegistrationConfigurator> busConfigurator
{
// For each message type, create a concrete WorkflowMessageConsumer<T>.
var workflowMessageConsumerType = typeof(WorkflowMessageConsumer<>);
var workflowMessageConsumers = this.GetMessages().Select(x => new ConsumerTypeDefinition(workflowMessageConsumerType.MakeGenericType(x)));
var workflowMessageConsumers = this.GetMessages()
.Select(x => new ConsumerTypeDefinition(workflowMessageConsumerType.MakeGenericType(x)));

// Concatenate the manually registered consumers with the workflow message consumers.
var consumerTypeDefinitions = this.GetConsumers().Concat(workflowMessageConsumers).ToArray();
Expand Down
Loading

0 comments on commit ef3431c

Please sign in to comment.