Skip to content

Commit

Permalink
Update workflow handling and simplify codebase
Browse files Browse the repository at this point in the history
Refactored the workflow handling logic and simplified numerous components in the codebase. Major changes include the enhancement of the Workflow Runtime handler, altering the broadcast and deliver functions in the Default Workflow Inbox, and the removal of unnecessary details in the Workflow Runtime feature. Also, unnecessary services and requests were removed for better code efficiency and readability.
  • Loading branch information
sfmskywalker committed Dec 18, 2023
1 parent cc24c5c commit 3aedf27
Show file tree
Hide file tree
Showing 15 changed files with 72 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public override void Configure()
{
Module.Configure<WorkflowRuntimeFeature>(workflowRuntimeFeature =>
{
workflowRuntimeFeature.BackgroundActivityInvoker = sp => sp.GetRequiredService<HangfireBackgroundActivityScheduler>();
workflowRuntimeFeature.BackgroundActivityScheduler = sp => sp.GetRequiredService<HangfireBackgroundActivityScheduler>();
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.Http/Services/HttpBookmarkProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class HttpBookmarkProcessor : IHttpBookmarkProcessor
private readonly WorkflowStateMapper _workflowStateMapper;

/// <summary>
/// Constructor.
/// Initializes a new instance of the <see cref="HttpBookmarkProcessor"/> class.
/// </summary>
public HttpBookmarkProcessor(
IWorkflowRuntime workflowRuntime,
Expand Down
1 change: 1 addition & 0 deletions src/modules/Elsa.ProtoActor/Features/ProtoActorFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public override void Apply()
.Setup(ClusterName, clusterProvider, new PartitionIdentityLookup())
.WithHeartbeatExpiration(TimeSpan.FromDays(1))
.WithActorRequestTimeout(TimeSpan.FromHours(1))
.WithActorSpawnVerificationTimeout(TimeSpan.FromHours(1))
.WithActorActivationTimeout(TimeSpan.FromHours(1))
.WithActorSpawnVerificationTimeout(TimeSpan.FromHours(1))
.WithClusterKind(WorkflowInstanceActor.Kind, workflowGrainProps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,7 @@ public async Task ImportWorkflowStateAsync(WorkflowState workflowState, Cancella

await client.ImportState(request, cancellationToken);
}

/// <inheritdoc />
public async Task UpdateBookmarksAsync(UpdateBookmarksRequest request, CancellationToken cancellationToken = default)
{
var instanceId = request.WorkflowExecutionContext.Id;
await RemoveBookmarksAsync(instanceId, request.Diff.Removed, cancellationToken);
await StoreBookmarksAsync(instanceId, request.Diff.Added, request.CorrelationId, cancellationToken);
}


/// <inheritdoc />
public async Task UpdateBookmarkAsync(StoredBookmark bookmark, CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Core.State;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Filters;
Expand Down Expand Up @@ -78,11 +77,6 @@ public interface IWorkflowRuntime
/// </summary>
Task ImportWorkflowStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default);

/// <summary>
/// Adds and removes bookmarks based on the provided bookmarks diff.
/// </summary>
Task UpdateBookmarksAsync(UpdateBookmarksRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// Updates the specified bookmark.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public WorkflowRuntimeFeature(IModule module) : base(module)
/// A factory that instantiates an <see cref="IWorkflowDispatcher"/>.
/// </summary>
public Func<IServiceProvider, IWorkflowDispatcher> WorkflowDispatcher { get; set; } = sp => ActivatorUtilities.CreateInstance<BackgroundWorkflowDispatcher>(sp);

/// <summary>
/// A factory that instantiates an <see cref="IBookmarkStore"/>.
/// </summary>
Expand All @@ -62,7 +62,7 @@ public WorkflowRuntimeFeature(IModule module) : base(module)
/// A factory that instantiates an <see cref="IWorkflowExecutionLogStore"/>.
/// </summary>
public Func<IServiceProvider, IWorkflowExecutionLogStore> WorkflowExecutionLogStore { get; set; } = sp => sp.GetRequiredService<MemoryWorkflowExecutionLogStore>();

/// <summary>
/// A factory that instantiates an <see cref="IActivityExecutionStore"/>.
/// </summary>
Expand All @@ -86,13 +86,13 @@ public WorkflowRuntimeFeature(IModule module) : base(module)
/// <summary>
/// A factory that instantiates an <see cref="IBackgroundActivityScheduler"/>.
/// </summary>
public Func<IServiceProvider, IBackgroundActivityScheduler> BackgroundActivityInvoker { get; set; } = sp => ActivatorUtilities.CreateInstance<LocalBackgroundActivityScheduler>(sp);
public Func<IServiceProvider, IBackgroundActivityScheduler> BackgroundActivityScheduler { get; set; } = sp => ActivatorUtilities.CreateInstance<LocalBackgroundActivityScheduler>(sp);

/// <summary>
/// A delegate to configure the <see cref="DistributedLockingOptions"/>.
/// </summary>
public Action<DistributedLockingOptions> DistributedLockingOptions { get; set; } = _ => { };

/// <summary>
/// A delegate to configure the <see cref="WorkflowInboxCleanupOptions"/>.
/// </summary>
Expand All @@ -106,7 +106,7 @@ public WorkflowRuntimeFeature(IModule module) : base(module)
Workflows.Add<T>();
return this;
}

/// <summary>
/// Register all workflows in the specified assembly.
/// </summary>
Expand All @@ -115,10 +115,10 @@ public WorkflowRuntimeFeature AddWorkflowsFrom(Assembly assembly)
var workflowTypes = assembly.GetExportedTypes()
.Where(x => typeof(IWorkflow).IsAssignableFrom(x) && x is { IsAbstract: false, IsInterface: false, IsGenericType: false })
.ToList();

foreach (var workflowType in workflowTypes)
Workflows.Add(workflowType);

return this;
}

Expand Down Expand Up @@ -158,7 +158,7 @@ public override void Apply()
.AddSingleton(ActivityExecutionLogStore)
.AddSingleton(WorkflowInboxStore)
.AddSingleton(RunTaskDispatcher)
.AddSingleton(BackgroundActivityInvoker)
.AddSingleton(BackgroundActivityScheduler)
.AddSingleton<IBookmarkManager, DefaultBookmarkManager>()
.AddSingleton<IActivityExecutionManager, DefaultActivityExecutionManager>()
.AddSingleton<IActivityExecutionStatsService, ActivityExecutionStatsService>()
Expand All @@ -170,15 +170,17 @@ public override void Apply()
.AddSingleton<BackgroundTaskDispatcher>()
.AddSingleton<IEventPublisher, EventPublisher>()
.AddSingleton<IWorkflowInbox, DefaultWorkflowInbox>()

.AddSingleton<IBookmarkUpdater, BookmarkUpdater>()
.AddSingleton<IBookmarksPersister, BookmarksPersister>()

// Lazy services.
.AddSingleton<Func<IEnumerable<IWorkflowProvider>>>(sp => sp.GetServices<IWorkflowProvider>)
.AddSingleton<Func<IEnumerable<IWorkflowMaterializer>>>(sp => sp.GetServices<IWorkflowMaterializer>)

// Noop stores.
.AddSingleton<MemoryWorkflowExecutionLogStore>()
.AddSingleton<MemoryActivityExecutionStore>()

// Memory stores.
.AddMemoryStore<StoredBookmark, MemoryBookmarkStore>()
.AddMemoryStore<StoredTrigger, MemoryTriggerStore>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Elsa.Extensions;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Notifications;
Expand All @@ -25,7 +24,6 @@ public ReadWorkflowInboxMessage(IWorkflowInbox workflowInbox)
public async Task HandleAsync(WorkflowInboxMessageReceived notification, CancellationToken cancellationToken)
{
var message = notification.InboxMessage;
var result = await _workflowInbox.BroadcastAsync(message, cancellationToken);
notification.WorkflowExecutionResults.AddRange(result.WorkflowExecutionResults);
await _workflowInbox.BroadcastAsync(message, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ public async Task HandleAsync(WorkflowExecuted notification, CancellationToken c

var bookmark = new BulkDispatchWorkflowsBookmark(parentInstanceId);
var activityTypeName = ActivityTypeNameHelper.GenerateTypeName<BulkDispatchWorkflows>();
var workflowInstanceId = workflowState.Id;
var input = new Dictionary<string, object>
{
["WorkflowOutput"] = workflowState.Output,
["WorkflowInstanceId"] = workflowState.Id,
["WorkflowInstanceId"] = workflowInstanceId,
["WorkflowStatus"] = workflowState.Status,
["WorkflowSubStatus"] = workflowState.SubStatus,
};
Expand All @@ -42,6 +43,7 @@ public async Task HandleAsync(WorkflowExecuted notification, CancellationToken c
ActivityTypeName = activityTypeName,
Input = input,
BookmarkPayload = bookmark,
WorkflowInstanceId = parentInstanceId,
};

await workflowInbox.SubmitAsync(message, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
using Elsa.Mediator;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Core;
using Elsa.Workflows.Core.Helpers;
using Elsa.Workflows.Core.Pipelines.WorkflowExecution;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Notifications;
using Elsa.Workflows.Runtime.Requests;

namespace Elsa.Workflows.Runtime.Middleware.Workflows;

Expand All @@ -14,47 +10,21 @@ namespace Elsa.Workflows.Runtime.Middleware.Workflows;
/// </summary>
public class PersistBookmarkMiddleware : WorkflowExecutionMiddleware
{
private readonly IWorkflowRuntime _workflowRuntime;
private readonly INotificationSender _notificationSender;
private readonly IBookmarksPersister _bookmarksPersister;

/// <inheritdoc />
public PersistBookmarkMiddleware(WorkflowMiddlewareDelegate next, IWorkflowRuntime workflowRuntime, INotificationSender notificationSender) : base(next)
public PersistBookmarkMiddleware(WorkflowMiddlewareDelegate next, IBookmarksPersister bookmarksPersister) : base(next)
{
_workflowRuntime = workflowRuntime;
_notificationSender = notificationSender;
_bookmarksPersister = bookmarksPersister;
}

/// <inheritdoc />
public override async ValueTask InvokeAsync(WorkflowExecutionContext context)
{
var cancellationToken = context.CancellationTokens.SystemCancellationToken;

// Get current bookmarks.
var originalBookmarks = context.Bookmarks.ToList();

// Invoke next middleware.
await Next(context);

// Get new bookmarks.
var updatedBookmarks = context.Bookmarks.ToList();

// Get a diff.
var diff = Diff.For(originalBookmarks, updatedBookmarks);

// Update bookmarks.
var updateBookmarksContext = new UpdateBookmarksRequest(context, diff, context.CorrelationId);
await _workflowRuntime.UpdateBookmarksAsync(updateBookmarksContext, cancellationToken);

// Publish domain event.
await _notificationSender.SendAsync(new WorkflowBookmarksIndexed(context, new IndexedWorkflowBookmarks(context.Id, diff.Added, diff.Removed, diff.Unchanged)), cancellationToken);

// Notify all interested activities that the bookmarks have been persisted.
var activityExecutionContexts = context.ActivityExecutionContexts.Where(x => x.Activity is IBookmarksPersistedHandler && x.Bookmarks.Any()).ToList();

foreach (var activityExecutionContext in activityExecutionContexts)
await ((IBookmarksPersistedHandler)activityExecutionContext.Activity).BookmarksPersistedAsync(activityExecutionContext);

// Publish domain event.
await _notificationSender.SendAsync(new WorkflowBookmarksPersisted(context, diff), NotificationStrategy.Background, cancellationToken);
await _bookmarksPersister.PersistBookmarksAsync(context, diff);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Results;

namespace Elsa.Workflows.Runtime.Notifications;

/// <summary>
/// A notification that is sent when a workflow inbox message is received.
/// </summary>
/// <param name="InboxMessage">The inbox message that was received.</param>
/// <param name="WorkflowExecutionResults">Contains workflow execution results</param>
public record WorkflowInboxMessageReceived(WorkflowInboxMessage InboxMessage, ICollection<WorkflowExecutionResult> WorkflowExecutionResults) : INotification;
public record WorkflowInboxMessageReceived(WorkflowInboxMessage InboxMessage) : INotification;
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Elsa.Workflows.Runtime.Requests;
/// <summary>
/// Published when bookmarks needs to be updated.
/// </summary>
/// <param name="WorkflowExecutionContext">The workflow execution context.</param>
/// <param name="WorkflowInstanceId">The workflow instance ID.</param>
/// <param name="Diff">A diff of the bookmarks.</param>
/// <param name="CorrelationId">The correlation ID, if any.</param>
public record UpdateBookmarksRequest(WorkflowExecutionContext WorkflowExecutionContext, Diff<Bookmark> Diff, string? CorrelationId);
public record UpdateBookmarksRequest(string WorkflowInstanceId, Diff<Bookmark> Diff, string? CorrelationId = default);
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
namespace Elsa.Workflows.Runtime.Results;

public record DeliverWorkflowInboxMessageResult(ICollection<WorkflowExecutionResult> WorkflowExecutionResults);
/// <summary>
/// Result of delivering a workflow inbox message.
/// </summary>
public record DeliverWorkflowInboxMessageResult;
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Elsa.Expressions.Contracts;
using Elsa.Workflows.Management.Contracts;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.HostedServices;
Expand Down
Loading

0 comments on commit 3aedf27

Please sign in to comment.