Skip to content

Commit

Permalink
Fix consumption of event payload (elsa-workflows#4909)
Browse files Browse the repository at this point in the history
The 'input' parameter in 'PublishAsync', 'DispatchAsync' and related methods across multiple classes has been replaced with 'payload'. The renaming of 'input' to 'payload' provides a more intuitive understanding of the parameter as the actual content or data of the event being published or dispatched. It also includes changes in 'PublishInternalAsync' where the workflow input dictionary is now being populated with the payload.
  • Loading branch information
sfmskywalker committed Feb 9, 2024
1 parent cfb6fa5 commit d4e1cf0
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 14 deletions.
1 change: 0 additions & 1 deletion src/bundles/Elsa.Server.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
.AddWorkflowsFrom<Program>()
.UseFluentStorageProvider()
.UseFileStorage()
// .UseFileStorage(sp => StorageFactory.Blobs.AzureBlobStorageWithSas(configuration.GetConnectionString("AzureStorageSasUrl")))
.UseIdentity(identity =>
{
if (useMongoDb)
Expand Down
4 changes: 4 additions & 0 deletions src/modules/Elsa.Workflows.Runtime/Activities/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace Elsa.Workflows.Runtime.Activities;
[UsedImplicitly]
public class Event : Trigger<object?>
{
internal const string EventPayloadWorkflowInputKey = "__EventPayloadWorkflowInput";

/// <inheritdoc />
internal Event([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
{
Expand Down Expand Up @@ -79,6 +81,8 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
return;
}

var input = context.GetWorkflowInput<object?>(EventPayloadWorkflowInputKey);
context.SetResult(input);
await context.CompleteActivityAsync();
}
}
10 changes: 5 additions & 5 deletions src/modules/Elsa.Workflows.Runtime/Activities/PublishEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public PublishEvent([CallerFilePath] string? source = default, [CallerLineNumber
/// <summary>
/// The input to send as the event body.
/// </summary>
[Input(Description = "The input to send as the event body.")]
public Input<IDictionary<string, object>?> Input { get; set; } = default!;
[Input(Description = "The payload to send as the event body.")]
public Input<object> Payload { get; set; } = default!;

/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
Expand All @@ -50,10 +50,10 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
var correlationId = CorrelationId.GetOrDefault(context);
var isLocalEvent = IsLocalEvent.GetOrDefault(context);
var workflowInstanceId = isLocalEvent ? context.WorkflowExecutionContext.Id : default;
var input = Input.GetOrDefault(context);
var payload = Payload.GetOrDefault(context);
var publisher = context.GetRequiredService<IEventPublisher>();

await publisher.DispatchAsync(eventName, correlationId, workflowInstanceId, null, input, context.CancellationToken);
await publisher.DispatchAsync(eventName, correlationId, workflowInstanceId, null, payload, context.CancellationToken);
await context.CompleteActivityAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ public interface IEventPublisher
/// <summary>
/// Synchronously publishes the specified event using the workflow runtime, effectively triggering all <see cref="Event"/> activities matching the provided event name.
/// </summary>
Task<ICollection<WorkflowExecutionResult>> PublishAsync(string eventName, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, IDictionary<string, object>? input = default, CancellationToken cancellationToken = default);
Task<ICollection<WorkflowExecutionResult>> PublishAsync(string eventName, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, object? payload = default, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously publishes the specified event using the workflow dispatcher.
/// </summary>
Task DispatchAsync(string eventName, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, IDictionary<string, object>? input = default, CancellationToken cancellationToken = default);
Task DispatchAsync(string eventName, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, object? payload = default, CancellationToken cancellationToken = default);
}
16 changes: 10 additions & 6 deletions src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public EventPublisher(IWorkflowInbox workflowInbox)
string? correlationId = default,
string? workflowInstanceId = default,
string? activityInstanceId = default,
IDictionary<string, object>? input = default,
object? payload = default,
CancellationToken cancellationToken = default)
{
return await PublishInternalAsync(eventName, false, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken);
return await PublishInternalAsync(eventName, false, correlationId, workflowInstanceId, activityInstanceId, payload, cancellationToken);
}

/// <inheritdoc />
Expand All @@ -40,10 +40,10 @@ public EventPublisher(IWorkflowInbox workflowInbox)
string? correlationId = default,
string? workflowInstanceId = default,
string? activityInstanceId = default,
IDictionary<string, object>? input = default,
object? payload = default,
CancellationToken cancellationToken = default)
{
await PublishInternalAsync(eventName, true, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken);
await PublishInternalAsync(eventName, true, correlationId, workflowInstanceId, activityInstanceId, payload, cancellationToken);
}

private async Task<ICollection<WorkflowExecutionResult>> PublishInternalAsync(
Expand All @@ -52,11 +52,15 @@ public EventPublisher(IWorkflowInbox workflowInbox)
string? correlationId = default,
string? workflowInstanceId = default,
string? activityInstanceId = default,
IDictionary<string, object>? input = default,
object? payload = default,
CancellationToken cancellationToken = default)
{
var eventBookmark = new EventBookmarkPayload(eventName);
var message = NewWorkflowInboxMessage.For<Event>(eventBookmark, workflowInstanceId, correlationId, activityInstanceId, input);
var workflowInput = new Dictionary<string, object>
{
[Event.EventPayloadWorkflowInputKey] = payload ?? new Dictionary<string, object>()
};
var message = NewWorkflowInboxMessage.For<Event>(eventBookmark, workflowInstanceId, correlationId, activityInstanceId, workflowInput);
var options = new WorkflowInboxMessageDeliveryOptions
{
DispatchAsynchronously = dispatchAsynchronously
Expand Down

0 comments on commit d4e1cf0

Please sign in to comment.