Skip to content

Commit

Permalink
Add option for synchronous broadcast in WorkflowInbox
Browse files Browse the repository at this point in the history
This update introduces a new option to control synchronicity when broadcasting messages in the WorkflowInbox. The 'BroadcastWorkflowInboxMessageOptions' class allows the developer to specify whether the broadcasting will occur synchronously or asynchronously. The update also includes a new Endpoint and Workflow for demonstration and testing of this functionality.
  • Loading branch information
sfmskywalker committed Jan 6, 2024
1 parent 53bd1f9 commit 652a617
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 32 deletions.
27 changes: 27 additions & 0 deletions src/bundles/Elsa.Server.Web/MyEndpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Elsa.Abstractions;
using Elsa.Workflows.Runtime.Contracts;

namespace Elsa.Server.Web;

public class MyEndpoint : ElsaEndpointWithoutRequest
{
private readonly IEventPublisher _eventPublisher;

public MyEndpoint(IEventPublisher eventPublisher)
{
_eventPublisher = eventPublisher;
}

public override void Configure()
{
Get("/my-event-workflow");
AllowAnonymous();
}

public override async Task HandleAsync(CancellationToken ct)
{
Console.WriteLine("Publishing MyEvent");
var results = await _eventPublisher.PublishAsync("MyEvent", cancellationToken: ct);
Console.WriteLine($"Affected workflows: {results.Count}");
}
}
39 changes: 39 additions & 0 deletions src/bundles/Elsa.Server.Web/MyEventWorkflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Elsa.Workflows;
using Elsa.Workflows.Activities;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Runtime.Activities;

namespace Elsa.Server.Web;

public class OnMyEventWorkflow : WorkflowBase
{
protected override void Build(IWorkflowBuilder builder)
{
builder.Version = 1;
builder.Id = "OnMyEventWorkflow";
builder.Root = new Sequence
{
Activities =
{
new Event("MyEvent")
{
CanStartWorkflow = true
},
new Inline(async () =>
{
// IEventPublisher.PublishAsync returns before this executes
await SomeCallAsync();
}),
new WriteLine("End of workflow"),
new Finish()
}
};
}

private async Task SomeCallAsync()
{
Console.WriteLine("Hello from OnMyEventWorkflow");
await Task.Delay(1000);
Console.WriteLine("Goodbye from OnMyEventWorkflow");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ public interface IWorkflowInbox
/// <param name="message">The message to deliver.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
ValueTask<DeliverWorkflowInboxMessageResult> DeliverAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default);

/// <summary>
/// Broadcasts the specified message, which may trigger new workflows and resume existing ones.
/// </summary>
/// <param name="message">The message to broadcast.</param>
/// <param name="options">An optional set of delivery options.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
ValueTask<DeliverWorkflowInboxMessageResult> BroadcastAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default);
ValueTask<DeliverWorkflowInboxMessageResult> BroadcastAsync(WorkflowInboxMessage message, BroadcastWorkflowInboxMessageOptions? options, CancellationToken cancellationToken = default);

/// <summary>
/// Finds all messages matching the specified filter.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Elsa.Extensions;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Models;
using Elsa.Workflows.Runtime.Notifications;

namespace Elsa.Workflows.Runtime.Handlers;
Expand All @@ -19,11 +21,16 @@ public ReadWorkflowInboxMessage(IWorkflowInbox workflowInbox)
{
_workflowInbox = workflowInbox;
}

/// <inheritdoc />
public async Task HandleAsync(WorkflowInboxMessageReceived notification, CancellationToken cancellationToken)
{
var message = notification.InboxMessage;
await _workflowInbox.BroadcastAsync(message, cancellationToken);
var options = new BroadcastWorkflowInboxMessageOptions
{
DispatchAsynchronously = notification.Options.DispatchAsynchronously
};
var result = await _workflowInbox.BroadcastAsync(message, options, cancellationToken);
notification.WorkflowExecutionResults.AddRange(result.WorkflowExecutionResults);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Elsa.Workflows.Runtime.Models;

/// <summary>
/// Represents the options for broadcasting a workflow inbox message.
/// </summary>
public class BroadcastWorkflowInboxMessageOptions
{
/// <summary>
/// Gets or sets a value indicating whether the dispatch should be executed asynchronously.
/// </summary>
/// <value>
/// <c>true</c> if the dispatch should be executed asynchronously; otherwise, <c>false</c>.
/// </value>
public bool DispatchAsynchronously { get; set; } = true;
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Options;
using Elsa.Workflows.Runtime.Results;

namespace Elsa.Workflows.Runtime.Notifications;

/// <summary>
/// A notification that is sent when a workflow inbox message is received.
/// </summary>
/// <param name="InboxMessage">The inbox message that was received.</param>
public record WorkflowInboxMessageReceived(WorkflowInboxMessage InboxMessage) : INotification;
public record WorkflowInboxMessageReceived(
WorkflowInboxMessage InboxMessage,
WorkflowInboxMessageDeliveryOptions Options,
ICollection<WorkflowExecutionResult> WorkflowExecutionResults) : INotification;
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
using Elsa.Mediator;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Runtime.Notifications;

namespace Elsa.Workflows.Runtime.Options;

/// <summary>
Expand All @@ -10,7 +6,7 @@ namespace Elsa.Workflows.Runtime.Options;
public class WorkflowInboxMessageDeliveryOptions
{
/// <summary>
/// The strategy to use when publishing the <see cref="WorkflowInboxMessageReceived"/> notification.
/// Whether to dispatch the message to the workflow dispatcher or send immediately.
/// </summary>
public IEventPublishingStrategy EventPublishingStrategy { get; set; } = NotificationStrategy.Background;
public bool DispatchAsynchronously { get; set; } = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ namespace Elsa.Workflows.Runtime.Results;
/// <summary>
/// Result of delivering a workflow inbox message.
/// </summary>
public record DeliverWorkflowInboxMessageResult;
public record DeliverWorkflowInboxMessageResult(ICollection<WorkflowExecutionResult> WorkflowExecutionResults);
69 changes: 53 additions & 16 deletions src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowInbox.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Elsa.Common.Contracts;
using Elsa.Mediator;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Runtime.Contracts;
Expand All @@ -16,6 +17,7 @@ namespace Elsa.Workflows.Runtime.Services;
public class DefaultWorkflowInbox : IWorkflowInbox
{
private readonly IWorkflowDispatcher _workflowDispatcher;
private readonly IWorkflowRuntime _workflowRuntime;
private readonly IWorkflowInboxMessageStore _messageStore;
private readonly INotificationSender _notificationSender;
private readonly ISystemClock _systemClock;
Expand All @@ -27,13 +29,15 @@ public class DefaultWorkflowInbox : IWorkflowInbox
/// </summary>
public DefaultWorkflowInbox(
IWorkflowDispatcher workflowDispatcher,
IWorkflowRuntime workflowRuntime,
IWorkflowInboxMessageStore messageStore,
INotificationSender notificationSender,
ISystemClock systemClock,
IIdentityGenerator identityGenerator,
IBookmarkHasher bookmarkHasher)
{
_workflowDispatcher = workflowDispatcher;
_workflowRuntime = workflowRuntime;
_messageStore = messageStore;
_notificationSender = notificationSender;
_systemClock = systemClock;
Expand Down Expand Up @@ -72,10 +76,9 @@ public async ValueTask<SubmitWorkflowInboxMessageResult> SubmitAsync(NewWorkflow
await _messageStore.SaveAsync(message, cancellationToken);

// Send a notification.
var strategy = options.EventPublishingStrategy;
var workflowExecutionResults = new List<WorkflowExecutionResult>();
var notification = new WorkflowInboxMessageReceived(message);
await _notificationSender.SendAsync(notification, strategy, cancellationToken);
var notification = new WorkflowInboxMessageReceived(message, options, workflowExecutionResults);
await _notificationSender.SendAsync(notification, NotificationStrategy.Sequential, cancellationToken);

// Return the result.
return new SubmitWorkflowInboxMessageResult(message, workflowExecutionResults);
Expand All @@ -84,19 +87,12 @@ public async ValueTask<SubmitWorkflowInboxMessageResult> SubmitAsync(NewWorkflow
/// <inheritdoc />
public async ValueTask<DeliverWorkflowInboxMessageResult> DeliverAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default)
{
await ResumeWorkflowsAsync(message, cancellationToken);
return new DeliverWorkflowInboxMessageResult();
await ResumeWorkflowsAsynchronouslyAsync(message, cancellationToken);
return new DeliverWorkflowInboxMessageResult(new List<WorkflowExecutionResult>());
}

/// <inheritdoc />
public async ValueTask<DeliverWorkflowInboxMessageResult> BroadcastAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default)
{
await TriggerWorkflowsAsync(message, cancellationToken);

return new DeliverWorkflowInboxMessageResult();
}

private async Task TriggerWorkflowsAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default)
public async ValueTask<DeliverWorkflowInboxMessageResult> BroadcastAsync(WorkflowInboxMessage message, BroadcastWorkflowInboxMessageOptions? options, CancellationToken cancellationToken = default)
{
var activityTypeName = message.ActivityTypeName;
var correlationId = message.CorrelationId;
Expand All @@ -107,8 +103,28 @@ private async Task TriggerWorkflowsAsync(WorkflowInboxMessage message, Cancellat

if (workflowInstanceId != null)
{
await ResumeWorkflowsAsync(message, cancellationToken);
return;
if (options?.DispatchAsynchronously == true)
{
await ResumeWorkflowsAsynchronouslyAsync(message, cancellationToken);
return new DeliverWorkflowInboxMessageResult(new List<WorkflowExecutionResult>());
}

var results = await ResumeWorkflowsSynchronouslyAsync(message, cancellationToken);
return new DeliverWorkflowInboxMessageResult(results.ToList());
}

if (options?.DispatchAsynchronously == false)
{
var results = await _workflowRuntime.TriggerWorkflowsAsync(activityTypeName, bookmarkPayload, new TriggerWorkflowsOptions
{
CorrelationId = correlationId,
WorkflowInstanceId = workflowInstanceId,
ActivityInstanceId = activityInstanceId,
Input = input,
CancellationTokens = cancellationToken
});

return new DeliverWorkflowInboxMessageResult(results.TriggeredWorkflows);
}

await _workflowDispatcher.DispatchAsync(new DispatchTriggerWorkflowsRequest(activityTypeName, bookmarkPayload)
Expand All @@ -118,9 +134,11 @@ await _workflowDispatcher.DispatchAsync(new DispatchTriggerWorkflowsRequest(acti
ActivityInstanceId = activityInstanceId,
Input = input
}, cancellationToken);

return new DeliverWorkflowInboxMessageResult(new List<WorkflowExecutionResult>());
}

private async Task ResumeWorkflowsAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default)
private async Task ResumeWorkflowsAsynchronouslyAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default)
{
var activityTypeName = message.ActivityTypeName;
var correlationId = message.CorrelationId;
Expand All @@ -138,6 +156,25 @@ await _workflowDispatcher.DispatchAsync(new DispatchResumeWorkflowsRequest(activ
}, cancellationToken);
}

private async Task<IEnumerable<WorkflowExecutionResult>> ResumeWorkflowsSynchronouslyAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default)
{
var activityTypeName = message.ActivityTypeName;
var correlationId = message.CorrelationId;
var workflowInstanceId = message.WorkflowInstanceId;
var activityInstanceId = message.ActivityInstanceId;
var bookmarkPayload = message.BookmarkPayload;
var input = message.Input;

return await _workflowRuntime.ResumeWorkflowsAsync(activityTypeName, bookmarkPayload, new TriggerWorkflowsOptions
{
CorrelationId = correlationId,
WorkflowInstanceId = workflowInstanceId,
ActivityInstanceId = activityInstanceId,
Input = input,
CancellationTokens = cancellationToken
});
}

/// <inheritdoc />
public async ValueTask<IEnumerable<WorkflowInboxMessage>> FindManyAsync(WorkflowInboxMessageFilter filter, CancellationToken cancellationToken = default)
{
Expand Down
8 changes: 4 additions & 4 deletions src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public EventPublisher(IWorkflowInbox workflowInbox)
IDictionary<string, object>? input = default,
CancellationToken cancellationToken = default)
{
return await PublishInternalAsync(eventName, NotificationStrategy.Sequential, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken);
return await PublishInternalAsync(eventName, false, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken);
}

/// <inheritdoc />
Expand All @@ -43,12 +43,12 @@ public EventPublisher(IWorkflowInbox workflowInbox)
IDictionary<string, object>? input = default,
CancellationToken cancellationToken = default)
{
await PublishInternalAsync(eventName, NotificationStrategy.FireAndForget, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken);
await PublishInternalAsync(eventName, true, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken);
}

private async Task<ICollection<WorkflowExecutionResult>> PublishInternalAsync(
string eventName,
IEventPublishingStrategy publishingStrategy,
bool dispatchAsynchronously,
string? correlationId = default,
string? workflowInstanceId = default,
string? activityInstanceId = default,
Expand All @@ -59,7 +59,7 @@ public EventPublisher(IWorkflowInbox workflowInbox)
var message = NewWorkflowInboxMessage.For<Event>(eventBookmark, workflowInstanceId, correlationId, activityInstanceId, input);
var options = new WorkflowInboxMessageDeliveryOptions
{
EventPublishingStrategy = publishingStrategy,
DispatchAsynchronously = dispatchAsynchronously
};

var result = await _workflowInbox.SubmitAsync(message, options, cancellationToken);
Expand Down

0 comments on commit 652a617

Please sign in to comment.