Skip to content

Commit

Permalink
Add tests for implicit joins and fix duplicate activity scheduling
Browse files Browse the repository at this point in the history
This commit includes new integration tests for implicit join scenarios. The tests ensure that join and parallel join activities execute correctly and complete as expected. Additionally, a change has been made to the workflow execution context to prevent duplicate activity scheduling. Now, aside from checking if the activity is already scheduled, it also checks if the activity is scheduled for the specified owner.

Fixes elsa-workflows#4703
  • Loading branch information
sfmskywalker committed Dec 13, 2023
1 parent 318d956 commit 8b29388
Show file tree
Hide file tree
Showing 7 changed files with 576 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ public FlowJoin([CallerFilePath] string? source = default, [CallerLineNumber] in
/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var flowchartExecutionContext = context.ParentActivityExecutionContext!;
var flowchart = (Flowchart)flowchartExecutionContext.Activity;
var flowchartContext = context.ParentActivityExecutionContext!;
var flowchart = (Flowchart)flowchartContext.Activity;
var inboundActivities = flowchart.Connections.LeftInboundActivities(this).ToList();
var flowScope = flowchartExecutionContext.GetProperty<FlowScope>(Flowchart.ScopeProperty)!;
var flowScope = flowchartContext.GetProperty<FlowScope>(Flowchart.ScopeProperty)!;
var executionCount = flowScope.GetExecutionCount(this);
var mode = context.Get(Mode);

Expand All @@ -49,28 +49,28 @@ protected override async ValueTask ExecuteAsync(ActivityExecutionContext context

if (haveAllInboundActivitiesExecuted)
{
await ClearBookmarksAsync(flowchart, context);
await CancelActivitiesInInboundPathAsync(flowchart, flowchartContext, context);
await context.CompleteActivityAsync();
}

break;
}
case FlowJoinMode.WaitAny:
{
await ClearBookmarksAsync(flowchart, context);
await CancelActivitiesInInboundPathAsync(flowchart, flowchartContext, context);
await context.CompleteActivityAsync();
break;
}
}
}

private async Task ClearBookmarksAsync(Flowchart flowchart, ActivityExecutionContext context)
private async Task CancelActivitiesInInboundPathAsync(Flowchart flowchart, ActivityExecutionContext flowchartContext, ActivityExecutionContext joinContext)
{
// Cancel all activities between this join activity and its most recent fork.
var connections = flowchart.Connections;
var workflowExecutionContext = context.WorkflowExecutionContext;
var workflowExecutionContext = joinContext.WorkflowExecutionContext;
var inboundActivities = connections.LeftAncestorActivities(this).Select(x => workflowExecutionContext.FindNodeByActivity(x)).Select(x => x!.Activity).ToList();
var inboundActivityExecutionContexts = workflowExecutionContext.ActivityExecutionContexts.Where(x => inboundActivities.Contains(x.Activity)).ToList();
var inboundActivityExecutionContexts = workflowExecutionContext.ActivityExecutionContexts.Where(x => inboundActivities.Contains(x.Activity) && x.ParentActivityExecutionContext == flowchartContext).ToList();

// Cancel each inbound activity.
foreach (var activityExecutionContext in inboundActivityExecutionContexts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public static ActivityWorkItem ScheduleActivityExecutionContext(this WorkflowExe

if (options?.PreventDuplicateScheduling == true)
{
var existingWorkItem = scheduler.Find(x => x.Activity.NodeId == activityNode.NodeId);
// Check if the activity is already scheduled for the specified owner.
var existingWorkItem = scheduler.Find(x => x.Activity.NodeId == activityNode.NodeId && x.Owner == owner);

if (existingWorkItem != null)
return existingWorkItem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,11 @@
<None Update="Scenarios\JsonObjectSerialization\Workflows\instance-serialization.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Scenarios\ImplicitJoins\Workflows\join.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Scenarios\ImplicitJoins\Workflows\parallel-join.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Elsa.Common.Models;
using Elsa.IntegrationTests.Scenarios.ImplicitJoins.Workflows;
using Elsa.Testing.Shared;
using Elsa.Workflows.Core;
using Elsa.Workflows.Core.Contracts;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Filters;
using Microsoft.Extensions.DependencyInjection;
using Open.Linq.AsyncExtensions;
using Xunit;
using Xunit.Abstractions;

namespace Elsa.IntegrationTests.Scenarios.ImplicitJoins;

public class JoinRunsOnceTests
{
private readonly IWorkflowRunner _workflowRunner;
private readonly CapturingTextWriter _capturingTextWriter = new();
private readonly IServiceProvider _services;

public JoinRunsOnceTests(ITestOutputHelper testOutputHelper)
{
_services = new TestApplicationBuilder(testOutputHelper).WithCapturingTextWriter(_capturingTextWriter).Build();
_workflowRunner = _services.GetRequiredService<IWorkflowRunner>();
}

[Fact(DisplayName = "The Join activity executes only once, not twice")]
public async Task Test1()
{
// Populate registries.
await _services.PopulateRegistriesAsync();

// Import workflow.
var workflowDefinition = await _services.ImportWorkflowDefinitionAsync($"Scenarios/ImplicitJoins/Workflows/join.json");

// Execute.
var state = await _services.RunWorkflowUntilEndAsync(workflowDefinition.DefinitionId);

// Assert.
var journal = await _services.GetRequiredService<IWorkflowExecutionLogStore>().FindManyAsync(new WorkflowExecutionLogRecordFilter
{
WorkflowInstanceId = state.Id,
ActivityId = "802725996be1b582",
EventName = "Completed"
}, PageArgs.All);

Assert.Single(journal.Items);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using Elsa.Common.Models;
using Elsa.IntegrationTests.Scenarios.ImplicitJoins.Workflows;
using Elsa.Testing.Shared;
using Elsa.Workflows.Core;
using Elsa.Workflows.Core.Contracts;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Filters;
using Microsoft.Extensions.DependencyInjection;
using Open.Linq.AsyncExtensions;
using Xunit;
using Xunit.Abstractions;

namespace Elsa.IntegrationTests.Scenarios.ImplicitJoins;

public class ParallelJoinCompletesTests
{
private readonly IWorkflowRunner _workflowRunner;
private readonly CapturingTextWriter _capturingTextWriter = new();
private readonly IServiceProvider _services;

public ParallelJoinCompletesTests(ITestOutputHelper testOutputHelper)
{
_services = new TestApplicationBuilder(testOutputHelper).WithCapturingTextWriter(_capturingTextWriter).Build();
_workflowRunner = _services.GetRequiredService<IWorkflowRunner>();
}

[Fact(DisplayName = "The ParallelForEach activity completes when its Body contains a Join activity")]
public async Task Test1()
{
// Populate registries.
await _services.PopulateRegistriesAsync();

// Import workflow.
var workflowDefinition = await _services.ImportWorkflowDefinitionAsync($"Scenarios/ImplicitJoins/Workflows/parallel-join.json");

// Execute.
var state = await _services.RunWorkflowUntilEndAsync(workflowDefinition.DefinitionId);

// Assert.
var journal = await _services.GetRequiredService<IWorkflowExecutionLogStore>().FindManyAsync(new WorkflowExecutionLogRecordFilter
{
WorkflowInstanceId = state.Id,
ActivityId = "70fc1183cd5800f2",
EventName = "Completed"
}, PageArgs.All);

Assert.Single(journal.Items);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
{
"id": "825e9d347e2f3094",
"definitionId": "f64e42304661f407",
"name": "Joins",
"createdAt": "2023-12-13T21:41:53.628415+00:00",
"version": 3,
"toolVersion": "3.0.0.0",
"variables": [],
"inputs": [],
"outputs": [],
"outcomes": [],
"customProperties": {
"Elsa:WorkflowContextProviderTypes": []
},
"isReadonly": false,
"isLatest": true,
"isPublished": false,
"options": {
"autoUpdateConsumingWorkflows": false
},
"root": {
"type": "Elsa.Flowchart",
"version": 1,
"id": "8a03bf79d15fd48a",
"nodeId": "Workflow1:8a03bf79d15fd48a",
"metadata": {},
"customProperties": {
"source": "FlowchartJsonConverter.cs:45",
"notFoundConnections": [],
"canStartWorkflow": false,
"runAsynchronously": false
},
"activities": [
{
"id": "c7624645ff77c7d0",
"nodeId": "Workflow1:8a03bf79d15fd48a:c7624645ff77c7d0",
"name": "Start1",
"type": "Elsa.Start",
"version": 1,
"customProperties": {
"canStartWorkflow": false,
"runAsynchronously": false
},
"metadata": {
"designer": {
"position": {
"x": -400.5,
"y": 212
},
"size": {
"width": 102.21875,
"height": 50
}
}
}
},
{
"text": null,
"id": "281e9ea6d278b231",
"nodeId": "Workflow1:8a03bf79d15fd48a:281e9ea6d278b231",
"name": "WriteLine2",
"type": "Elsa.WriteLine",
"version": 1,
"customProperties": {
"canStartWorkflow": false,
"runAsynchronously": false
},
"metadata": {
"designer": {
"position": {
"x": -160.5,
"y": 305
},
"size": {
"width": 139.296875,
"height": 50
}
}
}
},
{
"mode": {
"typeName": "Elsa.Workflows.Core.Activities.Flowchart.Models.FlowJoinMode, Elsa.Workflows.Core",
"expression": {
"type": "Literal",
"value": "WaitAny"
},
"memoryReference": {
"id": "802725996be1b582:input-0"
}
},
"id": "802725996be1b582",
"nodeId": "Workflow1:8a03bf79d15fd48a:802725996be1b582",
"name": "FlowJoin1",
"type": "Elsa.FlowJoin",
"version": 1,
"customProperties": {
"canStartWorkflow": false,
"runAsynchronously": false
},
"metadata": {
"designer": {
"position": {
"x": 80,
"y": 200
},
"size": {
"width": 98.265625,
"height": 50
}
}
}
},
{
"text": null,
"id": "bfc88f064cf394ed",
"nodeId": "Workflow1:8a03bf79d15fd48a:bfc88f064cf394ed",
"name": "WriteLine1",
"type": "Elsa.WriteLine",
"version": 1,
"customProperties": {
"canStartWorkflow": false,
"runAsynchronously": false
},
"metadata": {
"designer": {
"position": {
"x": -160,
"y": 120
},
"size": {
"width": 139.296875,
"height": 50
}
}
}
},
{
"id": "5467123c7973d011",
"nodeId": "Workflow1:8a03bf79d15fd48a:5467123c7973d011",
"name": "End1",
"type": "Elsa.End",
"version": 1,
"customProperties": {
"canStartWorkflow": false,
"runAsynchronously": false
},
"metadata": {
"designer": {
"position": {
"x": 274.5,
"y": 200
},
"size": {
"width": 94.40625,
"height": 50
}
}
}
}
],
"connections": [
{
"source": {
"activity": "c7624645ff77c7d0",
"port": "Done"
},
"target": {
"activity": "bfc88f064cf394ed",
"port": "In"
}
},
{
"source": {
"activity": "bfc88f064cf394ed",
"port": "Done"
},
"target": {
"activity": "802725996be1b582",
"port": "In"
}
},
{
"source": {
"activity": "c7624645ff77c7d0",
"port": "Done"
},
"target": {
"activity": "281e9ea6d278b231",
"port": "In"
}
},
{
"source": {
"activity": "281e9ea6d278b231",
"port": "Done"
},
"target": {
"activity": "802725996be1b582",
"port": "In"
}
},
{
"source": {
"activity": "802725996be1b582",
"port": "Done"
},
"target": {
"activity": "5467123c7973d011",
"port": "In"
}
}
]
}
}
Loading

0 comments on commit 8b29388

Please sign in to comment.