Skip to content

Commit

Permalink
Add Bulk Dispatch Workflow activity (elsa-workflows#4683)
Browse files Browse the repository at this point in the history
* Initial version of BulkDispatchWorkflow

* Update comments

* Refactor options models for running workflows and add Properties

* Fix resumption

* Add ability to provide arguments when evaluating expressions

* Update args syntax for JS

* Fix ProtoActor Runtime Properties mapping

* Use default constructor

* Provide completed workflow instance ID as input to BulkDispatchWorkflows

* Implement ChildFaulted port

* Switch to Publish to make Memory service bus work

* Update input evaluation and activity output retrieval

Refined the input evaluation process to only consider inputs with AutoEvaluate set to true. Added a comment to clarify the purpose of the GetActivitiesWithOutputs method. Removed the unnecessary check for AutoEvaluate from the EvaluateInputPropertyAsync method.
  • Loading branch information
sfmskywalker committed Dec 6, 2023
1 parent 0146a8a commit 683f0ee
Show file tree
Hide file tree
Showing 94 changed files with 1,375 additions and 584 deletions.
3 changes: 2 additions & 1 deletion Elsa.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
<s:Boolean x:Key="/Default/UserDictionary/Words/=materializers/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Populator/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Postgre/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=startable/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/UserDictionary/Words/=startable/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Telnyx/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
4 changes: 2 additions & 2 deletions src/bundles/Elsa.WorkflowServer.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
const bool useMongoDb = false;
const bool useSqlServer = false;
const bool useDapper = false;
const bool useProtoActor = false;
const bool useProtoActor = true;
const bool useHangfire = false;
const bool useQuartz = true;
const bool useMassTransitAzureServiceBus = true;
const bool useMassTransitAzureServiceBus = false;
const bool useMassTransitRabbitMq = false;

var builder = WebApplication.CreateBuilder(args);
Expand Down
8 changes: 6 additions & 2 deletions src/common/Elsa.Testing.Shared/ServiceProviderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,19 @@ public static async Task<WorkflowDefinition> ImportWorkflowDefinitionAsync(this
/// <returns>The workflow state.</returns>
public static async Task<WorkflowState> RunWorkflowUntilEndAsync(this IServiceProvider services, string workflowDefinitionId, IDictionary<string, object>? input = default)
{
var startWorkflowOptions = new StartWorkflowRuntimeOptions(null, input, VersionOptions.Published);
var startWorkflowOptions = new StartWorkflowRuntimeOptions
{
Input = input,
VersionOptions = VersionOptions.Published
};
var workflowRuntime = services.GetRequiredService<IWorkflowRuntime>();
var result = await workflowRuntime.StartWorkflowAsync(workflowDefinitionId, startWorkflowOptions);
var bookmarks = new Stack<Bookmark>(result.Bookmarks);

// Continue resuming the workflow for as long as there are bookmarks to resume and the workflow is not Finished.
while (result.Status != WorkflowStatus.Finished && bookmarks.TryPop(out var bookmark))
{
var resumeOptions = new ResumeWorkflowRuntimeOptions(BookmarkId: bookmark.Id);
var resumeOptions = new ResumeWorkflowRuntimeOptions { BookmarkId = bookmark.Id };
var resumeResult = await workflowRuntime.ResumeWorkflowAsync(result.WorkflowInstanceId, resumeOptions);

foreach (var newBookmark in resumeResult!.Bookmarks)
Expand Down
4 changes: 3 additions & 1 deletion src/modules/Elsa.CSharp/Activities/RunCSharp/RunCSharp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Elsa.CSharp.Contracts;
using Elsa.CSharp.Extensions;
using Elsa.CSharp.Models;
using Elsa.Expressions.Models;
using Elsa.Extensions;
using Elsa.Workflows.Core;
using Elsa.Workflows.Core.Attributes;
Expand Down Expand Up @@ -56,7 +57,8 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context
var evaluator = context.GetRequiredService<ICSharpEvaluator>();

// Run the script.
var result = await evaluator.EvaluateAsync(script, typeof(object), context.ExpressionExecutionContext, context.CancellationToken);
var options = new ExpressionEvaluatorOptions();
var result = await evaluator.EvaluateAsync(script, typeof(object), context.ExpressionExecutionContext, options, context.CancellationToken);

// Set the result as output, if any.
if (result is not null)
Expand Down
2 changes: 2 additions & 0 deletions src/modules/Elsa.CSharp/Contracts/ICSharpEvaluator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public interface ICSharpEvaluator
/// <param name="expression">The expression to evaluate.</param>
/// <param name="returnType">The type of the return value.</param>
/// <param name="context">The context in which the expression is evaluated.</param>
/// <param name="options">A set of options.</param>
/// <param name="configureScriptOptions">An optional callback to configure the script options.</param>
/// <param name="configureScript">An optional callback to configure the script.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
Expand All @@ -24,6 +25,7 @@ public interface ICSharpEvaluator
string expression,
Type returnType,
ExpressionExecutionContext context,
ExpressionEvaluatorOptions options,
Func<ScriptOptions, ScriptOptions>? configureScriptOptions = default,
Func<Script<object>, Script<object>>? configureScript = default,
CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Elsa.Expressions.Contracts;
using Elsa.Expressions.Helpers;
using Elsa.Expressions.Models;
using Microsoft.CodeAnalysis.Scripting;

namespace Elsa.CSharp.Expressions;

Expand All @@ -21,9 +22,9 @@ public CSharpExpressionHandler(ICSharpEvaluator cSharpEvaluator)
}

/// <inheritdoc />
public async ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context)
public async ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context, ExpressionEvaluatorOptions options)
{
var script = expression.Value.ConvertTo<string>() ?? "";
return await _cSharpEvaluator.EvaluateAsync(script, returnType, context);
return await _cSharpEvaluator.EvaluateAsync(script, returnType, context, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ public static class CSharpEvaluatorExtensions
/// <param name="expression">The expression to evaluate.</param>
/// <param name="returnType">The type of the return value.</param>
/// <param name="context">The context in which the expression is evaluated.</param>
/// <param name="options">An set of options.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The result of the evaluation.</returns>
public static async Task<object?> EvaluateAsync(this ICSharpEvaluator evaluator,
string expression,
Type returnType,
ExpressionExecutionContext context,
ExpressionEvaluatorOptions options,
CancellationToken cancellationToken = default)
{
return await evaluator.EvaluateAsync(expression, returnType, context, null, null, cancellationToken);
return await evaluator.EvaluateAsync(expression, returnType, context, options, null, null, cancellationToken);
}
}
45 changes: 45 additions & 0 deletions src/modules/Elsa.CSharp/Handlers/GenerateArgumentAccessors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System.Text;
using Elsa.CSharp.Notifications;
using Elsa.Expressions.Models;
using Elsa.Extensions;
using Elsa.Mediator.Contracts;
using JetBrains.Annotations;

namespace Elsa.CSharp.Handlers;

/// <summary>
/// Configures the C# evaluator with methods to access workflow variables.
/// </summary>
[UsedImplicitly]
public class GenerateArgumentAccessors : INotificationHandler<EvaluatingCSharp>
{
/// <inheritdoc />
public Task HandleAsync(EvaluatingCSharp notification, CancellationToken cancellationToken)
{
var arguments = notification.Options.Arguments;
var sb = new StringBuilder();
sb.AppendLine("public partial class ArgumentsProxy {");
sb.AppendLine("\tpublic ArgumentsProxy(IDictionary<string, object> arguments) => Arguments = arguments;");
sb.AppendLine("\tpublic IDictionary<string, object> Arguments { get; }");
sb.AppendLine();
sb.AppendLine("\tpublic T? Get<T>(string name) => Arguments.TryGetValue(name, out var v) ? (T?)v : default;");
sb.AppendLine();
foreach (var argument in arguments)
{
// var d = new Dictionary<string, object>();
// d.TryGetValue("", out var f);
var argumentName = argument.Key;
var variableType = argument.Value.GetType();
var friendlyTypeName = variableType.GetFriendlyTypeName(Brackets.Angle);
sb.AppendLine($"\tpublic {friendlyTypeName} {argumentName}");
sb.AppendLine("\t{");
sb.AppendLine($"\t\tget => Get<{friendlyTypeName}>(\"{argumentName}\");");
sb.AppendLine("\t}");
}

sb.AppendLine("}");
sb.AppendLine("var Args = new ArgumentsProxy(Arguments);");
notification.AppendScript(sb.ToString());
return Task.CompletedTask;
}
}
47 changes: 47 additions & 0 deletions src/modules/Elsa.CSharp/Handlers/GenerateWorkflowInputAccessors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System.Text;
using Elsa.CSharp.Notifications;
using Elsa.Expressions.Models;
using Elsa.Extensions;
using Elsa.Mediator.Contracts;
using Humanizer;
using JetBrains.Annotations;

namespace Elsa.CSharp.Handlers;

/// <summary>
/// Configures the C# evaluator with methods to access workflow variables.
/// </summary>
[UsedImplicitly]
public class GenerateWorkflowInputAccessors : INotificationHandler<EvaluatingCSharp>
{
/// <inheritdoc />
public Task HandleAsync(EvaluatingCSharp notification, CancellationToken cancellationToken)
{
var expressionExecutionContext = notification.Context;
var workflowInputs = expressionExecutionContext.GetWorkflowInputs().ToList();
var workflow = expressionExecutionContext.GetWorkflowExecutionContext().Workflow;
var inputDefinitions = workflow.Inputs.ToList();
var sb = new StringBuilder();
sb.AppendLine("public partial class WorkflowInputsProxy {");
sb.AppendLine("\tpublic WorkflowInputsProxy(ExecutionContextProxy executionContext) => ExecutionContext = executionContext;");
sb.AppendLine("\tpublic ExecutionContextProxy ExecutionContext { get; }");
sb.AppendLine();
sb.AppendLine("\tpublic T? Get<T>(string name) => ExecutionContext.GetInput<T>(name);");
sb.AppendLine();
foreach (var inputDefinition in inputDefinitions)
{
var inputName = inputDefinition.Name;
var variableType = inputDefinition.Type;
var friendlyTypeName = variableType.GetFriendlyTypeName(Brackets.Angle);
sb.AppendLine($"\tpublic {friendlyTypeName} {inputName}");
sb.AppendLine("\t{");
sb.AppendLine($"\t\tget => Get<{friendlyTypeName}>(\"{inputName}\");");
sb.AppendLine("\t}");
}

sb.AppendLine("}");
sb.AppendLine("var Inputs = new WorkflowInputsProxy(ExecutionContext);");
notification.AppendScript(sb.ToString());
return Task.CompletedTask;
}
}
5 changes: 5 additions & 0 deletions src/modules/Elsa.CSharp/Models/ExecutionContextProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public ExecutionContextProxy(ExpressionExecutionContext expressionExecutionConte
/// Sets the value of the specified variable.
/// </summary>
public void SetVariable(string name, object? value) => ExpressionExecutionContext.SetVariable(name, value);

/// <summary>
/// Gets the value of the specified variable.
/// </summary>
public T? GetInput<T>(string name) => ExpressionExecutionContext.GetInput<T>(name);
}
8 changes: 7 additions & 1 deletion src/modules/Elsa.CSharp/Models/Globals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ public partial class Globals
/// <summary>
/// Initializes a new instance of the <see cref="Globals"/> class.
/// </summary>
public Globals(ExpressionExecutionContext expressionExecutionContext)
public Globals(ExpressionExecutionContext expressionExecutionContext, IDictionary<string, object> arguments)
{
ExpressionExecutionContext = expressionExecutionContext;
Arguments = arguments;
ExecutionContext = new ExecutionContextProxy(expressionExecutionContext);
Input = new InputProxy(expressionExecutionContext);
Output = new OutputProxy(expressionExecutionContext);
Expand Down Expand Up @@ -54,5 +55,10 @@ public Globals(ExpressionExecutionContext expressionExecutionContext)
set => ExpressionExecutionContext.GetWorkflowExecutionContext().CorrelationId = value;
}

/// <summary>
/// Gets additional arguments provided by the caller of the evaluator.
/// </summary>
public IDictionary<string, object> Arguments { get; }

private ExpressionExecutionContext ExpressionExecutionContext { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Elsa.CSharp.Notifications;
/// This notification is published every time a C# expression is about to be evaluated.
/// It gives subscribers a chance to configure the <see cref="ScriptOptions"/> with additional functions and variables.
/// </summary>
public record EvaluatingCSharp(Script Script, ScriptOptions ScriptOptions, ExpressionExecutionContext Context) : INotification
public record EvaluatingCSharp(ExpressionEvaluatorOptions Options, Script Script, ScriptOptions ScriptOptions, ExpressionExecutionContext Context) : INotification
{
/// <summary>
/// Configures the <see cref="ScriptOptions"/> with additional functions and variables.
Expand Down
6 changes: 4 additions & 2 deletions src/modules/Elsa.CSharp/Options/CSharpOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public class CSharpOptions
public ISet<Assembly> Assemblies { get; } = new HashSet<Assembly>(new[]
{
typeof(Globals).Assembly, // Elsa.CSharp
typeof(Enumerable).Assembly // System.Linq
typeof(Enumerable).Assembly, // System.Linq,
typeof(IDictionary<string, object>).Assembly, // System.Collections
});

/// <summary>
Expand All @@ -36,7 +37,8 @@ public class CSharpOptions
public ISet<string> Namespaces { get; } = new HashSet<string>(new[]
{
typeof(Globals).Namespace!, // Elsa.CSharp
typeof(Enumerable).Namespace! // System.Linq
typeof(Enumerable).Namespace!, // System.Linq
typeof(IDictionary<string, object>).Namespace!, // System.Collections.Generic
});

/// <summary>
Expand Down
5 changes: 3 additions & 2 deletions src/modules/Elsa.CSharp/Services/CSharpEvaluator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public CSharpEvaluator(INotificationSender notificationSender)
string expression,
Type returnType,
ExpressionExecutionContext context,
ExpressionEvaluatorOptions options,
Func<ScriptOptions, ScriptOptions>? configureScriptOptions = default,
Func<Script<object>, Script<object>>? configureScript = default,
CancellationToken cancellationToken = default)
Expand All @@ -37,13 +38,13 @@ public CSharpEvaluator(INotificationSender notificationSender)
if (configureScriptOptions != null)
scriptOptions = configureScriptOptions(scriptOptions);

var globals = new Globals(context);
var globals = new Globals(context, options.Arguments);
var script = CSharpScript.Create("", scriptOptions, typeof(Globals));

if (configureScript != null)
script = configureScript(script);

var notification = new EvaluatingCSharp(script, scriptOptions, context);
var notification = new EvaluatingCSharp(options, script, scriptOptions, context);
await _notificationSender.SendAsync(notification, cancellationToken);
scriptOptions = notification.ScriptOptions;
script = notification.Script.ContinueWith(expression, scriptOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ public interface IExpressionEvaluator
/// </summary>
/// <param name="expression">The expression to evaluate.</param>
/// <param name="context">The context in which the expression is evaluated.</param>
/// <param name="options">An optional set of options.</param>
/// <typeparam name="T">The type of the result.</typeparam>
/// <returns>The result of the evaluation.</returns>
ValueTask<T?> EvaluateAsync<T>(Expression expression, ExpressionExecutionContext context);
ValueTask<T?> EvaluateAsync<T>(Expression expression, ExpressionExecutionContext context, ExpressionEvaluatorOptions? options = default);

/// <summary>
/// Evaluates the specified expression and returns the result.
/// </summary>
/// <param name="expression">The expression to evaluate.</param>
/// <param name="returnType">The type of the result.</param>
/// <param name="context">The context in which the expression is evaluated.</param>
/// <param name="options">An optional set of options.</param>
/// <returns>The result of the evaluation.</returns>
ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context);
ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context, ExpressionEvaluatorOptions? options = default);
}
3 changes: 2 additions & 1 deletion src/modules/Elsa.Expressions/Contracts/IExpressionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public interface IExpressionHandler
/// <param name="expression">The expression to evaluate.</param>
/// <param name="returnType">The expected return type.</param>
/// <param name="context">The context in which the expression is evaluated.</param>
/// <param name="options">An optional set of options.</param>
/// <returns>The result of the evaluation.</returns>
ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context);
ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context, ExpressionEvaluatorOptions options);
}
2 changes: 1 addition & 1 deletion src/modules/Elsa.Expressions/DelegateExpressionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Elsa.Expressions;
public class DelegateExpressionHandler : IExpressionHandler
{
/// <inheritdoc />
public async ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context)
public async ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context, ExpressionEvaluatorOptions options)
{
var value = expression.Value is Func<ExpressionExecutionContext, ValueTask<object?>> @delegate ? await @delegate(context) : default;
return value;
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.Expressions/LiteralExpression.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public LiteralExpressionHandler(IWellKnownTypeRegistry wellKnownTypeRegistry)
}

/// <inheritdoc />
public ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context)
public ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context, ExpressionEvaluatorOptions options)
{
var value = expression.Value.ConvertTo(returnType, new ObjectConverterOptions(WellKnownTypeRegistry: _wellKnownTypeRegistry));
return ValueTask.FromResult(value);
Expand Down
12 changes: 12 additions & 0 deletions src/modules/Elsa.Expressions/Models/ExpressionEvaluatorOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Elsa.Expressions.Models;

/// <summary>
/// Contains additional options for the expression evaluator.
/// </summary>
public class ExpressionEvaluatorOptions
{
/// <summary>
/// An extra set of variables to add to the expression context.
/// </summary>
public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
}
10 changes: 7 additions & 3 deletions src/modules/Elsa.Expressions/Services/ExpressionEvaluator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ public ExpressionEvaluator(IExpressionDescriptorRegistry registry, IServiceProvi
}

/// <inheritdoc />
public async ValueTask<T?> EvaluateAsync<T>(Expression expression, ExpressionExecutionContext context) => (T?)await EvaluateAsync(expression, typeof(T), context);
public async ValueTask<T?> EvaluateAsync<T>(Expression expression, ExpressionExecutionContext context, ExpressionEvaluatorOptions? options = default)
{
return (T?)await EvaluateAsync(expression, typeof(T), context, options);
}

/// <inheritdoc />
public async ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context)
public async ValueTask<object?> EvaluateAsync(Expression expression, Type returnType, ExpressionExecutionContext context, ExpressionEvaluatorOptions? options = default)
{
var expressionType = expression.Type;
var expressionDescriptor = _registry.Find(expressionType);
Expand All @@ -31,6 +34,7 @@ public ExpressionEvaluator(IExpressionDescriptorRegistry registry, IServiceProvi
throw new Exception($"Could not find an descriptor for expression type \"{expressionType}\".");

var handler = expressionDescriptor.HandlerFactory(_serviceProvider);
return await handler.EvaluateAsync(expression, returnType, context);
options ??= new ExpressionEvaluatorOptions();
return await handler.EvaluateAsync(expression, returnType, context, options);
}
}
Loading

0 comments on commit 683f0ee

Please sign in to comment.