From d4e1cf0985e3bd28d88ad99d8fd89df68fe05450 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Fri, 9 Feb 2024 08:20:54 +0100 Subject: [PATCH] Fix consumption of event payload (#4909) The 'input' parameter in 'PublishAsync', 'DispatchAsync' and related methods across multiple classes has been replaced with 'payload'. The renaming of 'input' to 'payload' provides a more intuitive understanding of the parameter as the actual content or data of the event being published or dispatched. It also includes changes in 'PublishInternalAsync' where the workflow input dictionary is now being populated with the payload. --- src/bundles/Elsa.Server.Web/Program.cs | 1 - .../Elsa.Workflows.Runtime/Activities/Event.cs | 4 ++++ .../Activities/PublishEvent.cs | 10 +++++----- .../Contracts/IEventPublisher.cs | 4 ++-- .../Services/EventPublisher.cs | 16 ++++++++++------ 5 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/bundles/Elsa.Server.Web/Program.cs b/src/bundles/Elsa.Server.Web/Program.cs index 64b000aa79..78f1468e53 100644 --- a/src/bundles/Elsa.Server.Web/Program.cs +++ b/src/bundles/Elsa.Server.Web/Program.cs @@ -71,7 +71,6 @@ .AddWorkflowsFrom() .UseFluentStorageProvider() .UseFileStorage() - // .UseFileStorage(sp => StorageFactory.Blobs.AzureBlobStorageWithSas(configuration.GetConnectionString("AzureStorageSasUrl"))) .UseIdentity(identity => { if (useMongoDb) diff --git a/src/modules/Elsa.Workflows.Runtime/Activities/Event.cs b/src/modules/Elsa.Workflows.Runtime/Activities/Event.cs index 86df0ee5c1..870e8c40c3 100644 --- a/src/modules/Elsa.Workflows.Runtime/Activities/Event.cs +++ b/src/modules/Elsa.Workflows.Runtime/Activities/Event.cs @@ -16,6 +16,8 @@ namespace Elsa.Workflows.Runtime.Activities; [UsedImplicitly] public class Event : Trigger { + internal const string EventPayloadWorkflowInputKey = "__EventPayloadWorkflowInput"; + /// internal Event([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line) { @@ -79,6 +81,8 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context return; } + var input = context.GetWorkflowInput(EventPayloadWorkflowInputKey); + context.SetResult(input); await context.CompleteActivityAsync(); } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Activities/PublishEvent.cs b/src/modules/Elsa.Workflows.Runtime/Activities/PublishEvent.cs index 1f6a46028f..ef3d0415ed 100644 --- a/src/modules/Elsa.Workflows.Runtime/Activities/PublishEvent.cs +++ b/src/modules/Elsa.Workflows.Runtime/Activities/PublishEvent.cs @@ -40,8 +40,8 @@ public PublishEvent([CallerFilePath] string? source = default, [CallerLineNumber /// /// The input to send as the event body. /// - [Input(Description = "The input to send as the event body.")] - public Input?> Input { get; set; } = default!; + [Input(Description = "The payload to send as the event body.")] + public Input Payload { get; set; } = default!; /// protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) @@ -50,10 +50,10 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context var correlationId = CorrelationId.GetOrDefault(context); var isLocalEvent = IsLocalEvent.GetOrDefault(context); var workflowInstanceId = isLocalEvent ? context.WorkflowExecutionContext.Id : default; - var input = Input.GetOrDefault(context); + var payload = Payload.GetOrDefault(context); var publisher = context.GetRequiredService(); - - await publisher.DispatchAsync(eventName, correlationId, workflowInstanceId, null, input, context.CancellationToken); + + await publisher.DispatchAsync(eventName, correlationId, workflowInstanceId, null, payload, context.CancellationToken); await context.CompleteActivityAsync(); } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Contracts/IEventPublisher.cs b/src/modules/Elsa.Workflows.Runtime/Contracts/IEventPublisher.cs index bb9be51a37..f11feacbe9 100644 --- a/src/modules/Elsa.Workflows.Runtime/Contracts/IEventPublisher.cs +++ b/src/modules/Elsa.Workflows.Runtime/Contracts/IEventPublisher.cs @@ -11,10 +11,10 @@ public interface IEventPublisher /// /// Synchronously publishes the specified event using the workflow runtime, effectively triggering all activities matching the provided event name. /// - Task> PublishAsync(string eventName, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, IDictionary? input = default, CancellationToken cancellationToken = default); + Task> PublishAsync(string eventName, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, object? payload = default, CancellationToken cancellationToken = default); /// /// Asynchronously publishes the specified event using the workflow dispatcher. /// - Task DispatchAsync(string eventName, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, IDictionary? input = default, CancellationToken cancellationToken = default); + Task DispatchAsync(string eventName, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, object? payload = default, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs b/src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs index 1acd1159ff..44d5c4524a 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs @@ -28,10 +28,10 @@ public EventPublisher(IWorkflowInbox workflowInbox) string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, - IDictionary? input = default, + object? payload = default, CancellationToken cancellationToken = default) { - return await PublishInternalAsync(eventName, false, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken); + return await PublishInternalAsync(eventName, false, correlationId, workflowInstanceId, activityInstanceId, payload, cancellationToken); } /// @@ -40,10 +40,10 @@ public EventPublisher(IWorkflowInbox workflowInbox) string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, - IDictionary? input = default, + object? payload = default, CancellationToken cancellationToken = default) { - await PublishInternalAsync(eventName, true, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken); + await PublishInternalAsync(eventName, true, correlationId, workflowInstanceId, activityInstanceId, payload, cancellationToken); } private async Task> PublishInternalAsync( @@ -52,11 +52,15 @@ public EventPublisher(IWorkflowInbox workflowInbox) string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, - IDictionary? input = default, + object? payload = default, CancellationToken cancellationToken = default) { var eventBookmark = new EventBookmarkPayload(eventName); - var message = NewWorkflowInboxMessage.For(eventBookmark, workflowInstanceId, correlationId, activityInstanceId, input); + var workflowInput = new Dictionary + { + [Event.EventPayloadWorkflowInputKey] = payload ?? new Dictionary() + }; + var message = NewWorkflowInboxMessage.For(eventBookmark, workflowInstanceId, correlationId, activityInstanceId, workflowInput); var options = new WorkflowInboxMessageDeliveryOptions { DispatchAsynchronously = dispatchAsynchronously