Skip to content

Commit

Permalink
Add background execution handling to activity context
Browse files Browse the repository at this point in the history
This commit adds the ability to manage the background execution state directly within the activity execution context. This includes adding methods to set and verify the background execution state, and modifying the existing code to use these new methods. A method for handling activity scheduling during background execution has also been started, but its implementation is not finished yet. The HTTP Request activities were updated accordingly to reflect these changes.
  • Loading branch information
sfmskywalker committed Jan 5, 2024
1 parent 853355b commit 2b6294b
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 21 deletions.
3 changes: 2 additions & 1 deletion src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ namespace Elsa.Http;
/// <summary>
/// Send an HTTP request.
/// </summary>
[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request (flow)", Kind = ActivityKind.Action)]
[Activity("Elsa", "HTTP", "Send an HTTP request. This activity yis deprecated in favor of the SendHttpRequestTask activity", DisplayName = "HTTP Request (flow) [Deprecated] ", Kind = ActivityKind.Task)]
[Obsolete("Use SendHttpRequestTask instead.")]
public class FlowSendHttpRequest : SendHttpRequestBase, IActivityPropertyDefaultValueProvider
{
/// <inheritdoc />
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.Http/Activities/SendHttpRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Elsa.Http;
/// <summary>
/// Send an HTTP request.
/// </summary>
[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request", Kind = ActivityKind.Action)]
[Activity("Elsa", "HTTP", "Send an HTTP request.", DisplayName = "HTTP Request", Kind = ActivityKind.Task)]
public class SendHttpRequest : SendHttpRequestBase
{
/// <inheritdoc />
Expand Down
20 changes: 10 additions & 10 deletions src/modules/Elsa.Http/Activities/SendHttpRequestTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Elsa.Http;
/// <summary>
/// Sends HTTP requests from a background task.
/// </summary>
[Activity("Elsa", "HTTP", "Send an HTTP request from a background task.", DisplayName = "HTTP Request Task", Kind = ActivityKind.Job)]
[Activity("Elsa", "HTTP", "Send an HTTP request from a background task.", DisplayName = "HTTP Request Task", Kind = ActivityKind.Task)]
public class SendHttpRequestTask : Activity<HttpStatusCode>
{
/// <inheritdoc />
Expand Down Expand Up @@ -115,23 +115,23 @@ private async Task TrySendAsync(ActivityExecutionContext context)
context.SetResult(response.StatusCode);
context.Set(ParsedContent, parsedContent);

HandleResponse(context, response);
await HandleResponseAsync(context, response);
}
catch (HttpRequestException e)
{
context.AddExecutionLogEntry("Error", e.Message, payload: new { StackTrace = e.StackTrace });
context.JournalData.Add("Error", e.Message);
HandleRequestException(context, e);
await HandleRequestExceptionAsync(context, e);
}
catch (TaskCanceledException e)
{
context.AddExecutionLogEntry("Error", e.Message, payload: new { StackTrace = e.StackTrace });
context.JournalData.Add("Cancelled", true);
HandleTaskCanceledException(context, e);
await HandleTaskCanceledExceptionAsync(context, e);
}
}

private void HandleResponse(ActivityExecutionContext context, HttpResponseMessage response)
private async Task HandleResponseAsync(ActivityExecutionContext context, HttpResponseMessage response)
{
var expectedStatusCodes = ExpectedStatusCodes.GetOrDefault(context) ?? new List<int>(0);
var statusCode = (int)response.StatusCode;
Expand All @@ -144,17 +144,17 @@ private void HandleResponse(ActivityExecutionContext context, HttpResponseMessag

outcomes.Add("Done");
context.JournalData["StatusCode"] = statusCode;
context.SetBackgroundOutcomes(outcomes);
await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray());
}

private void HandleRequestException(ActivityExecutionContext context, HttpRequestException exception)
private async Task HandleRequestExceptionAsync(ActivityExecutionContext context, HttpRequestException exception)
{
context.SetBackgroundOutcomes(new[] { "Failed to connect" });
await context.CompleteActivityWithOutcomesAsync("Failed to connect");
}

private void HandleTaskCanceledException(ActivityExecutionContext context, TaskCanceledException exception)
private async Task HandleTaskCanceledExceptionAsync(ActivityExecutionContext context, TaskCanceledException exception)
{
context.SetBackgroundOutcomes(new[] { "Timeout" });
await context.CompleteActivityWithOutcomesAsync("Timeout");
}

private async Task<object?> ParseContentAsync(ActivityExecutionContext context, HttpContent httpContent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async ValueTask InvokeAsync(ActivityExecutionContext context)
}

// Check if this is a background execution.
var isBackgroundExecution = context.TransientProperties.GetValueOrDefault<object, bool>(BackgroundActivityInvokerMiddleware.IsBackgroundExecution);
var isBackgroundExecution = context.GetIsBackgroundExecution();

// Is the activity configured to load the context?
foreach (var providerType in providerTypes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ public async ValueTask ScheduleActivityAsync(IActivity? activity, ActivityExecut
/// <param name="options">The options used to schedule the activity.</param>
public async ValueTask ScheduleActivityAsync(ActivityNode? activityNode, ActivityExecutionContext? owner = default, ScheduleWorkOptions? options = default)
{
if (this.GetIsBackgroundExecution())
{
// TODO: Capture the information in a serializable format and store it in the workflow execution context Properties dictionary.
// The information should be stored in a way that allows the workflow execution context to resume the activity execution context.
}

var completionCallback = options?.CompletionCallback;
owner ??= this;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,16 @@ public static async ValueTask CompleteActivityAsync(this ActivityCompletedContex
/// </summary>
public static async ValueTask CompleteActivityAsync(this ActivityExecutionContext context, object? result = default)
{
var outcomes = result as Outcomes;

// If the activity is executing in the background, simply capture the result and return.
if (context.GetIsBackgroundExecution())
{
if (outcomes != null)
context.SetBackgroundOutcomes(outcomes.Names);
return;
}

// If the activity is not running, do nothing.
if (context.Status != ActivityStatus.Running)
return;
Expand All @@ -470,7 +480,7 @@ public static async ValueTask CompleteActivityAsync(this ActivityExecutionContex
context.Status = ActivityStatus.Completed;

// Record the outcomes, if any.
if (result is Outcomes outcomes)
if (outcomes != null)
context.JournalData["Outcomes"] = outcomes.Names;

// Record the output, if any.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ namespace Elsa.Workflows;
/// </summary>
public static class BackgroundActivityExecutionContextExtensions
{
/// <summary>
/// A key into the activity execution context's transient properties that indicates whether the current activity is being executed in the background.
/// </summary>
public static readonly object IsBackgroundExecution = new();

/// <summary>
/// Configures the activity execution context to execute the current activity in the background.
/// </summary>
public static void SetIsBackgroundExecution(this ActivityExecutionContext activityExecutionContext, bool value = true)
{
activityExecutionContext.TransientProperties[IsBackgroundExecution] = value;
}

/// <summary>
/// Gets a value indicating whether the current activity is being executed in the background.
/// </summary>
public static bool GetIsBackgroundExecution(this ActivityExecutionContext activityExecutionContext)
{
return activityExecutionContext.TransientProperties.ContainsKey(IsBackgroundExecution);
}

/// <summary>
/// Sets the background outcomes.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ namespace Elsa.Workflows.Runtime.Middleware.Activities;
/// </summary>
public class BackgroundActivityInvokerMiddleware : DefaultActivityInvokerMiddleware
{
/// <summary>
/// A key into the activity execution context's transient properties that indicates whether the current activity is being executed in the background.
/// </summary>
public static readonly object IsBackgroundExecution = new();

internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activityNodeId}";
internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{activityNodeId}";
internal static string GetBackgroundActivityJournalDataKey(string activityNodeId) => $"__BackgroundActivityJournalData:{activityNodeId}";
Expand Down Expand Up @@ -66,7 +61,7 @@ private static bool GetShouldRunInBackground(ActivityExecutionContext context)
&& (kind is ActivityKind.Job || (kind == ActivityKind.Task && activity.GetRunAsynchronously()));
}

private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.ContainsKey(IsBackgroundExecution);
private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.ContainsKey(BackgroundActivityExecutionContextExtensions.IsBackgroundExecution);

/// <summary>
/// Schedules the current activity for execution in the background.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc
await _variablePersistenceManager.LoadVariablesAsync(workflowExecutionContext);

// Mark the activity as being invoked from a background worker.
activityExecutionContext.TransientProperties[BackgroundActivityInvokerMiddleware.IsBackgroundExecution] = true;
activityExecutionContext.SetIsBackgroundExecution();

// Invoke the activity.
await _activityInvoker.InvokeAsync(activityExecutionContext);
Expand Down

0 comments on commit 2b6294b

Please sign in to comment.