From 8002f20e50f803f9cc5b094afade1cbd65ff767e Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Wed, 3 Jan 2024 09:30:24 +0100 Subject: [PATCH 01/23] Stash --- src/bundles/Elsa.Server.Web/Program.cs | 2 +- .../Extensions/ModuleExtensions.cs | 4 +++- .../Contracts/IWorkflowRuntime.cs | 5 +++++ .../DefaultBackgroundActivityInvoker.cs | 4 +++- .../Services/DefaultWorkflowRuntime.cs | 19 +++++++++++++------ 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/bundles/Elsa.Server.Web/Program.cs b/src/bundles/Elsa.Server.Web/Program.cs index 229c9380f3..7df6fea323 100644 --- a/src/bundles/Elsa.Server.Web/Program.cs +++ b/src/bundles/Elsa.Server.Web/Program.cs @@ -24,7 +24,7 @@ const bool useMongoDb = false; const bool useSqlServer = false; const bool useDapper = false; -const bool useProtoActor = true; +const bool useProtoActor = false; const bool useHangfire = false; const bool useQuartz = true; const bool useMassTransit = true; diff --git a/src/modules/Elsa.Workflows.Core/Extensions/ModuleExtensions.cs b/src/modules/Elsa.Workflows.Core/Extensions/ModuleExtensions.cs index 70f989e5b8..a591ccba71 100644 --- a/src/modules/Elsa.Workflows.Core/Extensions/ModuleExtensions.cs +++ b/src/modules/Elsa.Workflows.Core/Extensions/ModuleExtensions.cs @@ -16,6 +16,8 @@ public static IModule UseWorkflows(this IModule configuration, Action(this IServiceCollection services) where T : class, IStorageDriver { - return services.AddSingleton(); + return services + .AddSingleton() + .AddSingleton(); } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowRuntime.cs b/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowRuntime.cs index 7464813799..3421c4cb63 100644 --- a/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowRuntime.cs +++ b/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowRuntime.cs @@ -88,4 +88,9 @@ public interface IWorkflowRuntime /// Counts the number of workflow instances based on the provided query args. /// Task CountRunningWorkflowsAsync(CountRunningWorkflowsRequest request, CancellationToken cancellationToken = default); + + /// + /// Merges the specified workflow state into the workflow runtime. + /// + Task MergeWorkflowStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs index bc40b44731..55d0f86971 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs @@ -57,6 +57,7 @@ public DefaultBackgroundActivityInvoker( public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundActivity, CancellationToken cancellationToken = default) { var workflowInstanceId = scheduledBackgroundActivity.WorkflowInstanceId; + var workflowState = await _workflowRuntime.ExportWorkflowStateAsync(workflowInstanceId, cancellationToken); if (workflowState == null) @@ -118,7 +119,8 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc // - Bookmarks workflowState = _workflowStateExtractor.Extract(workflowExecutionContext); await _variablePersistenceManager.SaveVariablesAsync(workflowExecutionContext); - await _workflowRuntime.ImportWorkflowStateAsync(workflowState, cancellationToken); + await _workflowRuntime.MergeWorkflowStateAsync(workflowState, cancellationToken); + //await _workflowRuntime.ImportWorkflowStateAsync(workflowState, cancellationToken); // Process bookmarks. var newBookmarks = workflowExecutionContext.Bookmarks.ToList(); diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs index f97926a0cc..4c00e1e676 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs @@ -292,6 +292,19 @@ public async Task CountRunningWorkflowsAsync(CountRunningWorkflowsRequest return await _workflowInstanceStore.CountAsync(filter, cancellationToken); } + public async Task MergeWorkflowStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default) + { + var existingWorkflowInstance = (await _workflowInstanceStore.FindAsync(workflowState.Id, cancellationToken))!; + var workflowInstance = _workflowStateMapper.Map(workflowState)!; + + foreach (var bookmark in workflowState.Bookmarks) + { + existingWorkflowInstance.WorkflowState.Bookmarks.RemoveWhere(x => x.Id == bookmark.Id); + existingWorkflowInstance.WorkflowState.Bookmarks.Add(bookmark); + } + await _workflowInstanceManager.SaveAsync(workflowInstance, cancellationToken); + } + private async Task StartWorkflowAsync(IWorkflowHost workflowHost, StartWorkflowRuntimeOptions options) { var workflowInstanceId = string.IsNullOrEmpty(options.InstanceId) ? _identityGenerator.GenerateId() : options.InstanceId; @@ -423,11 +436,5 @@ private async Task> FindResumableWorkflowsAsync(Workf private async Task AcquireLockAsync(string resource, CancellationToken cancellationToken) { return await _distributedLockProvider.AcquireLockAsync(resource, TimeSpan.FromMinutes(2), cancellationToken); - // if (AcquiredLock.Value?.Key == resource) - // return AcquiredLock.Value.Lock; - // - // var distributedLock = await _distributedLockProvider.AcquireLockAsync(resource, TimeSpan.FromMinutes(2), cancellationToken); - // AcquiredLock.Value = new AcquiredLock { Lock = distributedLock, Key = resource }; - // return distributedLock; } } \ No newline at end of file From f89641df9d2a134f34bf9f38b4c8a5013aa9eac9 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Wed, 3 Jan 2024 20:23:35 +0100 Subject: [PATCH 02/23] Update packages.yml for version 3.0.1 The packages.yml workflow file has been updated to target the v3.0.1 branch instead of main. The version also has been updated from 3.0.0 to 3.0.1 in the version prediction logic. Hotfix tags usage has been removed. --- .github/workflows/packages.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/packages.yml b/.github/workflows/packages.yml index 167d8318c2..93818f22fd 100644 --- a/.github/workflows/packages.yml +++ b/.github/workflows/packages.yml @@ -3,9 +3,7 @@ on: workflow_dispatch: push: branches: - - main - tags: - - hotfix-* # e.g. 3.0.0-hotfix-1 + - v3.0.1 release: types: [ prereleased, published ] env: @@ -31,7 +29,7 @@ jobs: TAG_NAME=${TAG_NAME#refs/tags/} # remove the refs/tags/ prefix echo "VERSION=${TAG_NAME}" >> $GITHUB_ENV else - echo "VERSION=3.0.0-preview.${{github.run_number}}" >> $GITHUB_ENV + echo "VERSION=3.0.1-preview.${{github.run_number}}" >> $GITHUB_ENV fi - name: Build run: dotnet build --configuration Release /p:Version=${VERSION} From 4c783ec84dfd92a869f042816db5d7ca8e1121e3 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Wed, 3 Jan 2024 20:25:54 +0100 Subject: [PATCH 03/23] Update git branch for commit verification in workflow The Github workflow's step for verifying commits' existence has been updated. Instead of searching in the 'origin/main' branch, the workflow now checks in the 'origin/v3.0.1' branch. This modification ensures compatibility and consistency with the version being used. --- .github/workflows/packages.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/packages.yml b/.github/workflows/packages.yml index 93818f22fd..2482632b59 100644 --- a/.github/workflows/packages.yml +++ b/.github/workflows/packages.yml @@ -18,10 +18,10 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 - - name: Verify commit exists in origin/main + - name: Verify commit exists in origin/v3.0.1 run: | git fetch --no-tags --prune --depth=1 origin +refs/heads/*:refs/remotes/origin/* - git branch --remote --contains | grep origin/main + git branch --remote --contains | grep origin/v3.0.1 - name: Set VERSION variable run: | if [[ "${{ github.ref }}" == refs/tags/* && "${{ github.event_name }}" == "release" && "${{ github.event.action }}" == "published" ]]; then From 7f1b5be9b95a1432244fdfabb6f067250add993d Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Thu, 4 Jan 2024 20:35:05 +0100 Subject: [PATCH 04/23] Update Elsa.Studio package versions The Elsa.Studio and Elsa.Studio.Login.BlazorWasm packages in the Elsa.ServerAndStudio.Web and Elsa.Studio.Web projects have been updated from version 3.0.0-preview.177 to the stable version 3.0.0. This is to ensure we're using the stable and reliable versions of these packages in our projects. --- .../Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj | 4 ++-- src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/bundles/Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj b/src/bundles/Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj index bcaf30f1c7..c56e03dcd6 100644 --- a/src/bundles/Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj +++ b/src/bundles/Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj @@ -34,8 +34,8 @@ - - + + \ No newline at end of file diff --git a/src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj b/src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj index 9b6a06e056..f3c053fe84 100644 --- a/src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj +++ b/src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj @@ -21,8 +21,8 @@ - - + + From 853355bcbf6dc3f94d1299eab429ca7d618adc5d Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Fri, 5 Jan 2024 21:22:18 +0100 Subject: [PATCH 05/23] Add background execution to activities and update HTTP requests Significantly enhanced the capabilities of background execution of activities. Included a change in activity type of "SendHttpRequest" from 'Task' to 'Action'. Introduced new classes for handling outcomes of context in background execution. Made some necessary adjustments to HTTP Request Task to handle sending HTTP requests from a background task. Updated several middleware classes to align with these modifications. --- .../Activities/FlowSendHttpRequest.cs | 2 +- .../Elsa.Http/Activities/SendHttpRequest.cs | 2 +- .../Activities/SendHttpRequestTask.cs | 220 ++++++++++++++++++ .../MassTransitWorkflowDispatcher.cs | 1 + ...kflowContextActivityExecutionMiddleware.cs | 2 +- ...roundActivityExecutionContextExtensions.cs | 24 ++ .../Models/BackgroundExecutionOutcome.cs | 3 + .../Models/BackgroundExecutionResult.cs | 8 + .../Services/WorkflowStateExtractor.cs | 6 +- .../Contracts/IWorkflowRuntime.cs | 5 - ...ivityExecutionPipelineBuilderExtensions.cs | 4 +- .../Handlers/CancelBackgroundActivities.cs | 2 +- .../BackgroundActivityInvokerMiddleware.cs | 65 ++++-- .../ScheduleBackgroundActivitiesMiddleware.cs | 2 +- .../DefaultBackgroundActivityInvoker.cs | 34 +-- .../Services/DefaultWorkflowRuntime.cs | 13 -- 16 files changed, 330 insertions(+), 63 deletions(-) create mode 100644 src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs create mode 100644 src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs create mode 100644 src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionOutcome.cs create mode 100644 src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionResult.cs diff --git a/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs b/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs index 2960b5887c..cae3cbacf6 100644 --- a/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs +++ b/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs @@ -12,7 +12,7 @@ namespace Elsa.Http; /// /// Send an HTTP request. /// -[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request (flow)", Kind = ActivityKind.Task)] +[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request (flow)", Kind = ActivityKind.Action)] public class FlowSendHttpRequest : SendHttpRequestBase, IActivityPropertyDefaultValueProvider { /// diff --git a/src/modules/Elsa.Http/Activities/SendHttpRequest.cs b/src/modules/Elsa.Http/Activities/SendHttpRequest.cs index b45e5558f4..0fd76e72ce 100644 --- a/src/modules/Elsa.Http/Activities/SendHttpRequest.cs +++ b/src/modules/Elsa.Http/Activities/SendHttpRequest.cs @@ -10,7 +10,7 @@ namespace Elsa.Http; /// /// Send an HTTP request. /// -[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request", Kind = ActivityKind.Task)] +[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request", Kind = ActivityKind.Action)] public class SendHttpRequest : SendHttpRequestBase { /// diff --git a/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs b/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs new file mode 100644 index 0000000000..3c2c114ba1 --- /dev/null +++ b/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs @@ -0,0 +1,220 @@ +using System.Net; +using System.Net.Http.Headers; +using System.Runtime.CompilerServices; +using Elsa.Extensions; +using Elsa.Http.ContentWriters; +using Elsa.Http.UIHints; +using Elsa.Workflows; +using Elsa.Workflows.Attributes; +using Elsa.Workflows.UIHints; +using Elsa.Workflows.Models; +using HttpHeaders = Elsa.Http.Models.HttpHeaders; + +namespace Elsa.Http; + +/// +/// Sends HTTP requests from a background task. +/// +[Activity("Elsa", "HTTP", "Send an HTTP request from a background task.", DisplayName = "HTTP Request Task", Kind = ActivityKind.Job)] +public class SendHttpRequestTask : Activity +{ + /// + public SendHttpRequestTask([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line) + { + } + + /// + /// The URL to send the request to. + /// + [Input(Description = "The URL to send the request to.")] + public Input Url { get; set; } = default!; + + /// + /// The HTTP method to use when sending the request. + /// + [Input( + Description = "The HTTP method to use when sending the request.", + Options = new[] { "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD" }, + DefaultValue = "GET", + UIHint = InputUIHints.DropDown + )] + public Input Method { get; set; } = new("GET"); + + /// + /// The content to send with the request. Can be a string, an object, a byte array or a stream. + /// + [Input(Description = "The content to send with the request. Can be a string, an object, a byte array or a stream.")] + public Input Content { get; set; } = default!; + + /// + /// The content type to use when sending the request. + /// + [Input( + Description = "The content type to use when sending the request.", + UIHandler = typeof(HttpContentTypeOptionsProvider), + UIHint = InputUIHints.DropDown + )] + public Input ContentType { get; set; } = default!; + + /// + /// The Authorization header value to send with the request. + /// + /// Bearer {some-access-token} + [Input(Description = "The Authorization header value to send with the request. For example: Bearer {some-access-token}", Category = "Security")] + public Input Authorization { get; set; } = default!; + + /// + /// A list of expected status codes to handle. + /// + [Input( + Description = "A list of expected status codes to handle.", + UIHint = InputUIHints.MultiText, + DefaultValueProvider = typeof(FlowSendHttpRequest) + )] + public Input> ExpectedStatusCodes { get; set; } = default!; + + /// + /// A value that allows to add the Authorization header without validation. + /// + [Input(Description = "A value that allows to add the Authorization header without validation.", Category = "Security")] + public Input DisableAuthorizationHeaderValidation { get; set; } = default!; + + /// + /// The headers to send along with the request. + /// + [Input( + Description = "The headers to send along with the request.", + UIHint = InputUIHints.JsonEditor, + Category = "Advanced" + )] + public Input RequestHeaders { get; set; } = new(new HttpHeaders()); + + /// + /// The parsed content, if any. + /// + [Output(Description = "The parsed content, if any.")] + public Output ParsedContent { get; set; } = default!; + + /// + protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) + { + await TrySendAsync(context); + } + + private async Task TrySendAsync(ActivityExecutionContext context) + { + var request = PrepareRequest(context); + var httpClientFactory = context.GetRequiredService(); + var httpClient = httpClientFactory.CreateClient(nameof(SendHttpRequestBase)); + var cancellationToken = context.CancellationToken; + + try + { + var response = await httpClient.SendAsync(request, cancellationToken); + var parsedContent = await ParseContentAsync(context, response.Content); + context.SetResult(response.StatusCode); + context.Set(ParsedContent, parsedContent); + + HandleResponse(context, response); + } + catch (HttpRequestException e) + { + context.AddExecutionLogEntry("Error", e.Message, payload: new { StackTrace = e.StackTrace }); + context.JournalData.Add("Error", e.Message); + HandleRequestException(context, e); + } + catch (TaskCanceledException e) + { + context.AddExecutionLogEntry("Error", e.Message, payload: new { StackTrace = e.StackTrace }); + context.JournalData.Add("Cancelled", true); + HandleTaskCanceledException(context, e); + } + } + + private void HandleResponse(ActivityExecutionContext context, HttpResponseMessage response) + { + var expectedStatusCodes = ExpectedStatusCodes.GetOrDefault(context) ?? new List(0); + var statusCode = (int)response.StatusCode; + var hasMatchingStatusCode = expectedStatusCodes.Contains(statusCode); + var outcome = expectedStatusCodes.Any() ? hasMatchingStatusCode ? statusCode.ToString() : "Unmatched status code" : default; + var outcomes = new List(); + + if (outcome != null) + outcomes.Add(outcome); + + outcomes.Add("Done"); + context.JournalData["StatusCode"] = statusCode; + context.SetBackgroundOutcomes(outcomes); + } + + private void HandleRequestException(ActivityExecutionContext context, HttpRequestException exception) + { + context.SetBackgroundOutcomes(new[] { "Failed to connect" }); + } + + private void HandleTaskCanceledException(ActivityExecutionContext context, TaskCanceledException exception) + { + context.SetBackgroundOutcomes(new[] { "Timeout" }); + } + + private async Task ParseContentAsync(ActivityExecutionContext context, HttpContent httpContent) + { + if (!HasContent(httpContent)) + return null; + + var cancellationToken = context.CancellationToken; + var targetType = ParsedContent.GetTargetType(context); + var contentStream = await httpContent.ReadAsStreamAsync(cancellationToken); + var contentType = httpContent.Headers.ContentType?.MediaType!; + + targetType ??= contentType switch + { + "application/json" => typeof(object), + _ => typeof(string) + }; + + return await context.ParseContentAsync(contentStream, contentType, targetType, cancellationToken); + } + + private static bool HasContent(HttpContent httpContent) => httpContent.Headers.ContentLength > 0; + + private HttpRequestMessage PrepareRequest(ActivityExecutionContext context) + { + var method = Method.GetOrDefault(context) ?? "GET"; + var url = Url.Get(context); + var request = new HttpRequestMessage(new HttpMethod(method), url); + var headers = context.GetHeaders(RequestHeaders); + var authorization = Authorization.GetOrDefault(context); + var addAuthorizationWithoutValidation = DisableAuthorizationHeaderValidation.GetOrDefault(context); + + if (!string.IsNullOrWhiteSpace(authorization)) + if (addAuthorizationWithoutValidation) + request.Headers.TryAddWithoutValidation("Authorization", authorization); + else + request.Headers.Authorization = AuthenticationHeaderValue.Parse(authorization); + + foreach (var header in headers) + request.Headers.Add(header.Key, header.Value.AsEnumerable()); + + var contentType = ContentType.GetOrDefault(context); + var content = Content.GetOrDefault(context); + + if (contentType != null && content != null) + { + var factories = context.GetServices(); + var factory = SelectContentWriter(contentType, factories); + request.Content = factory.CreateHttpContent(content, contentType); + } + + return request; + } + + private IHttpContentFactory SelectContentWriter(string? contentType, IEnumerable factories) + { + if (string.IsNullOrWhiteSpace(contentType)) + return new JsonContentFactory(); + + var parsedContentType = new System.Net.Mime.ContentType(contentType); + return factories.FirstOrDefault(httpContentFactory => httpContentFactory.SupportedContentTypes.Any(c => c == parsedContentType.MediaType)) ?? new JsonContentFactory(); + } +} \ No newline at end of file diff --git a/src/modules/Elsa.MassTransit/Implementations/MassTransitWorkflowDispatcher.cs b/src/modules/Elsa.MassTransit/Implementations/MassTransitWorkflowDispatcher.cs index 6299094b0c..c0ccee0050 100644 --- a/src/modules/Elsa.MassTransit/Implementations/MassTransitWorkflowDispatcher.cs +++ b/src/modules/Elsa.MassTransit/Implementations/MassTransitWorkflowDispatcher.cs @@ -46,6 +46,7 @@ await _bus.Publish(new DispatchWorkflowInstance(request.InstanceId) ActivityInstanceId = request.ActivityInstanceId, ActivityHash = request.ActivityHash, Input = request.Input, + Properties = request.Properties, CorrelationId = request.CorrelationId }, cancellationToken); return new(); diff --git a/src/modules/Elsa.WorkflowContexts/Middleware/WorkflowContextActivityExecutionMiddleware.cs b/src/modules/Elsa.WorkflowContexts/Middleware/WorkflowContextActivityExecutionMiddleware.cs index ecc43655c5..d587de7139 100644 --- a/src/modules/Elsa.WorkflowContexts/Middleware/WorkflowContextActivityExecutionMiddleware.cs +++ b/src/modules/Elsa.WorkflowContexts/Middleware/WorkflowContextActivityExecutionMiddleware.cs @@ -36,7 +36,7 @@ public async ValueTask InvokeAsync(ActivityExecutionContext context) } // Check if this is a background execution. - var isBackgroundExecution = context.TransientProperties.GetValueOrDefault(BackgroundActivityCollectorMiddleware.IsBackgroundExecution); + var isBackgroundExecution = context.TransientProperties.GetValueOrDefault(BackgroundActivityInvokerMiddleware.IsBackgroundExecution); // Is the activity configured to load the context? foreach (var providerType in providerTypes) diff --git a/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs b/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs new file mode 100644 index 0000000000..c4cc0de389 --- /dev/null +++ b/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs @@ -0,0 +1,24 @@ +namespace Elsa.Workflows; + +/// +/// Adds extension methods to . +/// +public static class BackgroundActivityExecutionContextExtensions +{ + /// + /// Sets the background outcomes. + /// + public static void SetBackgroundOutcomes(this ActivityExecutionContext activityExecutionContext, IEnumerable outcomes) + { + var outcomesList = outcomes.ToList(); + activityExecutionContext.SetProperty("BackgroundOutcomes", outcomesList); + } + + /// + /// Gets the background outcomes. + /// + public static IEnumerable GetBackgroundOutcomes(this ActivityExecutionContext activityExecutionContext) + { + return activityExecutionContext.GetProperty>("BackgroundOutcomes") ?? Enumerable.Empty(); + } +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionOutcome.cs b/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionOutcome.cs new file mode 100644 index 0000000000..8d462bb743 --- /dev/null +++ b/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionOutcome.cs @@ -0,0 +1,3 @@ +namespace Elsa.Workflows.Models; + +public record BackgroundExecutionOutcome(string Name, object? Payload); \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionResult.cs b/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionResult.cs new file mode 100644 index 0000000000..133d3e6b60 --- /dev/null +++ b/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionResult.cs @@ -0,0 +1,8 @@ +namespace Elsa.Workflows.Models; + +public class BackgroundExecutionResult +{ + public ICollection Outcomes { get; set; } = new List(); + public ICollection ExecutionLog { get; set; } = new List(); + public IDictionary JournalData { get; } = new Dictionary(); +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs b/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs index 5615ac0e9b..aa8ef62dad 100644 --- a/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs +++ b/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs @@ -77,7 +77,9 @@ private void ExtractProperties(WorkflowState state, WorkflowExecutionContext wor private void ApplyProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext) { - workflowExecutionContext.Properties = state.Properties; + // Merge properties. + foreach (var property in state.Properties) + workflowExecutionContext.Properties[property.Key] = property.Value; } private static void ApplyActivityExecutionContexts(WorkflowState state, WorkflowExecutionContext workflowExecutionContext) @@ -248,7 +250,7 @@ private static IEnumerable GetActiveActivityExecutionC // // If there are any faulted contexts, keep everything so that the user can fix the issue and potentially reschedule existing instances. // if (contexts.Any(x => x.Status == ActivityStatus.Faulted)) - return contexts; + return contexts; // return contexts // .Where(x => !x.IsCompleted) diff --git a/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowRuntime.cs b/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowRuntime.cs index 3421c4cb63..7464813799 100644 --- a/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowRuntime.cs +++ b/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowRuntime.cs @@ -88,9 +88,4 @@ public interface IWorkflowRuntime /// Counts the number of workflow instances based on the provided query args. /// Task CountRunningWorkflowsAsync(CountRunningWorkflowsRequest request, CancellationToken cancellationToken = default); - - /// - /// Merges the specified workflow state into the workflow runtime. - /// - Task MergeWorkflowStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Extensions/ActivityExecutionPipelineBuilderExtensions.cs b/src/modules/Elsa.Workflows.Runtime/Extensions/ActivityExecutionPipelineBuilderExtensions.cs index 0781caf606..e471f2815c 100644 --- a/src/modules/Elsa.Workflows.Runtime/Extensions/ActivityExecutionPipelineBuilderExtensions.cs +++ b/src/modules/Elsa.Workflows.Runtime/Extensions/ActivityExecutionPipelineBuilderExtensions.cs @@ -11,7 +11,7 @@ namespace Elsa.Extensions; public static class ActivityExecutionPipelineBuilderExtensions { /// - /// Installs the . + /// Installs the . /// - public static IActivityExecutionPipelineBuilder UseBackgroundActivityInvoker(this IActivityExecutionPipelineBuilder pipelineBuilder) => pipelineBuilder.UseMiddleware(); + public static IActivityExecutionPipelineBuilder UseBackgroundActivityInvoker(this IActivityExecutionPipelineBuilder pipelineBuilder) => pipelineBuilder.UseMiddleware(); } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Handlers/CancelBackgroundActivities.cs b/src/modules/Elsa.Workflows.Runtime/Handlers/CancelBackgroundActivities.cs index 7801a84e70..2b09aee679 100644 --- a/src/modules/Elsa.Workflows.Runtime/Handlers/CancelBackgroundActivities.cs +++ b/src/modules/Elsa.Workflows.Runtime/Handlers/CancelBackgroundActivities.cs @@ -31,7 +31,7 @@ public CancelBackgroundActivities(IBackgroundActivityScheduler backgroundActivit /// public async Task HandleAsync(WorkflowBookmarksIndexed notification, CancellationToken cancellationToken) { - var removedBookmarks = notification.IndexedWorkflowBookmarks.RemovedBookmarks.Where(x => x.Name == BackgroundActivityCollectorMiddleware.BackgroundActivityBookmarkName); + var removedBookmarks = notification.IndexedWorkflowBookmarks.RemovedBookmarks.Where(x => x.Name == BackgroundActivityInvokerMiddleware.BackgroundActivityBookmarkName); foreach (var removedBookmark in removedBookmarks) { diff --git a/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs b/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs index da4f05bf5d..c10cf90d14 100644 --- a/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs +++ b/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs @@ -12,19 +12,21 @@ namespace Elsa.Workflows.Runtime.Middleware.Activities; /// Collects the current activity for scheduling for execution from a background job if the activity is of kind or . /// The actual scheduling of the activity happens in . /// -public class BackgroundActivityCollectorMiddleware : DefaultActivityInvokerMiddleware +public class BackgroundActivityInvokerMiddleware : DefaultActivityInvokerMiddleware { /// /// A key into the activity execution context's transient properties that indicates whether the current activity is being executed in the background. /// public static readonly object IsBackgroundExecution = new(); - internal static string GetBackgroundActivityOutputKey(string activityId) => $"__BackgroundActivityOutput:{activityId}"; + internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activityNodeId}"; + internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{activityNodeId}"; + internal static string GetBackgroundActivityJournalDataKey(string activityNodeId) => $"__BackgroundActivityJournalData:{activityNodeId}"; internal static readonly object BackgroundActivitySchedulesKey = new(); internal const string BackgroundActivityBookmarkName = "BackgroundActivity"; /// - public BackgroundActivityCollectorMiddleware(ActivityMiddlewareDelegate next) : base(next) + public BackgroundActivityInvokerMiddleware(ActivityMiddlewareDelegate next) : base(next) { } @@ -37,8 +39,16 @@ protected override async ValueTask ExecuteActivityAsync(ActivityExecutionContext ScheduleBackgroundActivity(context); else { - CaptureOutputIfAny(context); await base.ExecuteActivityAsync(context); + + // This part is either executed from the background, or in the foreground when the activity is resumed. + var isResuming = !GetIsBackgroundExecution(context) && context.ActivityDescriptor.Kind is ActivityKind.Task or ActivityKind.Job; + if (isResuming) + { + CaptureOutputIfAny(context); + CaptureJournalData(context); + await CompleteBackgroundActivityAsync(context); + } } } @@ -51,11 +61,13 @@ private static bool GetShouldRunInBackground(ActivityExecutionContext context) var activityDescriptor = context.ActivityDescriptor; var kind = activityDescriptor.Kind; - return !context.TransientProperties.ContainsKey(IsBackgroundExecution) + return !GetIsBackgroundExecution(context) && context.WorkflowExecutionContext.ExecuteDelegate == null && (kind is ActivityKind.Job || (kind == ActivityKind.Task && activity.GetRunAsynchronously())); } + private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.ContainsKey(IsBackgroundExecution); + /// /// Schedules the current activity for execution in the background. /// @@ -77,21 +89,48 @@ private static void ScheduleBackgroundActivity(ActivityExecutionContext context) private static void CaptureOutputIfAny(ActivityExecutionContext context) { var activity = context.Activity; - var inputKey = GetBackgroundActivityOutputKey(activity.Id); - - if (!context.WorkflowInput.TryGetValue(inputKey, out var capturedOutput)) + var inputKey = GetBackgroundActivityOutputKey(activity.NodeId); + var capturedOutput = context.WorkflowExecutionContext.GetProperty>(inputKey); + + if(capturedOutput == null) return; - - var input = (IDictionary)capturedOutput; - foreach (var inputEntry in input) + + foreach (var outputEntry in capturedOutput) { - var outputDescriptor = context.ActivityDescriptor.Outputs.FirstOrDefault(x => x.Name == inputEntry.Key); + var outputDescriptor = context.ActivityDescriptor.Outputs.FirstOrDefault(x => x.Name == outputEntry.Key); if (outputDescriptor == null) continue; var output = (Output?)outputDescriptor.ValueGetter(activity); - context.Set(output, inputEntry.Value); + context.Set(output, outputEntry.Value); + } + } + + private void CaptureJournalData(ActivityExecutionContext context) + { + var activity = context.Activity; + var journalDataKey = GetBackgroundActivityJournalDataKey(activity.NodeId); + var journalData = context.WorkflowExecutionContext.GetProperty>(journalDataKey); + + if (journalData == null) + return; + + foreach (var journalEntry in journalData) + context.JournalData[journalEntry.Key] = journalEntry.Value; + } + + private async Task CompleteBackgroundActivityAsync(ActivityExecutionContext context) + { + var outcomesKey = GetBackgroundActivityOutcomesKey(context.NodeId); + var outcomes = context.WorkflowExecutionContext.GetProperty>(outcomesKey); + + if (outcomes != null) + { + await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray()); + + // Remove the outcomes from the workflow execution context. + context.WorkflowExecutionContext.Properties.Remove(outcomesKey); } } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Middleware/Workflows/ScheduleBackgroundActivitiesMiddleware.cs b/src/modules/Elsa.Workflows.Runtime/Middleware/Workflows/ScheduleBackgroundActivitiesMiddleware.cs index f5b83f1a8c..c9d32723fd 100644 --- a/src/modules/Elsa.Workflows.Runtime/Middleware/Workflows/ScheduleBackgroundActivitiesMiddleware.cs +++ b/src/modules/Elsa.Workflows.Runtime/Middleware/Workflows/ScheduleBackgroundActivitiesMiddleware.cs @@ -43,7 +43,7 @@ public override async ValueTask InvokeAsync(WorkflowExecutionContext context) var scheduledBackgroundActivities = workflowExecutionContext .TransientProperties - .GetOrAdd(BackgroundActivityCollectorMiddleware.BackgroundActivitySchedulesKey, () => new List()); + .GetOrAdd(BackgroundActivityInvokerMiddleware.BackgroundActivitySchedulesKey, () => new List()); if (scheduledBackgroundActivities.Any()) { diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs index 55d0f86971..379a10e24c 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs @@ -70,7 +70,6 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc var workflow = await _workflowDefinitionService.MaterializeWorkflowAsync(workflowDefinition, cancellationToken); var workflowExecutionContext = await WorkflowExecutionContext.CreateAsync(_serviceProvider, workflow, workflowState, cancellationTokens: cancellationToken); - var originalBookmarks = workflowExecutionContext.Bookmarks.ToList(); var activityNodeId = scheduledBackgroundActivity.ActivityNodeId; var activityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.First(x => x.NodeId == activityNodeId); @@ -78,7 +77,7 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc await _variablePersistenceManager.LoadVariablesAsync(workflowExecutionContext); // Mark the activity as being invoked from a background worker. - activityExecutionContext.TransientProperties[BackgroundActivityCollectorMiddleware.IsBackgroundExecution] = true; + activityExecutionContext.TransientProperties[BackgroundActivityInvokerMiddleware.IsBackgroundExecution] = true; // Invoke the activity. await _activityInvoker.InvokeAsync(activityExecutionContext); @@ -86,6 +85,7 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc // Capture any activity output produced by the activity (but only if the associated memory block is stored in the workflow itself). var outputDescriptors = activityExecutionContext.ActivityDescriptor.Outputs; var outputValues = new Dictionary(); + var outcomes = activityExecutionContext.GetBackgroundOutcomes().ToList(); foreach (var outputDescriptor in outputDescriptors) { @@ -112,33 +112,21 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc outputValues[outputDescriptor.Name] = outputValue; } - // TODO: Instead of importing the entire workflow state, we should only import the following: - // - Variables - // - Activity state - // - Activity output - // - Bookmarks - workflowState = _workflowStateExtractor.Extract(workflowExecutionContext); - await _variablePersistenceManager.SaveVariablesAsync(workflowExecutionContext); - await _workflowRuntime.MergeWorkflowStateAsync(workflowState, cancellationToken); - //await _workflowRuntime.ImportWorkflowStateAsync(workflowState, cancellationToken); - - // Process bookmarks. - var newBookmarks = workflowExecutionContext.Bookmarks.ToList(); - var diff = Diff.For(originalBookmarks, newBookmarks); - await _bookmarksPersister.PersistBookmarksAsync(workflowExecutionContext, diff); - // Resume the workflow, passing along the activity output. - // TODO: This approach will fail if the output is non-serializable. We need to find a way to pass the output to the workflow without serializing it. var bookmarkId = scheduledBackgroundActivity.BookmarkId; - var inputKey = BackgroundActivityCollectorMiddleware.GetBackgroundActivityOutputKey(activityNodeId); + var inputKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityOutputKey(activityNodeId); + var outcomesKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityOutcomesKey(activityNodeId); + var journalDataKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityJournalDataKey(activityNodeId); var dispatchRequest = new DispatchWorkflowInstanceRequest { - InstanceId = workflowInstanceId, - BookmarkId = bookmarkId, - Input = new Dictionary + InstanceId = workflowInstanceId, + BookmarkId = bookmarkId, + Properties = new Dictionary { - [inputKey] = outputValues + [outcomesKey] = outcomes, + [inputKey] = outputValues, + [journalDataKey] = activityExecutionContext.JournalData } }; diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs index 4c00e1e676..11730d0afb 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowRuntime.cs @@ -292,19 +292,6 @@ public async Task CountRunningWorkflowsAsync(CountRunningWorkflowsRequest return await _workflowInstanceStore.CountAsync(filter, cancellationToken); } - public async Task MergeWorkflowStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default) - { - var existingWorkflowInstance = (await _workflowInstanceStore.FindAsync(workflowState.Id, cancellationToken))!; - var workflowInstance = _workflowStateMapper.Map(workflowState)!; - - foreach (var bookmark in workflowState.Bookmarks) - { - existingWorkflowInstance.WorkflowState.Bookmarks.RemoveWhere(x => x.Id == bookmark.Id); - existingWorkflowInstance.WorkflowState.Bookmarks.Add(bookmark); - } - await _workflowInstanceManager.SaveAsync(workflowInstance, cancellationToken); - } - private async Task StartWorkflowAsync(IWorkflowHost workflowHost, StartWorkflowRuntimeOptions options) { var workflowInstanceId = string.IsNullOrEmpty(options.InstanceId) ? _identityGenerator.GenerateId() : options.InstanceId; From 2b6294bcc132d2e76c3cb7bd3c8379c711ae6c59 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Fri, 5 Jan 2024 21:45:02 +0100 Subject: [PATCH 06/23] Add background execution handling to activity context This commit adds the ability to manage the background execution state directly within the activity execution context. This includes adding methods to set and verify the background execution state, and modifying the existing code to use these new methods. A method for handling activity scheduling during background execution has also been started, but its implementation is not finished yet. The HTTP Request activities were updated accordingly to reflect these changes. --- .../Activities/FlowSendHttpRequest.cs | 3 ++- .../Elsa.Http/Activities/SendHttpRequest.cs | 2 +- .../Activities/SendHttpRequestTask.cs | 20 +++++++++--------- ...kflowContextActivityExecutionMiddleware.cs | 2 +- .../Contexts/ActivityExecutionContext.cs | 6 ++++++ .../ActivityExecutionContextExtensions.cs | 12 ++++++++++- ...roundActivityExecutionContextExtensions.cs | 21 +++++++++++++++++++ .../BackgroundActivityInvokerMiddleware.cs | 7 +------ .../DefaultBackgroundActivityInvoker.cs | 2 +- 9 files changed, 54 insertions(+), 21 deletions(-) diff --git a/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs b/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs index cae3cbacf6..f46f0e4d25 100644 --- a/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs +++ b/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs @@ -12,7 +12,8 @@ namespace Elsa.Http; /// /// Send an HTTP request. /// -[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request (flow)", Kind = ActivityKind.Action)] +[Activity("Elsa", "HTTP", "Send an HTTP request. This activity yis deprecated in favor of the SendHttpRequestTask activity", DisplayName = "HTTP Request (flow) [Deprecated] ", Kind = ActivityKind.Task)] +[Obsolete("Use SendHttpRequestTask instead.")] public class FlowSendHttpRequest : SendHttpRequestBase, IActivityPropertyDefaultValueProvider { /// diff --git a/src/modules/Elsa.Http/Activities/SendHttpRequest.cs b/src/modules/Elsa.Http/Activities/SendHttpRequest.cs index 0fd76e72ce..b45e5558f4 100644 --- a/src/modules/Elsa.Http/Activities/SendHttpRequest.cs +++ b/src/modules/Elsa.Http/Activities/SendHttpRequest.cs @@ -10,7 +10,7 @@ namespace Elsa.Http; /// /// Send an HTTP request. /// -[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request", Kind = ActivityKind.Action)] +[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request", Kind = ActivityKind.Task)] public class SendHttpRequest : SendHttpRequestBase { /// diff --git a/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs b/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs index 3c2c114ba1..911ca0039e 100644 --- a/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs +++ b/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs @@ -15,7 +15,7 @@ namespace Elsa.Http; /// /// Sends HTTP requests from a background task. /// -[Activity("Elsa", "HTTP", "Send an HTTP request from a background task.", DisplayName = "HTTP Request Task", Kind = ActivityKind.Job)] +[Activity("Elsa", "HTTP", "Send an HTTP request from a background task.", DisplayName = "HTTP Request Task", Kind = ActivityKind.Task)] public class SendHttpRequestTask : Activity { /// @@ -115,23 +115,23 @@ private async Task TrySendAsync(ActivityExecutionContext context) context.SetResult(response.StatusCode); context.Set(ParsedContent, parsedContent); - HandleResponse(context, response); + await HandleResponseAsync(context, response); } catch (HttpRequestException e) { context.AddExecutionLogEntry("Error", e.Message, payload: new { StackTrace = e.StackTrace }); context.JournalData.Add("Error", e.Message); - HandleRequestException(context, e); + await HandleRequestExceptionAsync(context, e); } catch (TaskCanceledException e) { context.AddExecutionLogEntry("Error", e.Message, payload: new { StackTrace = e.StackTrace }); context.JournalData.Add("Cancelled", true); - HandleTaskCanceledException(context, e); + await HandleTaskCanceledExceptionAsync(context, e); } } - private void HandleResponse(ActivityExecutionContext context, HttpResponseMessage response) + private async Task HandleResponseAsync(ActivityExecutionContext context, HttpResponseMessage response) { var expectedStatusCodes = ExpectedStatusCodes.GetOrDefault(context) ?? new List(0); var statusCode = (int)response.StatusCode; @@ -144,17 +144,17 @@ private void HandleResponse(ActivityExecutionContext context, HttpResponseMessag outcomes.Add("Done"); context.JournalData["StatusCode"] = statusCode; - context.SetBackgroundOutcomes(outcomes); + await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray()); } - private void HandleRequestException(ActivityExecutionContext context, HttpRequestException exception) + private async Task HandleRequestExceptionAsync(ActivityExecutionContext context, HttpRequestException exception) { - context.SetBackgroundOutcomes(new[] { "Failed to connect" }); + await context.CompleteActivityWithOutcomesAsync("Failed to connect"); } - private void HandleTaskCanceledException(ActivityExecutionContext context, TaskCanceledException exception) + private async Task HandleTaskCanceledExceptionAsync(ActivityExecutionContext context, TaskCanceledException exception) { - context.SetBackgroundOutcomes(new[] { "Timeout" }); + await context.CompleteActivityWithOutcomesAsync("Timeout"); } private async Task ParseContentAsync(ActivityExecutionContext context, HttpContent httpContent) diff --git a/src/modules/Elsa.WorkflowContexts/Middleware/WorkflowContextActivityExecutionMiddleware.cs b/src/modules/Elsa.WorkflowContexts/Middleware/WorkflowContextActivityExecutionMiddleware.cs index d587de7139..ccb7d4ba20 100644 --- a/src/modules/Elsa.WorkflowContexts/Middleware/WorkflowContextActivityExecutionMiddleware.cs +++ b/src/modules/Elsa.WorkflowContexts/Middleware/WorkflowContextActivityExecutionMiddleware.cs @@ -36,7 +36,7 @@ public async ValueTask InvokeAsync(ActivityExecutionContext context) } // Check if this is a background execution. - var isBackgroundExecution = context.TransientProperties.GetValueOrDefault(BackgroundActivityInvokerMiddleware.IsBackgroundExecution); + var isBackgroundExecution = context.GetIsBackgroundExecution(); // Is the activity configured to load the context? foreach (var providerType in providerTypes) diff --git a/src/modules/Elsa.Workflows.Core/Contexts/ActivityExecutionContext.cs b/src/modules/Elsa.Workflows.Core/Contexts/ActivityExecutionContext.cs index 2fd7df1b5b..1aa60ee4a3 100644 --- a/src/modules/Elsa.Workflows.Core/Contexts/ActivityExecutionContext.cs +++ b/src/modules/Elsa.Workflows.Core/Contexts/ActivityExecutionContext.cs @@ -230,6 +230,12 @@ public async ValueTask ScheduleActivityAsync(IActivity? activity, ActivityExecut /// The options used to schedule the activity. public async ValueTask ScheduleActivityAsync(ActivityNode? activityNode, ActivityExecutionContext? owner = default, ScheduleWorkOptions? options = default) { + if (this.GetIsBackgroundExecution()) + { + // TODO: Capture the information in a serializable format and store it in the workflow execution context Properties dictionary. + // The information should be stored in a way that allows the workflow execution context to resume the activity execution context. + } + var completionCallback = options?.CompletionCallback; owner ??= this; diff --git a/src/modules/Elsa.Workflows.Core/Extensions/ActivityExecutionContextExtensions.cs b/src/modules/Elsa.Workflows.Core/Extensions/ActivityExecutionContextExtensions.cs index 7735978454..937934592a 100644 --- a/src/modules/Elsa.Workflows.Core/Extensions/ActivityExecutionContextExtensions.cs +++ b/src/modules/Elsa.Workflows.Core/Extensions/ActivityExecutionContextExtensions.cs @@ -456,6 +456,16 @@ public static async ValueTask CompleteActivityAsync(this ActivityCompletedContex /// public static async ValueTask CompleteActivityAsync(this ActivityExecutionContext context, object? result = default) { + var outcomes = result as Outcomes; + + // If the activity is executing in the background, simply capture the result and return. + if (context.GetIsBackgroundExecution()) + { + if (outcomes != null) + context.SetBackgroundOutcomes(outcomes.Names); + return; + } + // If the activity is not running, do nothing. if (context.Status != ActivityStatus.Running) return; @@ -470,7 +480,7 @@ public static async ValueTask CompleteActivityAsync(this ActivityExecutionContex context.Status = ActivityStatus.Completed; // Record the outcomes, if any. - if (result is Outcomes outcomes) + if (outcomes != null) context.JournalData["Outcomes"] = outcomes.Names; // Record the output, if any. diff --git a/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs b/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs index c4cc0de389..72d84e745f 100644 --- a/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs +++ b/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs @@ -5,6 +5,27 @@ namespace Elsa.Workflows; /// public static class BackgroundActivityExecutionContextExtensions { + /// + /// A key into the activity execution context's transient properties that indicates whether the current activity is being executed in the background. + /// + public static readonly object IsBackgroundExecution = new(); + + /// + /// Configures the activity execution context to execute the current activity in the background. + /// + public static void SetIsBackgroundExecution(this ActivityExecutionContext activityExecutionContext, bool value = true) + { + activityExecutionContext.TransientProperties[IsBackgroundExecution] = value; + } + + /// + /// Gets a value indicating whether the current activity is being executed in the background. + /// + public static bool GetIsBackgroundExecution(this ActivityExecutionContext activityExecutionContext) + { + return activityExecutionContext.TransientProperties.ContainsKey(IsBackgroundExecution); + } + /// /// Sets the background outcomes. /// diff --git a/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs b/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs index c10cf90d14..6f440c8f57 100644 --- a/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs +++ b/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs @@ -14,11 +14,6 @@ namespace Elsa.Workflows.Runtime.Middleware.Activities; /// public class BackgroundActivityInvokerMiddleware : DefaultActivityInvokerMiddleware { - /// - /// A key into the activity execution context's transient properties that indicates whether the current activity is being executed in the background. - /// - public static readonly object IsBackgroundExecution = new(); - internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activityNodeId}"; internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{activityNodeId}"; internal static string GetBackgroundActivityJournalDataKey(string activityNodeId) => $"__BackgroundActivityJournalData:{activityNodeId}"; @@ -66,7 +61,7 @@ private static bool GetShouldRunInBackground(ActivityExecutionContext context) && (kind is ActivityKind.Job || (kind == ActivityKind.Task && activity.GetRunAsynchronously())); } - private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.ContainsKey(IsBackgroundExecution); + private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.ContainsKey(BackgroundActivityExecutionContextExtensions.IsBackgroundExecution); /// /// Schedules the current activity for execution in the background. diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs index 379a10e24c..90b7557a21 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs @@ -77,7 +77,7 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc await _variablePersistenceManager.LoadVariablesAsync(workflowExecutionContext); // Mark the activity as being invoked from a background worker. - activityExecutionContext.TransientProperties[BackgroundActivityInvokerMiddleware.IsBackgroundExecution] = true; + activityExecutionContext.SetIsBackgroundExecution(); // Invoke the activity. await _activityInvoker.InvokeAsync(activityExecutionContext); From a950a6de858c7ae42065f70bac3f09f9a2fcc343 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Fri, 5 Jan 2024 22:27:56 +0100 Subject: [PATCH 07/23] Add scheduling function for background activities This commit achieves two main goals. Firstly, it introduces two new classes called ScheduledActivity and ScheduledActivityOptions to store scheduled activities' information. Secondly, it modifies how activities are executed in the background by capturing the scheduling information as a serializable format and storing it in the workflow execution context properties dictionary. This change allows the workflow execution context to resume the activity execution context. --- .../Contexts/ActivityExecutionContext.cs | 23 +++++++-- ...roundActivityExecutionContextExtensions.cs | 20 ++++++++ .../Models/ScheduledActivity.cs | 8 +++ .../Models/ScheduledActivityOptions.cs | 13 +++++ .../BackgroundActivityInvokerMiddleware.cs | 51 ++++++++++++++++--- .../DefaultBackgroundActivityInvoker.cs | 14 +++-- 6 files changed, 113 insertions(+), 16 deletions(-) create mode 100644 src/modules/Elsa.Workflows.Core/Models/ScheduledActivity.cs create mode 100644 src/modules/Elsa.Workflows.Core/Models/ScheduledActivityOptions.cs diff --git a/src/modules/Elsa.Workflows.Core/Contexts/ActivityExecutionContext.cs b/src/modules/Elsa.Workflows.Core/Contexts/ActivityExecutionContext.cs index 1aa60ee4a3..b53b7958ad 100644 --- a/src/modules/Elsa.Workflows.Core/Contexts/ActivityExecutionContext.cs +++ b/src/modules/Elsa.Workflows.Core/Contexts/ActivityExecutionContext.cs @@ -232,10 +232,27 @@ public async ValueTask ScheduleActivityAsync(ActivityNode? activityNode, Activit { if (this.GetIsBackgroundExecution()) { - // TODO: Capture the information in a serializable format and store it in the workflow execution context Properties dictionary. - // The information should be stored in a way that allows the workflow execution context to resume the activity execution context. + var scheduledActivity = new ScheduledActivity + { + ActivityNodeId = activityNode?.NodeId, + OwnerActivityInstanceId = owner?.Id, + Options = options != null ? new ScheduledActivityOptions + { + CompletionCallback = options?.CompletionCallback?.Method.Name, + Tag = options?.Tag, + ExistingActivityInstanceId = options?.ExistingActivityExecutionContext?.Id, + PreventDuplicateScheduling = options?.PreventDuplicateScheduling ?? false, + Variables = options?.Variables?.ToList(), + Input = options?.Input + } : default + }; + + var scheduledActivities = this.GetBackgroundScheduledActivities().ToList(); + scheduledActivities.Add(scheduledActivity); + this.SetBackgroundScheduledActivities(scheduledActivities); + return; } - + var completionCallback = options?.CompletionCallback; owner ??= this; diff --git a/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs b/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs index 72d84e745f..b427d67513 100644 --- a/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs +++ b/src/modules/Elsa.Workflows.Core/Extensions/BackgroundActivityExecutionContextExtensions.cs @@ -1,3 +1,5 @@ +using Elsa.Workflows.Models; + namespace Elsa.Workflows; /// @@ -42,4 +44,22 @@ public static IEnumerable GetBackgroundOutcomes(this ActivityExecutionCo { return activityExecutionContext.GetProperty>("BackgroundOutcomes") ?? Enumerable.Empty(); } + + /// + /// Sets the background scheduled activities. + /// + public static void SetBackgroundScheduledActivities(this ActivityExecutionContext activityExecutionContext, IEnumerable scheduledActivities) + { + var scheduledActivitiesList = scheduledActivities.ToList(); + activityExecutionContext.SetProperty("BackgroundScheduledActivities", scheduledActivitiesList); + } + + /// + /// Gets the background scheduled activities. + /// + /// + public static IEnumerable GetBackgroundScheduledActivities(this ActivityExecutionContext activityExecutionContext) + { + return activityExecutionContext.GetProperty>("BackgroundScheduledActivities") ?? Enumerable.Empty(); + } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Core/Models/ScheduledActivity.cs b/src/modules/Elsa.Workflows.Core/Models/ScheduledActivity.cs new file mode 100644 index 0000000000..768bbb112e --- /dev/null +++ b/src/modules/Elsa.Workflows.Core/Models/ScheduledActivity.cs @@ -0,0 +1,8 @@ +namespace Elsa.Workflows.Models; + +public class ScheduledActivity +{ + public string? ActivityNodeId { get; set; } + public string? OwnerActivityInstanceId { get; set; } + public ScheduledActivityOptions? Options { get; set; } +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Core/Models/ScheduledActivityOptions.cs b/src/modules/Elsa.Workflows.Core/Models/ScheduledActivityOptions.cs new file mode 100644 index 0000000000..403c91e652 --- /dev/null +++ b/src/modules/Elsa.Workflows.Core/Models/ScheduledActivityOptions.cs @@ -0,0 +1,13 @@ +using Elsa.Workflows.Memory; + +namespace Elsa.Workflows.Models; + +public class ScheduledActivityOptions +{ + public string? CompletionCallback { get; set; } + public object? Tag { get; set; } + public ICollection? Variables { get; set; } + public string? ExistingActivityInstanceId { get; set; } + public bool PreventDuplicateScheduling { get; set; } + public IDictionary? Input { get; set; } +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs b/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs index 6f440c8f57..defca4956b 100644 --- a/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs +++ b/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs @@ -1,6 +1,8 @@ +using System.Text.Json; using Elsa.Extensions; using Elsa.Workflows.Middleware.Activities; using Elsa.Workflows.Models; +using Elsa.Workflows.Options; using Elsa.Workflows.Pipelines.ActivityExecution; using Elsa.Workflows.Runtime.Bookmarks; using Elsa.Workflows.Runtime.Middleware.Workflows; @@ -17,6 +19,7 @@ public class BackgroundActivityInvokerMiddleware : DefaultActivityInvokerMiddlew internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activityNodeId}"; internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{activityNodeId}"; internal static string GetBackgroundActivityJournalDataKey(string activityNodeId) => $"__BackgroundActivityJournalData:{activityNodeId}"; + internal static string GetBackgroundActivityScheduledActivitiesKey(string activityNodeId) => $"__BackgroundActivityScheduledActivities:{activityNodeId}"; internal static readonly object BackgroundActivitySchedulesKey = new(); internal const string BackgroundActivityBookmarkName = "BackgroundActivity"; @@ -42,7 +45,8 @@ protected override async ValueTask ExecuteActivityAsync(ActivityExecutionContext { CaptureOutputIfAny(context); CaptureJournalData(context); - await CompleteBackgroundActivityAsync(context); + await CompleteBackgroundActivityOutcomesAsync(context); + await CompleteBackgroundActivityScheduledActivitiesAsync(context); } } } @@ -86,10 +90,10 @@ private static void CaptureOutputIfAny(ActivityExecutionContext context) var activity = context.Activity; var inputKey = GetBackgroundActivityOutputKey(activity.NodeId); var capturedOutput = context.WorkflowExecutionContext.GetProperty>(inputKey); - - if(capturedOutput == null) + + if (capturedOutput == null) return; - + foreach (var outputEntry in capturedOutput) { var outputDescriptor = context.ActivityDescriptor.Outputs.FirstOrDefault(x => x.Name == outputEntry.Key); @@ -101,7 +105,7 @@ private static void CaptureOutputIfAny(ActivityExecutionContext context) context.Set(output, outputEntry.Value); } } - + private void CaptureJournalData(ActivityExecutionContext context) { var activity = context.Activity; @@ -114,8 +118,8 @@ private void CaptureJournalData(ActivityExecutionContext context) foreach (var journalEntry in journalData) context.JournalData[journalEntry.Key] = journalEntry.Value; } - - private async Task CompleteBackgroundActivityAsync(ActivityExecutionContext context) + + private async Task CompleteBackgroundActivityOutcomesAsync(ActivityExecutionContext context) { var outcomesKey = GetBackgroundActivityOutcomesKey(context.NodeId); var outcomes = context.WorkflowExecutionContext.GetProperty>(outcomesKey); @@ -123,9 +127,40 @@ private async Task CompleteBackgroundActivityAsync(ActivityExecutionContext cont if (outcomes != null) { await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray()); - + // Remove the outcomes from the workflow execution context. context.WorkflowExecutionContext.Properties.Remove(outcomesKey); } } + + private async Task CompleteBackgroundActivityScheduledActivitiesAsync(ActivityExecutionContext context) + { + var scheduledActivitiesKey = GetBackgroundActivityScheduledActivitiesKey(context.NodeId); + var scheduledActivitiesJson = context.WorkflowExecutionContext.GetProperty(scheduledActivitiesKey); + var scheduledActivities = scheduledActivitiesJson != null ? JsonSerializer.Deserialize>(scheduledActivitiesJson) : null; + + if (scheduledActivities != null) + { + foreach (var scheduledActivity in scheduledActivities) + { + var activityNode = scheduledActivity.ActivityNodeId != null ? context.WorkflowExecutionContext.FindActivityByNodeId(scheduledActivity.ActivityNodeId) : null; + var owner = scheduledActivity.OwnerActivityInstanceId != null ? context.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == scheduledActivity.OwnerActivityInstanceId) : null; + var options = scheduledActivity.Options != null + ? new ScheduleWorkOptions + { + ExistingActivityExecutionContext = scheduledActivity.Options.ExistingActivityInstanceId != null ? context.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == scheduledActivity.Options.ExistingActivityInstanceId) : null, + Variables = scheduledActivity.Options?.Variables, + CompletionCallback = !string.IsNullOrEmpty(scheduledActivity.Options?.CompletionCallback) && owner != null ? owner.Activity.GetActivityCompletionCallback(scheduledActivity.Options.CompletionCallback) : default, + PreventDuplicateScheduling = scheduledActivity.Options?.PreventDuplicateScheduling ?? false, + Input = scheduledActivity.Options?.Input, + Tag = scheduledActivity.Options?.Tag + } + : default; + await context.ScheduleActivityAsync(activityNode, owner, options); + } + + // Remove the scheduled activities from the workflow execution context. + context.WorkflowExecutionContext.Properties.Remove(scheduledActivitiesKey); + } + } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs index 90b7557a21..372271f225 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs @@ -1,3 +1,4 @@ +using System.Text.Json; using Elsa.Common.Models; using Elsa.Workflows.Contracts; using Elsa.Workflows.Helpers; @@ -57,7 +58,7 @@ public DefaultBackgroundActivityInvoker( public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundActivity, CancellationToken cancellationToken = default) { var workflowInstanceId = scheduledBackgroundActivity.WorkflowInstanceId; - + var workflowState = await _workflowRuntime.ExportWorkflowStateAsync(workflowInstanceId, cancellationToken); if (workflowState == null) @@ -85,7 +86,6 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc // Capture any activity output produced by the activity (but only if the associated memory block is stored in the workflow itself). var outputDescriptors = activityExecutionContext.ActivityDescriptor.Outputs; var outputValues = new Dictionary(); - var outcomes = activityExecutionContext.GetBackgroundOutcomes().ToList(); foreach (var outputDescriptor in outputDescriptors) { @@ -112,19 +112,23 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc outputValues[outputDescriptor.Name] = outputValue; } - // Resume the workflow, passing along the activity output. + // Resume the workflow, passing along activity output, outcomes and scheduled activities. var bookmarkId = scheduledBackgroundActivity.BookmarkId; var inputKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityOutputKey(activityNodeId); var outcomesKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityOutcomesKey(activityNodeId); var journalDataKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityJournalDataKey(activityNodeId); + var scheduledActivitiesKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityScheduledActivitiesKey(activityNodeId); + var outcomes = activityExecutionContext.GetBackgroundOutcomes().ToList(); + var scheduledActivities = activityExecutionContext.GetBackgroundScheduledActivities().ToList(); var dispatchRequest = new DispatchWorkflowInstanceRequest { - InstanceId = workflowInstanceId, - BookmarkId = bookmarkId, + InstanceId = workflowInstanceId, + BookmarkId = bookmarkId, Properties = new Dictionary { [outcomesKey] = outcomes, + [scheduledActivitiesKey] = JsonSerializer.Serialize(scheduledActivities), [inputKey] = outputValues, [journalDataKey] = activityExecutionContext.JournalData } From 7c632a5b5529df2d61b3a1e8187dd069ad09323f Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 10:33:44 +0100 Subject: [PATCH 08/23] Refactor HTTP request handling by removing SendHttpRequestTask SendHttpRequestTask was deleted and its functionality was merged into SendHttpRequestBase. This consolidation led to the addition of StatusCode and ResponseHeaders output fields in SendHttpRequestBase. Another change includes the update in FlowSendHttpRequest to indicate that it's no longer deprecated. Also, HttpHeaders class was extended to accommodate HttpResponseHeaders objects. The consolidation was done to streamline the HTTP request handling process. --- .../Activities/FlowSendHttpRequest.cs | 5 +- .../Activities/SendHttpRequestBase.cs | 17 ++ .../Activities/SendHttpRequestTask.cs | 220 ------------------ src/modules/Elsa.Http/Models/HttpHeaders.cs | 20 ++ 4 files changed, 38 insertions(+), 224 deletions(-) delete mode 100644 src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs diff --git a/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs b/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs index f46f0e4d25..6b6e8b1d7b 100644 --- a/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs +++ b/src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs @@ -12,8 +12,7 @@ namespace Elsa.Http; /// /// Send an HTTP request. /// -[Activity("Elsa", "HTTP", "Send an HTTP request. This activity yis deprecated in favor of the SendHttpRequestTask activity", DisplayName = "HTTP Request (flow) [Deprecated] ", Kind = ActivityKind.Task)] -[Obsolete("Use SendHttpRequestTask instead.")] +[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request (flow)", Kind = ActivityKind.Task)] public class FlowSendHttpRequest : SendHttpRequestBase, IActivityPropertyDefaultValueProvider { /// @@ -44,8 +43,6 @@ protected override async ValueTask HandleResponseAsync(ActivityExecutionContext outcomes.Add(outcome); outcomes.Add("Done"); - - context.JournalData["StatusCode"] = statusCode; await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray()); } diff --git a/src/modules/Elsa.Http/Activities/SendHttpRequestBase.cs b/src/modules/Elsa.Http/Activities/SendHttpRequestBase.cs index 1618927512..53cc92daab 100644 --- a/src/modules/Elsa.Http/Activities/SendHttpRequestBase.cs +++ b/src/modules/Elsa.Http/Activities/SendHttpRequestBase.cs @@ -77,12 +77,24 @@ protected SendHttpRequestBase(string? source = default, int? line = default) : b )] public Input RequestHeaders { get; set; } = new(new HttpHeaders()); + /// + /// The HTTP response status code + /// + [Output(Description = "The HTTP response status code")] + public Output StatusCode { get; set; } = default!; + /// /// The parsed content, if any. /// [Output(Description = "The parsed content, if any.")] public Output ParsedContent { get; set; } = default!; + /// + /// The response headers that were received. + /// + [Output(Description = "The response headers that were received.")] + public Output ResponseHeaders { get; set; } = default!; + /// protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) { @@ -115,8 +127,13 @@ private async Task TrySendAsync(ActivityExecutionContext context) { var response = await httpClient.SendAsync(request, cancellationToken); var parsedContent = await ParseContentAsync(context, response.Content); + var statusCode = (int)response.StatusCode; + var responseHeaders = new HttpHeaders(response.Headers); + context.Set(Result, response); context.Set(ParsedContent, parsedContent); + context.Set(StatusCode, statusCode); + context.Set(ResponseHeaders, responseHeaders); await HandleResponseAsync(context, response); } diff --git a/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs b/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs deleted file mode 100644 index 911ca0039e..0000000000 --- a/src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs +++ /dev/null @@ -1,220 +0,0 @@ -using System.Net; -using System.Net.Http.Headers; -using System.Runtime.CompilerServices; -using Elsa.Extensions; -using Elsa.Http.ContentWriters; -using Elsa.Http.UIHints; -using Elsa.Workflows; -using Elsa.Workflows.Attributes; -using Elsa.Workflows.UIHints; -using Elsa.Workflows.Models; -using HttpHeaders = Elsa.Http.Models.HttpHeaders; - -namespace Elsa.Http; - -/// -/// Sends HTTP requests from a background task. -/// -[Activity("Elsa", "HTTP", "Send an HTTP request from a background task.", DisplayName = "HTTP Request Task", Kind = ActivityKind.Task)] -public class SendHttpRequestTask : Activity -{ - /// - public SendHttpRequestTask([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line) - { - } - - /// - /// The URL to send the request to. - /// - [Input(Description = "The URL to send the request to.")] - public Input Url { get; set; } = default!; - - /// - /// The HTTP method to use when sending the request. - /// - [Input( - Description = "The HTTP method to use when sending the request.", - Options = new[] { "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD" }, - DefaultValue = "GET", - UIHint = InputUIHints.DropDown - )] - public Input Method { get; set; } = new("GET"); - - /// - /// The content to send with the request. Can be a string, an object, a byte array or a stream. - /// - [Input(Description = "The content to send with the request. Can be a string, an object, a byte array or a stream.")] - public Input Content { get; set; } = default!; - - /// - /// The content type to use when sending the request. - /// - [Input( - Description = "The content type to use when sending the request.", - UIHandler = typeof(HttpContentTypeOptionsProvider), - UIHint = InputUIHints.DropDown - )] - public Input ContentType { get; set; } = default!; - - /// - /// The Authorization header value to send with the request. - /// - /// Bearer {some-access-token} - [Input(Description = "The Authorization header value to send with the request. For example: Bearer {some-access-token}", Category = "Security")] - public Input Authorization { get; set; } = default!; - - /// - /// A list of expected status codes to handle. - /// - [Input( - Description = "A list of expected status codes to handle.", - UIHint = InputUIHints.MultiText, - DefaultValueProvider = typeof(FlowSendHttpRequest) - )] - public Input> ExpectedStatusCodes { get; set; } = default!; - - /// - /// A value that allows to add the Authorization header without validation. - /// - [Input(Description = "A value that allows to add the Authorization header without validation.", Category = "Security")] - public Input DisableAuthorizationHeaderValidation { get; set; } = default!; - - /// - /// The headers to send along with the request. - /// - [Input( - Description = "The headers to send along with the request.", - UIHint = InputUIHints.JsonEditor, - Category = "Advanced" - )] - public Input RequestHeaders { get; set; } = new(new HttpHeaders()); - - /// - /// The parsed content, if any. - /// - [Output(Description = "The parsed content, if any.")] - public Output ParsedContent { get; set; } = default!; - - /// - protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) - { - await TrySendAsync(context); - } - - private async Task TrySendAsync(ActivityExecutionContext context) - { - var request = PrepareRequest(context); - var httpClientFactory = context.GetRequiredService(); - var httpClient = httpClientFactory.CreateClient(nameof(SendHttpRequestBase)); - var cancellationToken = context.CancellationToken; - - try - { - var response = await httpClient.SendAsync(request, cancellationToken); - var parsedContent = await ParseContentAsync(context, response.Content); - context.SetResult(response.StatusCode); - context.Set(ParsedContent, parsedContent); - - await HandleResponseAsync(context, response); - } - catch (HttpRequestException e) - { - context.AddExecutionLogEntry("Error", e.Message, payload: new { StackTrace = e.StackTrace }); - context.JournalData.Add("Error", e.Message); - await HandleRequestExceptionAsync(context, e); - } - catch (TaskCanceledException e) - { - context.AddExecutionLogEntry("Error", e.Message, payload: new { StackTrace = e.StackTrace }); - context.JournalData.Add("Cancelled", true); - await HandleTaskCanceledExceptionAsync(context, e); - } - } - - private async Task HandleResponseAsync(ActivityExecutionContext context, HttpResponseMessage response) - { - var expectedStatusCodes = ExpectedStatusCodes.GetOrDefault(context) ?? new List(0); - var statusCode = (int)response.StatusCode; - var hasMatchingStatusCode = expectedStatusCodes.Contains(statusCode); - var outcome = expectedStatusCodes.Any() ? hasMatchingStatusCode ? statusCode.ToString() : "Unmatched status code" : default; - var outcomes = new List(); - - if (outcome != null) - outcomes.Add(outcome); - - outcomes.Add("Done"); - context.JournalData["StatusCode"] = statusCode; - await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray()); - } - - private async Task HandleRequestExceptionAsync(ActivityExecutionContext context, HttpRequestException exception) - { - await context.CompleteActivityWithOutcomesAsync("Failed to connect"); - } - - private async Task HandleTaskCanceledExceptionAsync(ActivityExecutionContext context, TaskCanceledException exception) - { - await context.CompleteActivityWithOutcomesAsync("Timeout"); - } - - private async Task ParseContentAsync(ActivityExecutionContext context, HttpContent httpContent) - { - if (!HasContent(httpContent)) - return null; - - var cancellationToken = context.CancellationToken; - var targetType = ParsedContent.GetTargetType(context); - var contentStream = await httpContent.ReadAsStreamAsync(cancellationToken); - var contentType = httpContent.Headers.ContentType?.MediaType!; - - targetType ??= contentType switch - { - "application/json" => typeof(object), - _ => typeof(string) - }; - - return await context.ParseContentAsync(contentStream, contentType, targetType, cancellationToken); - } - - private static bool HasContent(HttpContent httpContent) => httpContent.Headers.ContentLength > 0; - - private HttpRequestMessage PrepareRequest(ActivityExecutionContext context) - { - var method = Method.GetOrDefault(context) ?? "GET"; - var url = Url.Get(context); - var request = new HttpRequestMessage(new HttpMethod(method), url); - var headers = context.GetHeaders(RequestHeaders); - var authorization = Authorization.GetOrDefault(context); - var addAuthorizationWithoutValidation = DisableAuthorizationHeaderValidation.GetOrDefault(context); - - if (!string.IsNullOrWhiteSpace(authorization)) - if (addAuthorizationWithoutValidation) - request.Headers.TryAddWithoutValidation("Authorization", authorization); - else - request.Headers.Authorization = AuthenticationHeaderValue.Parse(authorization); - - foreach (var header in headers) - request.Headers.Add(header.Key, header.Value.AsEnumerable()); - - var contentType = ContentType.GetOrDefault(context); - var content = Content.GetOrDefault(context); - - if (contentType != null && content != null) - { - var factories = context.GetServices(); - var factory = SelectContentWriter(contentType, factories); - request.Content = factory.CreateHttpContent(content, contentType); - } - - return request; - } - - private IHttpContentFactory SelectContentWriter(string? contentType, IEnumerable factories) - { - if (string.IsNullOrWhiteSpace(contentType)) - return new JsonContentFactory(); - - var parsedContentType = new System.Net.Mime.ContentType(contentType); - return factories.FirstOrDefault(httpContentFactory => httpContentFactory.SupportedContentTypes.Any(c => c == parsedContentType.MediaType)) ?? new JsonContentFactory(); - } -} \ No newline at end of file diff --git a/src/modules/Elsa.Http/Models/HttpHeaders.cs b/src/modules/Elsa.Http/Models/HttpHeaders.cs index 1cbe03b8cb..e1f61b26d4 100644 --- a/src/modules/Elsa.Http/Models/HttpHeaders.cs +++ b/src/modules/Elsa.Http/Models/HttpHeaders.cs @@ -1,3 +1,4 @@ +using System.Net.Http.Headers; using System.Text.Json.Serialization; using Elsa.Extensions; using Elsa.Http.Serialization; @@ -10,6 +11,25 @@ namespace Elsa.Http.Models; [JsonConverter(typeof(HttpHeadersConverter))] public class HttpHeaders : Dictionary { + /// + public HttpHeaders() + { + } + + /// + public HttpHeaders(IDictionary source) + { + foreach (var item in source) + Add(item.Key, item.Value); + } + + /// + public HttpHeaders(HttpResponseHeaders source) + { + foreach (var item in source) + Add(item.Key, item.Value.ToArray()); + } + /// /// Gets the content type. /// From 6baaa2a59f290fea0753636da9ec06ed346550f7 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 10:35:37 +0100 Subject: [PATCH 09/23] Update GitHub Actions workflow for new release The GitHub Actions workflow configuration has been updated to target the '3.0.1' branch instead of 'main'. Furthermore, the preview version set in the workflow has been updated to '3.0.1-preview', changing from the previous '3.0.0-preview'. --- .github/workflows/packages.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/packages.yml b/.github/workflows/packages.yml index 167d8318c2..176ffb9587 100644 --- a/.github/workflows/packages.yml +++ b/.github/workflows/packages.yml @@ -3,7 +3,7 @@ on: workflow_dispatch: push: branches: - - main + - 3.0.1 tags: - hotfix-* # e.g. 3.0.0-hotfix-1 release: @@ -31,7 +31,7 @@ jobs: TAG_NAME=${TAG_NAME#refs/tags/} # remove the refs/tags/ prefix echo "VERSION=${TAG_NAME}" >> $GITHUB_ENV else - echo "VERSION=3.0.0-preview.${{github.run_number}}" >> $GITHUB_ENV + echo "VERSION=3.0.1-preview.${{github.run_number}}" >> $GITHUB_ENV fi - name: Build run: dotnet build --configuration Release /p:Version=${VERSION} From dc519afafa30d1fcbe32ba0071990f6d18ec1b78 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 10:36:32 +0100 Subject: [PATCH 10/23] Update branch verification in GitHub workflow The GitHub workflow configuration has been updated to verify that the commit exists in the branch 'origin/3.0.1' instead of 'origin/main'. This is done during the automated package generation process. --- .github/workflows/packages.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/packages.yml b/.github/workflows/packages.yml index 176ffb9587..79aed8fc03 100644 --- a/.github/workflows/packages.yml +++ b/.github/workflows/packages.yml @@ -20,10 +20,10 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 - - name: Verify commit exists in origin/main + - name: Verify commit exists in origin/3.0.1 run: | git fetch --no-tags --prune --depth=1 origin +refs/heads/*:refs/remotes/origin/* - git branch --remote --contains | grep origin/main + git branch --remote --contains | grep origin/3.0.1 - name: Set VERSION variable run: | if [[ "${{ github.ref }}" == refs/tags/* && "${{ github.event_name }}" == "release" && "${{ github.event.action }}" == "published" ]]; then From c750686a4bc850d68e174249649fa768eaf19240 Mon Sep 17 00:00:00 2001 From: Night Wu Date: Sat, 6 Jan 2024 09:42:36 +0000 Subject: [PATCH 11/23] New options to control retry logic for transient failures (#4750) * Add an option to control the number of automatic retries for transient failures. * Add SleepDurationProvider option for ElsaClientBuilderOptions --------- Co-authored-by: admin --- .../DependencyInjectionExtensions.cs | 8 +++++++- .../Options/ElsaClientBuilderOptions.cs | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/clients/Elsa.Api.Client/Extensions/DependencyInjectionExtensions.cs b/src/clients/Elsa.Api.Client/Extensions/DependencyInjectionExtensions.cs index 416962a0b5..03c70efeab 100644 --- a/src/clients/Elsa.Api.Client/Extensions/DependencyInjectionExtensions.cs +++ b/src/clients/Elsa.Api.Client/Extensions/DependencyInjectionExtensions.cs @@ -90,7 +90,13 @@ public static void AddApi(this IServiceCollection services, ElsaClientBuilder { var builder = services.AddRefitClient(CreateRefitSettings, typeof(T).Name).ConfigureHttpClient(ConfigureElsaApiHttpClient); httpClientBuilderOptions?.ConfigureHttpClientBuilder?.Invoke(builder); - builder.AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)))); + + var retryCount = httpClientBuilderOptions?.TransientHttpErrorRetryCount ?? 0; + var sleepDurationProvider = httpClientBuilderOptions?.SleepDurationProvider ?? (retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))); + if (retryCount > 0) + { + builder.AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(retryCount, sleepDurationProvider)); + } } /// diff --git a/src/clients/Elsa.Api.Client/Options/ElsaClientBuilderOptions.cs b/src/clients/Elsa.Api.Client/Options/ElsaClientBuilderOptions.cs index 998b6d4999..264fd0637a 100644 --- a/src/clients/Elsa.Api.Client/Options/ElsaClientBuilderOptions.cs +++ b/src/clients/Elsa.Api.Client/Options/ElsaClientBuilderOptions.cs @@ -33,4 +33,23 @@ public class ElsaClientBuilderOptions /// Gets or sets a delegate that can be used to configure the HTTP client builder. /// public Action ConfigureHttpClientBuilder { get; set; } = _ => { }; + + /// + /// Number of automatic retries for transient failures, including following categories: + /// + /// Network failures(as ) + /// HTTP 5XX status codes(server errors) + /// HTTP 408 status code(request timeout) + /// + /// Default value is 3. + /// Set the value to 0 to disable automatic retry. + /// + public int TransientHttpErrorRetryCount { get; set; } = 3; + + /// + /// The function that provides the duration to wait for for each transient failure retry attempt. + /// This option is useless if TransientHttpErrorRetryCount is set to 0. + /// Default strategy is exponential backoff: TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)). + /// + public Func SleepDurationProvider = retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)); } \ No newline at end of file From 53bd1f9df952d8013472737feaadfc9fb04e6d12 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 11:13:29 +0100 Subject: [PATCH 12/23] Add IExecuteWorkflowApi interface and refine retry policy configuration A new interface, IExecuteWorkflowApi, was created to handle execution and dispatch of workflow definitions. This breaks down functionalities previously present in IWorkflowDefinitionsApi. Also, the retry policy configuration for HTTP requests has been refactored. Instead of hardcoding retry settings, now a delegate method can be optionally passed to customize the behavior. This makes it more flexible and shifts the responsibility of configuring retry policies to the client. --- .../DependencyInjectionExtensions.cs | 35 ++++++++++++++----- .../Options/ElsaClientBuilderOptions.cs | 19 ++-------- .../Contracts/IExecuteWorkflowApi.cs | 32 +++++++++++++++++ .../Contracts/IWorkflowDefinitionsApi.cs | 20 ----------- 4 files changed, 61 insertions(+), 45 deletions(-) create mode 100644 src/clients/Elsa.Api.Client/Resources/WorkflowDefinitions/Contracts/IExecuteWorkflowApi.cs diff --git a/src/clients/Elsa.Api.Client/Extensions/DependencyInjectionExtensions.cs b/src/clients/Elsa.Api.Client/Extensions/DependencyInjectionExtensions.cs index 03c70efeab..b80136e945 100644 --- a/src/clients/Elsa.Api.Client/Extensions/DependencyInjectionExtensions.cs +++ b/src/clients/Elsa.Api.Client/Extensions/DependencyInjectionExtensions.cs @@ -52,7 +52,6 @@ public static IServiceCollection AddElsaClient(this IServiceCollection services, { var builderOptions = new ElsaClientBuilderOptions(); configureClient.Invoke(builderOptions); - builderOptions.ConfigureHttpClientBuilder += builder => builder.AddHttpMessageHandler(sp => (DelegatingHandler)sp.GetRequiredService(builderOptions.AuthenticationHandler)); services.AddScoped(builderOptions.AuthenticationHandler); @@ -63,7 +62,19 @@ public static IServiceCollection AddElsaClient(this IServiceCollection services, options.ConfigureHttpClient = builderOptions.ConfigureHttpClient; options.ApiKey = builderOptions.ApiKey; }); + + var builderOptionsWithoutRetryPolicy = new ElsaClientBuilderOptions + { + ApiKey = builderOptions.ApiKey, + AuthenticationHandler = builderOptions.AuthenticationHandler, + BaseAddress = builderOptions.BaseAddress, + ConfigureHttpClient = builderOptions.ConfigureHttpClient, + ConfigureHttpClientBuilder = builderOptions.ConfigureHttpClientBuilder, + ConfigureRetryPolicy = null + }; + services.AddApi(builderOptions); + services.AddApi(builderOptionsWithoutRetryPolicy); services.AddApi(builderOptions); services.AddApi(builderOptions); services.AddApi(builderOptions); @@ -89,14 +100,20 @@ public static IServiceCollection AddElsaClient(this IServiceCollection services, public static void AddApi(this IServiceCollection services, ElsaClientBuilderOptions? httpClientBuilderOptions = default) where T : class { var builder = services.AddRefitClient(CreateRefitSettings, typeof(T).Name).ConfigureHttpClient(ConfigureElsaApiHttpClient); - httpClientBuilderOptions?.ConfigureHttpClientBuilder?.Invoke(builder); - - var retryCount = httpClientBuilderOptions?.TransientHttpErrorRetryCount ?? 0; - var sleepDurationProvider = httpClientBuilderOptions?.SleepDurationProvider ?? (retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))); - if (retryCount > 0) - { - builder.AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(retryCount, sleepDurationProvider)); - } + httpClientBuilderOptions?.ConfigureHttpClientBuilder(builder); + httpClientBuilderOptions?.ConfigureRetryPolicy?.Invoke(builder); + } + + /// + /// Adds a refit client for the specified API type. + /// + /// The service collection. + /// An options object that can be used to configure the HTTP client builder. + /// The type representing the API. + public static void AddApiWithoutRetryPolicy(this IServiceCollection services, ElsaClientBuilderOptions? httpClientBuilderOptions = default) where T : class + { + var builder = services.AddRefitClient(CreateRefitSettings, typeof(T).Name).ConfigureHttpClient(ConfigureElsaApiHttpClient); + httpClientBuilderOptions?.ConfigureHttpClientBuilder(builder); } /// diff --git a/src/clients/Elsa.Api.Client/Options/ElsaClientBuilderOptions.cs b/src/clients/Elsa.Api.Client/Options/ElsaClientBuilderOptions.cs index 264fd0637a..a7312eb602 100644 --- a/src/clients/Elsa.Api.Client/Options/ElsaClientBuilderOptions.cs +++ b/src/clients/Elsa.Api.Client/Options/ElsaClientBuilderOptions.cs @@ -1,5 +1,6 @@ using Elsa.Api.Client.HttpMessageHandlers; using Microsoft.Extensions.DependencyInjection; +using Polly; namespace Elsa.Api.Client.Options; @@ -35,21 +36,7 @@ public class ElsaClientBuilderOptions public Action ConfigureHttpClientBuilder { get; set; } = _ => { }; /// - /// Number of automatic retries for transient failures, including following categories: - /// - /// Network failures(as ) - /// HTTP 5XX status codes(server errors) - /// HTTP 408 status code(request timeout) - /// - /// Default value is 3. - /// Set the value to 0 to disable automatic retry. + /// Gets or sets a delegate that can be used to configure the retry policy. /// - public int TransientHttpErrorRetryCount { get; set; } = 3; - - /// - /// The function that provides the duration to wait for for each transient failure retry attempt. - /// This option is useless if TransientHttpErrorRetryCount is set to 0. - /// Default strategy is exponential backoff: TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)). - /// - public Func SleepDurationProvider = retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)); + public Action? ConfigureRetryPolicy { get; set; } = builder => builder.AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))); } \ No newline at end of file diff --git a/src/clients/Elsa.Api.Client/Resources/WorkflowDefinitions/Contracts/IExecuteWorkflowApi.cs b/src/clients/Elsa.Api.Client/Resources/WorkflowDefinitions/Contracts/IExecuteWorkflowApi.cs new file mode 100644 index 0000000000..9b2ec493c9 --- /dev/null +++ b/src/clients/Elsa.Api.Client/Resources/WorkflowDefinitions/Contracts/IExecuteWorkflowApi.cs @@ -0,0 +1,32 @@ +using Elsa.Api.Client.Resources.WorkflowDefinitions.Requests; +using JetBrains.Annotations; +using Refit; + +namespace Elsa.Api.Client.Resources.WorkflowDefinitions.Contracts; + +/// +/// Represents a client for the workflow definitions API. +/// +[PublicAPI] +public interface IExecuteWorkflowApi +{ + /// + /// Executes a workflow definition. + /// + /// The definition ID of the workflow definition to execute. + /// An optional request containing options for executing the workflow definition. + /// An optional cancellation token. + /// A response containing information about the workflow instance that was created. + [Post("/workflow-definitions/{definitionId}/execute")] + Task ExecuteAsync(string definitionId, ExecuteWorkflowDefinitionRequest? request, CancellationToken cancellationToken = default); + + /// + /// Dispatches a request to execute the specified workflow definition. + /// + /// The definition ID of the workflow definition to dispatch request. + /// An optional request containing options for dispatching a request to execute the specified workflow definition. + /// An optional cancellation token. + /// A response containing information about the workflow instance that was created. + [Post("/workflow-definitions/{definitionId}/dispatch")] + Task DispatchAsync(string definitionId, DispatchWorkflowDefinitionRequest? request, CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/src/clients/Elsa.Api.Client/Resources/WorkflowDefinitions/Contracts/IWorkflowDefinitionsApi.cs b/src/clients/Elsa.Api.Client/Resources/WorkflowDefinitions/Contracts/IWorkflowDefinitionsApi.cs index 73205909bc..b165cec8fd 100644 --- a/src/clients/Elsa.Api.Client/Resources/WorkflowDefinitions/Contracts/IWorkflowDefinitionsApi.cs +++ b/src/clients/Elsa.Api.Client/Resources/WorkflowDefinitions/Contracts/IWorkflowDefinitionsApi.cs @@ -196,24 +196,4 @@ public interface IWorkflowDefinitionsApi /// An optional cancellation token. [Post("/workflow-definitions/{definitionId}/revert/{version}")] Task RevertVersionAsync(string definitionId, int version, CancellationToken cancellationToken = default); - - /// - /// Executes a workflow definition. - /// - /// The definition ID of the workflow definition to execute. - /// An optional request containing options for executing the workflow definition. - /// An optional cancellation token. - /// A response containing information about the workflow instance that was created. - [Post("/workflow-definitions/{definitionId}/execute")] - Task ExecuteAsync(string definitionId, ExecuteWorkflowDefinitionRequest? request, CancellationToken cancellationToken = default); - - /// - /// Dispatches a request to execute the specified workflow definition. - /// - /// The definition ID of the workflow definition to dispatch request. - /// An optional request containing options for dispatching a request to execute the specified workflow definition. - /// An optional cancellation token. - /// A response containing information about the workflow instance that was created. - [Post("/workflow-definitions/{definitionId}/dispatch")] - Task DispatchAsync(string definitionId, DispatchWorkflowDefinitionRequest? request, CancellationToken cancellationToken = default); } \ No newline at end of file From 652a617a3b49fef4be63347229cb346b89e84cd8 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 12:04:51 +0100 Subject: [PATCH 13/23] Add option for synchronous broadcast in WorkflowInbox This update introduces a new option to control synchronicity when broadcasting messages in the WorkflowInbox. The 'BroadcastWorkflowInboxMessageOptions' class allows the developer to specify whether the broadcasting will occur synchronously or asynchronously. The update also includes a new Endpoint and Workflow for demonstration and testing of this functionality. --- src/bundles/Elsa.Server.Web/MyEndpoint.cs | 27 ++++++++ .../Elsa.Server.Web/MyEventWorkflow.cs | 39 +++++++++++ .../Contracts/IWorkflowInbox.cs | 5 +- .../Handlers/ReadWorkflowInboxMessage.cs | 11 ++- .../BroadcastWorkflowInboxMessageOptions.cs | 15 ++++ .../WorkflowInboxMessageReceived.cs | 7 +- .../WorkflowInboxMessageDeliveryOptions.cs | 8 +-- .../DeliverWorkflowInboxMessageResult.cs | 2 +- .../Services/DefaultWorkflowInbox.cs | 69 ++++++++++++++----- .../Services/EventPublisher.cs | 8 +-- 10 files changed, 159 insertions(+), 32 deletions(-) create mode 100644 src/bundles/Elsa.Server.Web/MyEndpoint.cs create mode 100644 src/bundles/Elsa.Server.Web/MyEventWorkflow.cs create mode 100644 src/modules/Elsa.Workflows.Runtime/Models/BroadcastWorkflowInboxMessageOptions.cs diff --git a/src/bundles/Elsa.Server.Web/MyEndpoint.cs b/src/bundles/Elsa.Server.Web/MyEndpoint.cs new file mode 100644 index 0000000000..12ff777c29 --- /dev/null +++ b/src/bundles/Elsa.Server.Web/MyEndpoint.cs @@ -0,0 +1,27 @@ +using Elsa.Abstractions; +using Elsa.Workflows.Runtime.Contracts; + +namespace Elsa.Server.Web; + +public class MyEndpoint : ElsaEndpointWithoutRequest +{ + private readonly IEventPublisher _eventPublisher; + + public MyEndpoint(IEventPublisher eventPublisher) + { + _eventPublisher = eventPublisher; + } + + public override void Configure() + { + Get("/my-event-workflow"); + AllowAnonymous(); + } + + public override async Task HandleAsync(CancellationToken ct) + { + Console.WriteLine("Publishing MyEvent"); + var results = await _eventPublisher.PublishAsync("MyEvent", cancellationToken: ct); + Console.WriteLine($"Affected workflows: {results.Count}"); + } +} \ No newline at end of file diff --git a/src/bundles/Elsa.Server.Web/MyEventWorkflow.cs b/src/bundles/Elsa.Server.Web/MyEventWorkflow.cs new file mode 100644 index 0000000000..2439e537cb --- /dev/null +++ b/src/bundles/Elsa.Server.Web/MyEventWorkflow.cs @@ -0,0 +1,39 @@ +using Elsa.Workflows; +using Elsa.Workflows.Activities; +using Elsa.Workflows.Contracts; +using Elsa.Workflows.Runtime.Activities; + +namespace Elsa.Server.Web; + +public class OnMyEventWorkflow : WorkflowBase +{ + protected override void Build(IWorkflowBuilder builder) + { + builder.Version = 1; + builder.Id = "OnMyEventWorkflow"; + builder.Root = new Sequence + { + Activities = + { + new Event("MyEvent") + { + CanStartWorkflow = true + }, + new Inline(async () => + { + // IEventPublisher.PublishAsync returns before this executes + await SomeCallAsync(); + }), + new WriteLine("End of workflow"), + new Finish() + } + }; + } + + private async Task SomeCallAsync() + { + Console.WriteLine("Hello from OnMyEventWorkflow"); + await Task.Delay(1000); + Console.WriteLine("Goodbye from OnMyEventWorkflow"); + } +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowInbox.cs b/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowInbox.cs index 949c7cf62b..52245a1fe6 100644 --- a/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowInbox.cs +++ b/src/modules/Elsa.Workflows.Runtime/Contracts/IWorkflowInbox.cs @@ -32,13 +32,14 @@ public interface IWorkflowInbox /// The message to deliver. /// An optional cancellation token. ValueTask DeliverAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default); - + /// /// Broadcasts the specified message, which may trigger new workflows and resume existing ones. /// /// The message to broadcast. + /// An optional set of delivery options. /// An optional cancellation token. - ValueTask BroadcastAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default); + ValueTask BroadcastAsync(WorkflowInboxMessage message, BroadcastWorkflowInboxMessageOptions? options, CancellationToken cancellationToken = default); /// /// Finds all messages matching the specified filter. diff --git a/src/modules/Elsa.Workflows.Runtime/Handlers/ReadWorkflowInboxMessage.cs b/src/modules/Elsa.Workflows.Runtime/Handlers/ReadWorkflowInboxMessage.cs index 59e1159678..ecf6cb0801 100644 --- a/src/modules/Elsa.Workflows.Runtime/Handlers/ReadWorkflowInboxMessage.cs +++ b/src/modules/Elsa.Workflows.Runtime/Handlers/ReadWorkflowInboxMessage.cs @@ -1,5 +1,7 @@ +using Elsa.Extensions; using Elsa.Mediator.Contracts; using Elsa.Workflows.Runtime.Contracts; +using Elsa.Workflows.Runtime.Models; using Elsa.Workflows.Runtime.Notifications; namespace Elsa.Workflows.Runtime.Handlers; @@ -19,11 +21,16 @@ public ReadWorkflowInboxMessage(IWorkflowInbox workflowInbox) { _workflowInbox = workflowInbox; } - + /// public async Task HandleAsync(WorkflowInboxMessageReceived notification, CancellationToken cancellationToken) { var message = notification.InboxMessage; - await _workflowInbox.BroadcastAsync(message, cancellationToken); + var options = new BroadcastWorkflowInboxMessageOptions + { + DispatchAsynchronously = notification.Options.DispatchAsynchronously + }; + var result = await _workflowInbox.BroadcastAsync(message, options, cancellationToken); + notification.WorkflowExecutionResults.AddRange(result.WorkflowExecutionResults); } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Models/BroadcastWorkflowInboxMessageOptions.cs b/src/modules/Elsa.Workflows.Runtime/Models/BroadcastWorkflowInboxMessageOptions.cs new file mode 100644 index 0000000000..e43cb2e981 --- /dev/null +++ b/src/modules/Elsa.Workflows.Runtime/Models/BroadcastWorkflowInboxMessageOptions.cs @@ -0,0 +1,15 @@ +namespace Elsa.Workflows.Runtime.Models; + +/// +/// Represents the options for broadcasting a workflow inbox message. +/// +public class BroadcastWorkflowInboxMessageOptions +{ + /// + /// Gets or sets a value indicating whether the dispatch should be executed asynchronously. + /// + /// + /// true if the dispatch should be executed asynchronously; otherwise, false. + /// + public bool DispatchAsynchronously { get; set; } = true; +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Notifications/WorkflowInboxMessageReceived.cs b/src/modules/Elsa.Workflows.Runtime/Notifications/WorkflowInboxMessageReceived.cs index 075a960e2a..823fc7cd4d 100644 --- a/src/modules/Elsa.Workflows.Runtime/Notifications/WorkflowInboxMessageReceived.cs +++ b/src/modules/Elsa.Workflows.Runtime/Notifications/WorkflowInboxMessageReceived.cs @@ -1,5 +1,7 @@ using Elsa.Mediator.Contracts; using Elsa.Workflows.Runtime.Entities; +using Elsa.Workflows.Runtime.Options; +using Elsa.Workflows.Runtime.Results; namespace Elsa.Workflows.Runtime.Notifications; @@ -7,4 +9,7 @@ namespace Elsa.Workflows.Runtime.Notifications; /// A notification that is sent when a workflow inbox message is received. /// /// The inbox message that was received. -public record WorkflowInboxMessageReceived(WorkflowInboxMessage InboxMessage) : INotification; \ No newline at end of file +public record WorkflowInboxMessageReceived( + WorkflowInboxMessage InboxMessage, + WorkflowInboxMessageDeliveryOptions Options, + ICollection WorkflowExecutionResults) : INotification; \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Options/WorkflowInboxMessageDeliveryOptions.cs b/src/modules/Elsa.Workflows.Runtime/Options/WorkflowInboxMessageDeliveryOptions.cs index 815de54dc3..4b874e2106 100644 --- a/src/modules/Elsa.Workflows.Runtime/Options/WorkflowInboxMessageDeliveryOptions.cs +++ b/src/modules/Elsa.Workflows.Runtime/Options/WorkflowInboxMessageDeliveryOptions.cs @@ -1,7 +1,3 @@ -using Elsa.Mediator; -using Elsa.Mediator.Contracts; -using Elsa.Workflows.Runtime.Notifications; - namespace Elsa.Workflows.Runtime.Options; /// @@ -10,7 +6,7 @@ namespace Elsa.Workflows.Runtime.Options; public class WorkflowInboxMessageDeliveryOptions { /// - /// The strategy to use when publishing the notification. + /// Whether to dispatch the message to the workflow dispatcher or send immediately. /// - public IEventPublishingStrategy EventPublishingStrategy { get; set; } = NotificationStrategy.Background; + public bool DispatchAsynchronously { get; set; } = true; } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Results/DeliverWorkflowInboxMessageResult.cs b/src/modules/Elsa.Workflows.Runtime/Results/DeliverWorkflowInboxMessageResult.cs index a05749e6a3..5a786261fa 100644 --- a/src/modules/Elsa.Workflows.Runtime/Results/DeliverWorkflowInboxMessageResult.cs +++ b/src/modules/Elsa.Workflows.Runtime/Results/DeliverWorkflowInboxMessageResult.cs @@ -3,4 +3,4 @@ namespace Elsa.Workflows.Runtime.Results; /// /// Result of delivering a workflow inbox message. /// -public record DeliverWorkflowInboxMessageResult; \ No newline at end of file +public record DeliverWorkflowInboxMessageResult(ICollection WorkflowExecutionResults); \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowInbox.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowInbox.cs index 7f2fb4dda6..1b9259f690 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowInbox.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowInbox.cs @@ -1,4 +1,5 @@ using Elsa.Common.Contracts; +using Elsa.Mediator; using Elsa.Mediator.Contracts; using Elsa.Workflows.Contracts; using Elsa.Workflows.Runtime.Contracts; @@ -16,6 +17,7 @@ namespace Elsa.Workflows.Runtime.Services; public class DefaultWorkflowInbox : IWorkflowInbox { private readonly IWorkflowDispatcher _workflowDispatcher; + private readonly IWorkflowRuntime _workflowRuntime; private readonly IWorkflowInboxMessageStore _messageStore; private readonly INotificationSender _notificationSender; private readonly ISystemClock _systemClock; @@ -27,6 +29,7 @@ public class DefaultWorkflowInbox : IWorkflowInbox /// public DefaultWorkflowInbox( IWorkflowDispatcher workflowDispatcher, + IWorkflowRuntime workflowRuntime, IWorkflowInboxMessageStore messageStore, INotificationSender notificationSender, ISystemClock systemClock, @@ -34,6 +37,7 @@ public DefaultWorkflowInbox( IBookmarkHasher bookmarkHasher) { _workflowDispatcher = workflowDispatcher; + _workflowRuntime = workflowRuntime; _messageStore = messageStore; _notificationSender = notificationSender; _systemClock = systemClock; @@ -72,10 +76,9 @@ public async ValueTask SubmitAsync(NewWorkflow await _messageStore.SaveAsync(message, cancellationToken); // Send a notification. - var strategy = options.EventPublishingStrategy; var workflowExecutionResults = new List(); - var notification = new WorkflowInboxMessageReceived(message); - await _notificationSender.SendAsync(notification, strategy, cancellationToken); + var notification = new WorkflowInboxMessageReceived(message, options, workflowExecutionResults); + await _notificationSender.SendAsync(notification, NotificationStrategy.Sequential, cancellationToken); // Return the result. return new SubmitWorkflowInboxMessageResult(message, workflowExecutionResults); @@ -84,19 +87,12 @@ public async ValueTask SubmitAsync(NewWorkflow /// public async ValueTask DeliverAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default) { - await ResumeWorkflowsAsync(message, cancellationToken); - return new DeliverWorkflowInboxMessageResult(); + await ResumeWorkflowsAsynchronouslyAsync(message, cancellationToken); + return new DeliverWorkflowInboxMessageResult(new List()); } /// - public async ValueTask BroadcastAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default) - { - await TriggerWorkflowsAsync(message, cancellationToken); - - return new DeliverWorkflowInboxMessageResult(); - } - - private async Task TriggerWorkflowsAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default) + public async ValueTask BroadcastAsync(WorkflowInboxMessage message, BroadcastWorkflowInboxMessageOptions? options, CancellationToken cancellationToken = default) { var activityTypeName = message.ActivityTypeName; var correlationId = message.CorrelationId; @@ -107,8 +103,28 @@ private async Task TriggerWorkflowsAsync(WorkflowInboxMessage message, Cancellat if (workflowInstanceId != null) { - await ResumeWorkflowsAsync(message, cancellationToken); - return; + if (options?.DispatchAsynchronously == true) + { + await ResumeWorkflowsAsynchronouslyAsync(message, cancellationToken); + return new DeliverWorkflowInboxMessageResult(new List()); + } + + var results = await ResumeWorkflowsSynchronouslyAsync(message, cancellationToken); + return new DeliverWorkflowInboxMessageResult(results.ToList()); + } + + if (options?.DispatchAsynchronously == false) + { + var results = await _workflowRuntime.TriggerWorkflowsAsync(activityTypeName, bookmarkPayload, new TriggerWorkflowsOptions + { + CorrelationId = correlationId, + WorkflowInstanceId = workflowInstanceId, + ActivityInstanceId = activityInstanceId, + Input = input, + CancellationTokens = cancellationToken + }); + + return new DeliverWorkflowInboxMessageResult(results.TriggeredWorkflows); } await _workflowDispatcher.DispatchAsync(new DispatchTriggerWorkflowsRequest(activityTypeName, bookmarkPayload) @@ -118,9 +134,11 @@ await _workflowDispatcher.DispatchAsync(new DispatchTriggerWorkflowsRequest(acti ActivityInstanceId = activityInstanceId, Input = input }, cancellationToken); + + return new DeliverWorkflowInboxMessageResult(new List()); } - private async Task ResumeWorkflowsAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default) + private async Task ResumeWorkflowsAsynchronouslyAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default) { var activityTypeName = message.ActivityTypeName; var correlationId = message.CorrelationId; @@ -138,6 +156,25 @@ await _workflowDispatcher.DispatchAsync(new DispatchResumeWorkflowsRequest(activ }, cancellationToken); } + private async Task> ResumeWorkflowsSynchronouslyAsync(WorkflowInboxMessage message, CancellationToken cancellationToken = default) + { + var activityTypeName = message.ActivityTypeName; + var correlationId = message.CorrelationId; + var workflowInstanceId = message.WorkflowInstanceId; + var activityInstanceId = message.ActivityInstanceId; + var bookmarkPayload = message.BookmarkPayload; + var input = message.Input; + + return await _workflowRuntime.ResumeWorkflowsAsync(activityTypeName, bookmarkPayload, new TriggerWorkflowsOptions + { + CorrelationId = correlationId, + WorkflowInstanceId = workflowInstanceId, + ActivityInstanceId = activityInstanceId, + Input = input, + CancellationTokens = cancellationToken + }); + } + /// public async ValueTask> FindManyAsync(WorkflowInboxMessageFilter filter, CancellationToken cancellationToken = default) { diff --git a/src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs b/src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs index ff6d59e0c6..1acd1159ff 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/EventPublisher.cs @@ -31,7 +31,7 @@ public async Task> PublishAsync( IDictionary? input = default, CancellationToken cancellationToken = default) { - return await PublishInternalAsync(eventName, NotificationStrategy.Sequential, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken); + return await PublishInternalAsync(eventName, false, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken); } /// @@ -43,12 +43,12 @@ public async Task DispatchAsync( IDictionary? input = default, CancellationToken cancellationToken = default) { - await PublishInternalAsync(eventName, NotificationStrategy.FireAndForget, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken); + await PublishInternalAsync(eventName, true, correlationId, workflowInstanceId, activityInstanceId, input, cancellationToken); } private async Task> PublishInternalAsync( string eventName, - IEventPublishingStrategy publishingStrategy, + bool dispatchAsynchronously, string? correlationId = default, string? workflowInstanceId = default, string? activityInstanceId = default, @@ -59,7 +59,7 @@ private async Task> PublishInternalAsync( var message = NewWorkflowInboxMessage.For(eventBookmark, workflowInstanceId, correlationId, activityInstanceId, input); var options = new WorkflowInboxMessageDeliveryOptions { - EventPublishingStrategy = publishingStrategy, + DispatchAsynchronously = dispatchAsynchronously }; var result = await _workflowInbox.SubmitAsync(message, options, cancellationToken); From 3b48636ced046ab544da623241b445263b37e218 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 12:12:52 +0100 Subject: [PATCH 14/23] Update FastEndpoints packages to version 5.21.2 The current commit updates the version of all FastEndpoints packages from 5.20.1.7-beta to 5.21.2 in the 'Elsa.Api.Common' project. This ensures we are using the most recent stable release of these packages. #4747 --- src/common/Elsa.Api.Common/Elsa.Api.Common.csproj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/common/Elsa.Api.Common/Elsa.Api.Common.csproj b/src/common/Elsa.Api.Common/Elsa.Api.Common.csproj index 9c992b1dfa..eab2e3e1be 100644 --- a/src/common/Elsa.Api.Common/Elsa.Api.Common.csproj +++ b/src/common/Elsa.Api.Common/Elsa.Api.Common.csproj @@ -19,9 +19,9 @@ - - - + + + From 70530aa9fea90758b3dffd4e34b265203e71dbde Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 14:44:31 +0100 Subject: [PATCH 15/23] Add decimal check in PolymorphicObjectConverter In the PolymorphicObjectConverter class, the check for primitive types and specific object types was updated to include decimal. Fixes #4714 --- .../Serialization/Converters/PolymorphicObjectConverter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/Elsa.Workflows.Core/Serialization/Converters/PolymorphicObjectConverter.cs b/src/modules/Elsa.Workflows.Core/Serialization/Converters/PolymorphicObjectConverter.cs index 72d2c3183e..0cc00867aa 100644 --- a/src/modules/Elsa.Workflows.Core/Serialization/Converters/PolymorphicObjectConverter.cs +++ b/src/modules/Elsa.Workflows.Core/Serialization/Converters/PolymorphicObjectConverter.cs @@ -158,7 +158,7 @@ public override void Write(Utf8JsonWriter writer, object value, JsonSerializerOp var newOptions = new JsonSerializerOptions(options); var type = value.GetType(); - if (type.IsPrimitive || value is string or DateTimeOffset or DateTime or DateOnly or TimeOnly or JsonElement or Guid or TimeSpan or Uri or Version or Enum) + if (type.IsPrimitive || value is string or decimal or DateTimeOffset or DateTime or DateOnly or TimeOnly or JsonElement or Guid or TimeSpan or Uri or Version or Enum) { // Remove the converter so that we don't end up in an infinite loop. newOptions.Converters.RemoveWhere(x => x is PolymorphicObjectConverterFactory); From 6dee8ee26b340f85af9ea7fa6de63844150ff27c Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 15:16:57 +0100 Subject: [PATCH 16/23] Refactor Dapper workflow and update migrations Modified the store service to optimize the SaveManyAsync method by converting input to list only once. Also, enhanced deletion query in the store service to enable usage of different primary keys. Made changes in the Dapper migrations, replacing "NodeId" with "ActivityNodeId". --- .../Elsa.Dapper.Migrations/Runtime/Initial.cs | 4 ++-- .../Stores/DapperWorkflowInboxMessageStore.cs | 2 +- src/modules/Elsa.Dapper/Services/Store.cs | 22 ++++++++++++------- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/modules/Elsa.Dapper.Migrations/Runtime/Initial.cs b/src/modules/Elsa.Dapper.Migrations/Runtime/Initial.cs index ef7acebe44..04fe015931 100644 --- a/src/modules/Elsa.Dapper.Migrations/Runtime/Initial.cs +++ b/src/modules/Elsa.Dapper.Migrations/Runtime/Initial.cs @@ -61,7 +61,7 @@ public override void Up() .WithColumn("ActivityType").AsString().NotNullable().Indexed() .WithColumn("ActivityTypeVersion").AsInt32().NotNullable().Indexed() .WithColumn("ActivityName").AsString().Nullable().Indexed() - .WithColumn("NodeId").AsString().NotNullable().Indexed() + .WithColumn("ActivityNodeId").AsString().NotNullable().Indexed() .WithColumn("EventName").AsString().Nullable().Indexed() .WithColumn("Message").AsString().Nullable() .WithColumn("Source").AsString().Nullable() @@ -86,7 +86,7 @@ public override void Up() .WithColumn("ActivityType").AsString().NotNullable().Indexed() .WithColumn("ActivityTypeVersion").AsInt32().NotNullable().Indexed() .WithColumn("ActivityName").AsString().Nullable().Indexed() - .WithColumn("NodeId").AsString().NotNullable().Indexed() + .WithColumn("ActivityNodeId").AsString().NotNullable().Indexed() .WithColumn("EventName").AsString().Nullable().Indexed() .WithColumn("Message").AsString().Nullable() .WithColumn("Source").AsString().Nullable() diff --git a/src/modules/Elsa.Dapper/Modules/Runtime/Stores/DapperWorkflowInboxMessageStore.cs b/src/modules/Elsa.Dapper/Modules/Runtime/Stores/DapperWorkflowInboxMessageStore.cs index e8d258d4fa..1420dfbdaf 100644 --- a/src/modules/Elsa.Dapper/Modules/Runtime/Stores/DapperWorkflowInboxMessageStore.cs +++ b/src/modules/Elsa.Dapper/Modules/Runtime/Stores/DapperWorkflowInboxMessageStore.cs @@ -61,7 +61,7 @@ public async ValueTask DeleteManyAsync(WorkflowInboxMessageFilter filter, if (pageArgs == null) return await _store.DeleteAsync(q => ApplyFilter(q, filter), cancellationToken); - return await _store.DeleteAsync(q => ApplyFilter(q, filter), pageArgs, new[] { new OrderField(nameof(WorkflowInboxMessage.CreatedAt), OrderDirection.Ascending) }, cancellationToken); + return await _store.DeleteAsync(q => ApplyFilter(q, filter), pageArgs, new[] { new OrderField(nameof(WorkflowInboxMessage.CreatedAt), OrderDirection.Ascending) }, cancellationToken: cancellationToken); } private void ApplyFilter(ParameterizedQuery query, params WorkflowInboxMessageFilter[] filters) diff --git a/src/modules/Elsa.Dapper/Services/Store.cs b/src/modules/Elsa.Dapper/Services/Store.cs index e5dc08b9ad..eb6fde73c3 100644 --- a/src/modules/Elsa.Dapper/Services/Store.cs +++ b/src/modules/Elsa.Dapper/Services/Store.cs @@ -204,10 +204,15 @@ public async Task SaveAsync(T record, string primaryKey = "Id", CancellationToke /// The cancellation token. public async Task SaveManyAsync(IEnumerable records, string primaryKey = "Id", CancellationToken cancellationToken = default) { + var recordsList = records.ToList(); + + if (!recordsList.Any()) + return; + var query = new ParameterizedQuery(_dbConnectionProvider.Dialect); var currentIndex = 0; - foreach (var record in records) + foreach (var record in recordsList) { var index = currentIndex; query.Upsert(TableName, primaryKey, record, field => $"{field}_{index}"); @@ -217,7 +222,7 @@ public async Task SaveManyAsync(IEnumerable records, string primaryKey = "Id" using var connection = _dbConnectionProvider.GetConnection(); await query.ExecuteAsync(connection); } - + /// /// Adds the specified record. /// @@ -243,26 +248,27 @@ public async Task DeleteAsync(Action filter, Cancellat using var connection = _dbConnectionProvider.GetConnection(); return await query.ExecuteAsync(connection); } - + /// /// Deletes all records matching the specified query. /// /// The conditions to apply to the query. /// The page arguments. /// The fields by which to order the results. + /// The primary key. /// The cancellation token. /// The number of records deleted. - public async Task DeleteAsync(Action filter, PageArgs pageArgs, IEnumerable orderFields, CancellationToken cancellationToken = default) + public async Task DeleteAsync(Action filter, PageArgs pageArgs, IEnumerable orderFields, string primaryKey = "Id", CancellationToken cancellationToken = default) { - var selectQuery = _dbConnectionProvider.CreateQuery().From(TableName, "rowid"); + var selectQuery = _dbConnectionProvider.CreateQuery().From(TableName, primaryKey); filter(selectQuery); selectQuery = selectQuery.OrderBy(orderFields.ToArray()).Page(pageArgs); - + var deleteQuery = _dbConnectionProvider.CreateQuery().Delete(TableName, selectQuery); using var connection = _dbConnectionProvider.GetConnection(); return await deleteQuery.ExecuteAsync(connection); } - + /// /// Returns true if any records match the specified query. /// @@ -276,7 +282,7 @@ public async Task AnyAsync(Action filter, Cancellation using var connection = _dbConnectionProvider.GetConnection(); return await connection.QueryFirstOrDefaultAsync(query.Sql.ToString(), query.Parameters) != null; } - + /// /// Returns the number of records matching the specified query. /// From c135e1ec9112d35cdd45466ba7c923df26febc47 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 15:20:38 +0100 Subject: [PATCH 17/23] Update workflow to use main branch and version 3.1.0 The workflow has been updated to work with updates on the 'main' branch rather than the 'v3.0.1' branch. Also, the version number for the 'VERSION' variable in preview mode has been updated to 3.1.0 from 3.0.1. --- .github/workflows/packages.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/packages.yml b/.github/workflows/packages.yml index 2482632b59..1a24c9b44a 100644 --- a/.github/workflows/packages.yml +++ b/.github/workflows/packages.yml @@ -3,7 +3,7 @@ on: workflow_dispatch: push: branches: - - v3.0.1 + - main release: types: [ prereleased, published ] env: @@ -18,10 +18,10 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 - - name: Verify commit exists in origin/v3.0.1 + - name: Verify commit exists in origin/main run: | git fetch --no-tags --prune --depth=1 origin +refs/heads/*:refs/remotes/origin/* - git branch --remote --contains | grep origin/v3.0.1 + git branch --remote --contains | grep origin/main - name: Set VERSION variable run: | if [[ "${{ github.ref }}" == refs/tags/* && "${{ github.event_name }}" == "release" && "${{ github.event.action }}" == "published" ]]; then @@ -29,7 +29,7 @@ jobs: TAG_NAME=${TAG_NAME#refs/tags/} # remove the refs/tags/ prefix echo "VERSION=${TAG_NAME}" >> $GITHUB_ENV else - echo "VERSION=3.0.1-preview.${{github.run_number}}" >> $GITHUB_ENV + echo "VERSION=3.1.0-preview.${{github.run_number}}" >> $GITHUB_ENV fi - name: Build run: dotnet build --configuration Release /p:Version=${VERSION} From c3741fb268f95f4fc11ee88ec9fc420ac1491690 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 17:03:14 +0100 Subject: [PATCH 18/23] Remove unnecessary Elsa Server activities Deleted sample files: DataSourceActivity, MyEndpoint, and MyEventWorkflow from the Elsa Server Web bundle as part of our ongoing codebase optimization strategy. These files were no longer required and their removal simplifies our code structure. --- .../Elsa.Server.Web/DataSourceActivity.cs | 30 -------------- src/bundles/Elsa.Server.Web/MyEndpoint.cs | 27 ------------- .../Elsa.Server.Web/MyEventWorkflow.cs | 39 ------------------- 3 files changed, 96 deletions(-) delete mode 100644 src/bundles/Elsa.Server.Web/DataSourceActivity.cs delete mode 100644 src/bundles/Elsa.Server.Web/MyEndpoint.cs delete mode 100644 src/bundles/Elsa.Server.Web/MyEventWorkflow.cs diff --git a/src/bundles/Elsa.Server.Web/DataSourceActivity.cs b/src/bundles/Elsa.Server.Web/DataSourceActivity.cs deleted file mode 100644 index 35b8c1f315..0000000000 --- a/src/bundles/Elsa.Server.Web/DataSourceActivity.cs +++ /dev/null @@ -1,30 +0,0 @@ -using Bogus; -using Elsa.Extensions; -using Elsa.Workflows; -using Elsa.Workflows.Attributes; -using Elsa.Workflows.Models; - -namespace Elsa.Server.Web; - -[Activity("Demo", "Data Source", "Generates a collection of random strings.")] -public class DataSourceActivity : CodeActivity> -{ - [Input(Description = "The number of items to generate.")] - public Input NumberOfItems { get; set; } = default!; - - - protected override void Execute(ActivityExecutionContext context) - { - var randomizer = new Randomizer(); - var randomStrings = new List(); - var numberOfItems = context.Get(NumberOfItems); - - for (var i = 0; i < numberOfItems; i++) - { - var randomString = randomizer.String2(10); - randomStrings.Add(randomString); - } - - Result.Set(context, randomStrings); - } -} \ No newline at end of file diff --git a/src/bundles/Elsa.Server.Web/MyEndpoint.cs b/src/bundles/Elsa.Server.Web/MyEndpoint.cs deleted file mode 100644 index 12ff777c29..0000000000 --- a/src/bundles/Elsa.Server.Web/MyEndpoint.cs +++ /dev/null @@ -1,27 +0,0 @@ -using Elsa.Abstractions; -using Elsa.Workflows.Runtime.Contracts; - -namespace Elsa.Server.Web; - -public class MyEndpoint : ElsaEndpointWithoutRequest -{ - private readonly IEventPublisher _eventPublisher; - - public MyEndpoint(IEventPublisher eventPublisher) - { - _eventPublisher = eventPublisher; - } - - public override void Configure() - { - Get("/my-event-workflow"); - AllowAnonymous(); - } - - public override async Task HandleAsync(CancellationToken ct) - { - Console.WriteLine("Publishing MyEvent"); - var results = await _eventPublisher.PublishAsync("MyEvent", cancellationToken: ct); - Console.WriteLine($"Affected workflows: {results.Count}"); - } -} \ No newline at end of file diff --git a/src/bundles/Elsa.Server.Web/MyEventWorkflow.cs b/src/bundles/Elsa.Server.Web/MyEventWorkflow.cs deleted file mode 100644 index 2439e537cb..0000000000 --- a/src/bundles/Elsa.Server.Web/MyEventWorkflow.cs +++ /dev/null @@ -1,39 +0,0 @@ -using Elsa.Workflows; -using Elsa.Workflows.Activities; -using Elsa.Workflows.Contracts; -using Elsa.Workflows.Runtime.Activities; - -namespace Elsa.Server.Web; - -public class OnMyEventWorkflow : WorkflowBase -{ - protected override void Build(IWorkflowBuilder builder) - { - builder.Version = 1; - builder.Id = "OnMyEventWorkflow"; - builder.Root = new Sequence - { - Activities = - { - new Event("MyEvent") - { - CanStartWorkflow = true - }, - new Inline(async () => - { - // IEventPublisher.PublishAsync returns before this executes - await SomeCallAsync(); - }), - new WriteLine("End of workflow"), - new Finish() - } - }; - } - - private async Task SomeCallAsync() - { - Console.WriteLine("Hello from OnMyEventWorkflow"); - await Task.Delay(1000); - Console.WriteLine("Goodbye from OnMyEventWorkflow"); - } -} \ No newline at end of file From 530d6a7a6d6bed7dc370af5000fd7197e2cabdb7 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 17:10:59 +0100 Subject: [PATCH 19/23] Update package workflow to reference v3.0.1 The GitHub actions workflow has been updated to pull from branch v3.0.1 instead of main. This change affects the commit verification and version setting steps, now using version 3.0.1-preview in the workflow process. --- .github/workflows/packages.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/packages.yml b/.github/workflows/packages.yml index 1a24c9b44a..2482632b59 100644 --- a/.github/workflows/packages.yml +++ b/.github/workflows/packages.yml @@ -3,7 +3,7 @@ on: workflow_dispatch: push: branches: - - main + - v3.0.1 release: types: [ prereleased, published ] env: @@ -18,10 +18,10 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 - - name: Verify commit exists in origin/main + - name: Verify commit exists in origin/v3.0.1 run: | git fetch --no-tags --prune --depth=1 origin +refs/heads/*:refs/remotes/origin/* - git branch --remote --contains | grep origin/main + git branch --remote --contains | grep origin/v3.0.1 - name: Set VERSION variable run: | if [[ "${{ github.ref }}" == refs/tags/* && "${{ github.event_name }}" == "release" && "${{ github.event.action }}" == "published" ]]; then @@ -29,7 +29,7 @@ jobs: TAG_NAME=${TAG_NAME#refs/tags/} # remove the refs/tags/ prefix echo "VERSION=${TAG_NAME}" >> $GITHUB_ENV else - echo "VERSION=3.1.0-preview.${{github.run_number}}" >> $GITHUB_ENV + echo "VERSION=3.0.1-preview.${{github.run_number}}" >> $GITHUB_ENV fi - name: Build run: dotnet build --configuration Release /p:Version=${VERSION} From 6a11d4e5f444cade37c083dd5882bd958838fed3 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 17:17:10 +0100 Subject: [PATCH 20/23] Update Elsa.Studio packages to version 3.0.1-preview.196 This commit involves updating the versions of `Elsa.Studio`, `Elsa.Studio.Core.BlazorWasm`, and `Elsa.Studio.Login.BlazorWasm` packages in `Elsa.ServerAndStudio.Web.csproj` and `ElsaStudioWebAssembly.csproj` files to 3.0.1-preview.196. This update will incorporate the new changes and improvements included in this newer version. --- .../Elsa.ServerAndStudio.Web.csproj | 4 ++-- .../ElsaStudioWebAssembly/ElsaStudioWebAssembly.csproj | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/bundles/Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj b/src/bundles/Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj index c56e03dcd6..a6c4005bb7 100644 --- a/src/bundles/Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj +++ b/src/bundles/Elsa.ServerAndStudio.Web/Elsa.ServerAndStudio.Web.csproj @@ -34,8 +34,8 @@ - - + + \ No newline at end of file diff --git a/src/bundles/ElsaStudioWebAssembly/ElsaStudioWebAssembly.csproj b/src/bundles/ElsaStudioWebAssembly/ElsaStudioWebAssembly.csproj index 27d7ecd1fb..cd6e143978 100644 --- a/src/bundles/ElsaStudioWebAssembly/ElsaStudioWebAssembly.csproj +++ b/src/bundles/ElsaStudioWebAssembly/ElsaStudioWebAssembly.csproj @@ -19,9 +19,9 @@ - - - + + + From fff67f4084d9988e9ebf14907a4f7e4b0d637a51 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 17:18:11 +0100 Subject: [PATCH 21/23] Update Elsa.Studio package versions The Elsa.Studio and Elsa.Studio.Login.BlazorWasm package versions have been updated in the Elsa.Studio.Web project. Both package versions have been upgraded from 3.0.0 to 3.0.1-preview.196. --- src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj b/src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj index f3c053fe84..d7ec217e4a 100644 --- a/src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj +++ b/src/bundles/Elsa.Studio.Web/Elsa.Studio.Web.csproj @@ -21,8 +21,8 @@ - - + + From aac94d54f1f77a7ed249ba78e3415a7fe6726313 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 17:22:29 +0100 Subject: [PATCH 22/23] Remove unused Workflow models and simplify AddStorageDriver method The commit removes BackgroundExecutionOutcome and BackgroundExecutionResult models from Elsa.Workflows.Core, as they are no longer in use. Additionally, it simplifies the AddStorageDriver extension method in ModuleExtensions.cs, now it directly adds the service as an implementation of the IStorageDriver interface. --- .../Elsa.Workflows.Core/Extensions/ModuleExtensions.cs | 4 +--- .../Models/BackgroundExecutionOutcome.cs | 3 --- .../Models/BackgroundExecutionResult.cs | 8 -------- 3 files changed, 1 insertion(+), 14 deletions(-) delete mode 100644 src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionOutcome.cs delete mode 100644 src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionResult.cs diff --git a/src/modules/Elsa.Workflows.Core/Extensions/ModuleExtensions.cs b/src/modules/Elsa.Workflows.Core/Extensions/ModuleExtensions.cs index a591ccba71..70f989e5b8 100644 --- a/src/modules/Elsa.Workflows.Core/Extensions/ModuleExtensions.cs +++ b/src/modules/Elsa.Workflows.Core/Extensions/ModuleExtensions.cs @@ -16,8 +16,6 @@ public static IModule UseWorkflows(this IModule configuration, Action(this IServiceCollection services) where T : class, IStorageDriver { - return services - .AddSingleton() - .AddSingleton(); + return services.AddSingleton(); } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionOutcome.cs b/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionOutcome.cs deleted file mode 100644 index 8d462bb743..0000000000 --- a/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionOutcome.cs +++ /dev/null @@ -1,3 +0,0 @@ -namespace Elsa.Workflows.Models; - -public record BackgroundExecutionOutcome(string Name, object? Payload); \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionResult.cs b/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionResult.cs deleted file mode 100644 index 133d3e6b60..0000000000 --- a/src/modules/Elsa.Workflows.Core/Models/BackgroundExecutionResult.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Elsa.Workflows.Models; - -public class BackgroundExecutionResult -{ - public ICollection Outcomes { get; set; } = new List(); - public ICollection ExecutionLog { get; set; } = new List(); - public IDictionary JournalData { get; } = new Dictionary(); -} \ No newline at end of file From 30c6479faaf4c0f648227b6a494e6c6bb8cac62f Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Sat, 6 Jan 2024 17:24:18 +0100 Subject: [PATCH 23/23] Remove unnecessary dependencies in DefaultBackgroundActivityInvoker Dependencies on IBookmarksPersister and IWorkflowStateExtractor have been removed in the DefaultBackgroundActivityInvoker.cs file. Additionally, an unused `using` statement for Elsa.Workflows.Helpers has been eliminated. This commit aims to declutter the code and increase its maintainability by eliminating unnecessary dependencies. --- .../Services/DefaultBackgroundActivityInvoker.cs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs index 372271f225..1e46a389e5 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/DefaultBackgroundActivityInvoker.cs @@ -1,7 +1,6 @@ using System.Text.Json; using Elsa.Common.Models; using Elsa.Workflows.Contracts; -using Elsa.Workflows.Helpers; using Elsa.Workflows.Management.Contracts; using Elsa.Workflows.Memory; using Elsa.Workflows.Models; @@ -24,8 +23,6 @@ public class DefaultBackgroundActivityInvoker : IBackgroundActivityInvoker private readonly IWorkflowDefinitionService _workflowDefinitionService; private readonly IVariablePersistenceManager _variablePersistenceManager; private readonly IActivityInvoker _activityInvoker; - private readonly IBookmarksPersister _bookmarksPersister; - private readonly IWorkflowStateExtractor _workflowStateExtractor; private readonly IServiceProvider _serviceProvider; private readonly ILogger _logger; @@ -38,8 +35,6 @@ public DefaultBackgroundActivityInvoker( IWorkflowDefinitionService workflowDefinitionService, IVariablePersistenceManager variablePersistenceManager, IActivityInvoker activityInvoker, - IBookmarksPersister bookmarksPersister, - IWorkflowStateExtractor workflowStateExtractor, IServiceProvider serviceProvider, ILogger logger) { @@ -48,8 +43,6 @@ public DefaultBackgroundActivityInvoker( _workflowDefinitionService = workflowDefinitionService; _variablePersistenceManager = variablePersistenceManager; _activityInvoker = activityInvoker; - _bookmarksPersister = bookmarksPersister; - _workflowStateExtractor = workflowStateExtractor; _serviceProvider = serviceProvider; _logger = logger; } @@ -58,7 +51,6 @@ public DefaultBackgroundActivityInvoker( public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundActivity, CancellationToken cancellationToken = default) { var workflowInstanceId = scheduledBackgroundActivity.WorkflowInstanceId; - var workflowState = await _workflowRuntime.ExportWorkflowStateAsync(workflowInstanceId, cancellationToken); if (workflowState == null)