Skip to content

Commit

Permalink
Add scheduling function for background activities
Browse files Browse the repository at this point in the history
This commit achieves two main goals. Firstly, it introduces two new classes called ScheduledActivity and ScheduledActivityOptions to store scheduled activities' information. Secondly, it modifies how activities are executed in the background by capturing the scheduling information as a serializable format and storing it in the workflow execution context properties dictionary. This change allows the workflow execution context to resume the activity execution context.
  • Loading branch information
sfmskywalker committed Jan 5, 2024
1 parent 2b6294b commit a950a6d
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,27 @@ public async ValueTask ScheduleActivityAsync(ActivityNode? activityNode, Activit
{
if (this.GetIsBackgroundExecution())
{
// TODO: Capture the information in a serializable format and store it in the workflow execution context Properties dictionary.
// The information should be stored in a way that allows the workflow execution context to resume the activity execution context.
var scheduledActivity = new ScheduledActivity
{
ActivityNodeId = activityNode?.NodeId,
OwnerActivityInstanceId = owner?.Id,
Options = options != null ? new ScheduledActivityOptions
{
CompletionCallback = options?.CompletionCallback?.Method.Name,
Tag = options?.Tag,
ExistingActivityInstanceId = options?.ExistingActivityExecutionContext?.Id,
PreventDuplicateScheduling = options?.PreventDuplicateScheduling ?? false,
Variables = options?.Variables?.ToList(),
Input = options?.Input
} : default
};

var scheduledActivities = this.GetBackgroundScheduledActivities().ToList();
scheduledActivities.Add(scheduledActivity);
this.SetBackgroundScheduledActivities(scheduledActivities);
return;
}

var completionCallback = options?.CompletionCallback;
owner ??= this;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Elsa.Workflows.Models;

namespace Elsa.Workflows;

/// <summary>
Expand Down Expand Up @@ -42,4 +44,22 @@ public static IEnumerable<string> GetBackgroundOutcomes(this ActivityExecutionCo
{
return activityExecutionContext.GetProperty<IEnumerable<string>>("BackgroundOutcomes") ?? Enumerable.Empty<string>();
}

/// <summary>
/// Sets the background scheduled activities.
/// </summary>
public static void SetBackgroundScheduledActivities(this ActivityExecutionContext activityExecutionContext, IEnumerable<ScheduledActivity> scheduledActivities)
{
var scheduledActivitiesList = scheduledActivities.ToList();
activityExecutionContext.SetProperty("BackgroundScheduledActivities", scheduledActivitiesList);
}

/// <summary>
/// Gets the background scheduled activities.
/// </summary>
/// <param name="activityExecutionContext"></param>
public static IEnumerable<ScheduledActivity> GetBackgroundScheduledActivities(this ActivityExecutionContext activityExecutionContext)
{
return activityExecutionContext.GetProperty<IEnumerable<ScheduledActivity>>("BackgroundScheduledActivities") ?? Enumerable.Empty<ScheduledActivity>();
}
}
8 changes: 8 additions & 0 deletions src/modules/Elsa.Workflows.Core/Models/ScheduledActivity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Elsa.Workflows.Models;

public class ScheduledActivity
{
public string? ActivityNodeId { get; set; }
public string? OwnerActivityInstanceId { get; set; }
public ScheduledActivityOptions? Options { get; set; }
}
13 changes: 13 additions & 0 deletions src/modules/Elsa.Workflows.Core/Models/ScheduledActivityOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Elsa.Workflows.Memory;

namespace Elsa.Workflows.Models;

public class ScheduledActivityOptions
{
public string? CompletionCallback { get; set; }
public object? Tag { get; set; }
public ICollection<Variable>? Variables { get; set; }
public string? ExistingActivityInstanceId { get; set; }
public bool PreventDuplicateScheduling { get; set; }
public IDictionary<string,object>? Input { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Text.Json;
using Elsa.Extensions;
using Elsa.Workflows.Middleware.Activities;
using Elsa.Workflows.Models;
using Elsa.Workflows.Options;
using Elsa.Workflows.Pipelines.ActivityExecution;
using Elsa.Workflows.Runtime.Bookmarks;
using Elsa.Workflows.Runtime.Middleware.Workflows;
Expand All @@ -17,6 +19,7 @@ public class BackgroundActivityInvokerMiddleware : DefaultActivityInvokerMiddlew
internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activityNodeId}";
internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{activityNodeId}";
internal static string GetBackgroundActivityJournalDataKey(string activityNodeId) => $"__BackgroundActivityJournalData:{activityNodeId}";
internal static string GetBackgroundActivityScheduledActivitiesKey(string activityNodeId) => $"__BackgroundActivityScheduledActivities:{activityNodeId}";
internal static readonly object BackgroundActivitySchedulesKey = new();
internal const string BackgroundActivityBookmarkName = "BackgroundActivity";

Expand All @@ -42,7 +45,8 @@ protected override async ValueTask ExecuteActivityAsync(ActivityExecutionContext
{
CaptureOutputIfAny(context);
CaptureJournalData(context);
await CompleteBackgroundActivityAsync(context);
await CompleteBackgroundActivityOutcomesAsync(context);
await CompleteBackgroundActivityScheduledActivitiesAsync(context);
}
}
}
Expand Down Expand Up @@ -86,10 +90,10 @@ private static void CaptureOutputIfAny(ActivityExecutionContext context)
var activity = context.Activity;
var inputKey = GetBackgroundActivityOutputKey(activity.NodeId);
var capturedOutput = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(inputKey);
if(capturedOutput == null)

if (capturedOutput == null)
return;

foreach (var outputEntry in capturedOutput)
{
var outputDescriptor = context.ActivityDescriptor.Outputs.FirstOrDefault(x => x.Name == outputEntry.Key);
Expand All @@ -101,7 +105,7 @@ private static void CaptureOutputIfAny(ActivityExecutionContext context)
context.Set(output, outputEntry.Value);
}
}

private void CaptureJournalData(ActivityExecutionContext context)
{
var activity = context.Activity;
Expand All @@ -114,18 +118,49 @@ private void CaptureJournalData(ActivityExecutionContext context)
foreach (var journalEntry in journalData)
context.JournalData[journalEntry.Key] = journalEntry.Value;
}
private async Task CompleteBackgroundActivityAsync(ActivityExecutionContext context)

private async Task CompleteBackgroundActivityOutcomesAsync(ActivityExecutionContext context)
{
var outcomesKey = GetBackgroundActivityOutcomesKey(context.NodeId);
var outcomes = context.WorkflowExecutionContext.GetProperty<ICollection<string>>(outcomesKey);

if (outcomes != null)
{
await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray());

// Remove the outcomes from the workflow execution context.
context.WorkflowExecutionContext.Properties.Remove(outcomesKey);
}
}

private async Task CompleteBackgroundActivityScheduledActivitiesAsync(ActivityExecutionContext context)
{
var scheduledActivitiesKey = GetBackgroundActivityScheduledActivitiesKey(context.NodeId);
var scheduledActivitiesJson = context.WorkflowExecutionContext.GetProperty<string>(scheduledActivitiesKey);
var scheduledActivities = scheduledActivitiesJson != null ? JsonSerializer.Deserialize<ICollection<ScheduledActivity>>(scheduledActivitiesJson) : null;

if (scheduledActivities != null)
{
foreach (var scheduledActivity in scheduledActivities)
{
var activityNode = scheduledActivity.ActivityNodeId != null ? context.WorkflowExecutionContext.FindActivityByNodeId(scheduledActivity.ActivityNodeId) : null;
var owner = scheduledActivity.OwnerActivityInstanceId != null ? context.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == scheduledActivity.OwnerActivityInstanceId) : null;
var options = scheduledActivity.Options != null
? new ScheduleWorkOptions
{
ExistingActivityExecutionContext = scheduledActivity.Options.ExistingActivityInstanceId != null ? context.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == scheduledActivity.Options.ExistingActivityInstanceId) : null,
Variables = scheduledActivity.Options?.Variables,
CompletionCallback = !string.IsNullOrEmpty(scheduledActivity.Options?.CompletionCallback) && owner != null ? owner.Activity.GetActivityCompletionCallback(scheduledActivity.Options.CompletionCallback) : default,
PreventDuplicateScheduling = scheduledActivity.Options?.PreventDuplicateScheduling ?? false,
Input = scheduledActivity.Options?.Input,
Tag = scheduledActivity.Options?.Tag
}
: default;
await context.ScheduleActivityAsync(activityNode, owner, options);
}

// Remove the scheduled activities from the workflow execution context.
context.WorkflowExecutionContext.Properties.Remove(scheduledActivitiesKey);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text.Json;
using Elsa.Common.Models;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Helpers;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class DefaultBackgroundActivityInvoker : IBackgroundActivityInvoker
public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundActivity, CancellationToken cancellationToken = default)
{
var workflowInstanceId = scheduledBackgroundActivity.WorkflowInstanceId;

var workflowState = await _workflowRuntime.ExportWorkflowStateAsync(workflowInstanceId, cancellationToken);

if (workflowState == null)
Expand Down Expand Up @@ -85,7 +86,6 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc
// Capture any activity output produced by the activity (but only if the associated memory block is stored in the workflow itself).
var outputDescriptors = activityExecutionContext.ActivityDescriptor.Outputs;
var outputValues = new Dictionary<string, object>();
var outcomes = activityExecutionContext.GetBackgroundOutcomes().ToList();

foreach (var outputDescriptor in outputDescriptors)
{
Expand All @@ -112,19 +112,23 @@ public async Task ExecuteAsync(ScheduledBackgroundActivity scheduledBackgroundAc
outputValues[outputDescriptor.Name] = outputValue;
}

// Resume the workflow, passing along the activity output.
// Resume the workflow, passing along activity output, outcomes and scheduled activities.
var bookmarkId = scheduledBackgroundActivity.BookmarkId;
var inputKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityOutputKey(activityNodeId);
var outcomesKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityOutcomesKey(activityNodeId);
var journalDataKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityJournalDataKey(activityNodeId);
var scheduledActivitiesKey = BackgroundActivityInvokerMiddleware.GetBackgroundActivityScheduledActivitiesKey(activityNodeId);
var outcomes = activityExecutionContext.GetBackgroundOutcomes().ToList();
var scheduledActivities = activityExecutionContext.GetBackgroundScheduledActivities().ToList();

var dispatchRequest = new DispatchWorkflowInstanceRequest
{
InstanceId = workflowInstanceId,
BookmarkId = bookmarkId,
InstanceId = workflowInstanceId,
BookmarkId = bookmarkId,
Properties = new Dictionary<string, object>
{
[outcomesKey] = outcomes,
[scheduledActivitiesKey] = JsonSerializer.Serialize(scheduledActivities),
[inputKey] = outputValues,
[journalDataKey] = activityExecutionContext.JournalData
}
Expand Down

0 comments on commit a950a6d

Please sign in to comment.