Skip to content

Commit

Permalink
V3 default runtime (elsa-workflows#3285)
Browse files Browse the repository at this point in the history
* Incremental work on default runtime

* Fix workflow rooting
  • Loading branch information
sfmskywalker committed Sep 2, 2022
1 parent 220a1b4 commit bfefd3f
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 33 deletions.
5 changes: 1 addition & 4 deletions src/bundles/Elsa.WorkflowServer.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using Elsa.Labels.EntityFrameworkCore.Sqlite;
using Elsa.Labels.Extensions;
using Elsa.Liquid.Extensions;
using Elsa.ProtoActor.Extensions;
using Elsa.Scheduling.Extensions;
using Elsa.WorkflowContexts.Extensions;
using Elsa.Workflows.Api.Extensions;
Expand All @@ -32,8 +31,6 @@
using FastEndpoints;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Data.Sqlite;
using Proto.Persistence.Sqlite;
using Event = Elsa.Workflows.Core.Activities.Event;

var builder = WebApplication.CreateBuilder(args);
Expand Down Expand Up @@ -68,7 +65,7 @@
identity.CreateDefaultUser = true;
identity.IdentityOptions = options => identitySection.Bind(options);
})
.UseRuntime(runtime => runtime.UseProtoActor(proto => proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(dbConnectionString))))
//.UseRuntime(runtime => runtime.UseProtoActor(proto => proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(dbConnectionString))))
.UseJobActivities()
.UseScheduling()
.UseWorkflowPersistence(p => p.UseEntityFrameworkCore(ef => ef.UseSqlite(dbConnectionString)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ public async Task<ResumeWorkflowResult> ResumeWorkflowAsync(string instanceId, s
return new ResumeWorkflowResult();
}

public async Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(object bookmarkPayload, TriggerWorkflowsOptions options, CancellationToken cancellationToken = default)
public async Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(string bookmarkName, object bookmarkPayload, TriggerWorkflowsOptions options, CancellationToken cancellationToken = default)
{
var hash = _hasher.Hash(bookmarkPayload);
var client = _cluster.GetBookmarkGrain(hash);
var bookmarksResponse = await client.Resolve(new ResolveBookmarksRequest(), cancellationToken);
var request = new ResolveBookmarksRequest() { BookmarkName = bookmarkName };
var bookmarksResponse = await client.Resolve(request, cancellationToken);
var bookmarks = bookmarksResponse!.Bookmarks;

foreach (var bookmark in bookmarks)
{
var workflowInstanceId = bookmark.WorkflowInstanceId;
var resumeResult = await ResumeWorkflowAsync(workflowInstanceId, bookmark.BookmarkId, new ResumeWorkflowOptions(options.Input), cancellationToken);
var resumeResult = await ResumeWorkflowAsync(workflowInstanceId, bookmark.BookmarkId, new ResumeWorkflowOptions(options.Input), cancellationToken);
}

return new TriggerWorkflowsResult();
}
}
1 change: 1 addition & 0 deletions src/modules/Elsa.ProtoActor/Protos/Messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ message RemoveBookmarksByWorkflowRequest {
}

message ResolveBookmarksRequest {
string BookmarkName = 1;
}

message ResolveBookmarksResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Threading;
using System.Threading.Tasks;
using Elsa.Abstractions;
using Elsa.Workflows.Core.Activities;
using Elsa.Workflows.Core.Helpers;
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Core.Services;
using Elsa.Workflows.Runtime.Services;
Expand All @@ -27,7 +29,8 @@ public override void Configure()
public override async Task HandleAsync(Request request, CancellationToken cancellationToken)
{
var eventBookmark = new EventBookmarkData(request.EventName);
var result = await _workflowRuntime.TriggerWorkflowsAsync(eventBookmark, new TriggerWorkflowsOptions(), cancellationToken);
var bookmarkName = ActivityTypeNameHelper.GenerateTypeName<Event>();
var result = await _workflowRuntime.TriggerWorkflowsAsync(bookmarkName, eventBookmark, new TriggerWorkflowsOptions(), cancellationToken);

if (!HttpContext.Response.HasStarted)
{
Expand Down
14 changes: 10 additions & 4 deletions src/modules/Elsa.Workflows.Core/Activities/Composite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Elsa.Workflows.Core.Activities;
/// Represents a composite activity that has a single <see cref="Root"/> activity. Like a workflow, but without workflow-level properties.
/// </summary>
[Activity("Elsa", "Workflows", "Execute a root activity that you can configure yourself")]
public class Composite : Activity
public class Composite : ActivityBase
{
/// <summary>
/// The activity to schedule when this activity executes.
Expand All @@ -22,7 +22,10 @@ protected override void Execute(ActivityExecutionContext context)
context.ScheduleActivity(Root, OnCompletedAsync);
}

protected virtual ValueTask OnCompletedAsync(ActivityExecutionContext context, ActivityExecutionContext childContext) => ValueTask.CompletedTask;
protected virtual async ValueTask OnCompletedAsync(ActivityExecutionContext context, ActivityExecutionContext childContext)
{
await context.CompleteActivityAsync();
}

protected static Inline From(Func<ActivityExecutionContext, ValueTask> activity) => new(activity);
protected static Inline From(Func<ValueTask> activity) => new(activity);
Expand All @@ -37,7 +40,7 @@ protected override void Execute(ActivityExecutionContext context)
/// <summary>
/// Represents a composite activity that has a single <see cref="Root"/> activity and returns a result.
/// </summary>
public class Composite<T> : Activity<T>
public class Composite<T> : ActivityBase
{
/// <summary>
/// The activity to schedule when this activity executes.
Expand All @@ -50,7 +53,10 @@ protected override void Execute(ActivityExecutionContext context)
context.ScheduleActivity(Root, OnCompletedAsync);
}

protected virtual ValueTask OnCompletedAsync(ActivityExecutionContext context, ActivityExecutionContext childContext) => ValueTask.CompletedTask;
protected virtual async ValueTask OnCompletedAsync(ActivityExecutionContext context, ActivityExecutionContext childContext)
{
await context.CompleteActivityAsync();
}

protected static Inline From(Func<ActivityExecutionContext, ValueTask> activity) => new(activity);
protected static Inline From(Func<ValueTask> activity) => new(activity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Elsa.Workflows.Core.ActivityNodeResolvers;
public class OutboundActivityPortResolver : IActivityPortResolver
{
public int Priority => -1;
public bool GetSupportsActivity(IActivity activity) => activity is Activity;
public bool GetSupportsActivity(IActivity activity) => true;

public ValueTask<IEnumerable<IActivity>> GetPortsAsync(IActivity activity, CancellationToken cancellationToken = default) =>
new(GetSinglePorts(activity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public async Task<RunWorkflowResult> RunAsync(WorkflowExecutionContext workflowE
ExecuteActivityDelegate? executeActivityDelegate,
CancellationToken cancellationToken)
{
var root = workflow.Root;
var root = workflow;

// Build graph.
var graph = await _activityWalker.WalkAsync(root, cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.Workflows.Core/Models/Bookmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace Elsa.Workflows.Core.Models;
public record Bookmark(
string Id,
string Name,
string Hash,
string? Hash,
string? Data,
string ActivityId,
string ActivityInstanceId,
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.Workflows.Core/Models/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public Workflow()
{
}

public static Workflow FromActivity(IActivity root) => new(root);
public static Workflow FromActivity(IActivity root) => root is Workflow workflow ? workflow : new(root);

/// <summary>
/// Creates a new memory register initialized with this workflow's variables.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Elsa.Mediator.Services;
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Persistence.Entities;
using Elsa.Workflows.Persistence.Services;
using Elsa.Workflows.Runtime.Notifications;
Expand All @@ -17,20 +18,36 @@ public BookmarkManager(IWorkflowBookmarkStore bookmarkStore, IEventPublisher eve
_eventPublisher = eventPublisher;
}

public async Task DeleteBookmarksAsync(IEnumerable<WorkflowBookmark> workflowBookmarks, CancellationToken cancellationToken = default)
public async Task DeleteAsync(IEnumerable<Bookmark> bookmarks, CancellationToken cancellationToken = default)
{
var list = workflowBookmarks.ToList();
var list = bookmarks.ToList();
var ids = list.Select(x => x.Id).ToList();
var bookmarks = list.Select(x => x.ToBookmark()).ToList();
await _bookmarkStore.DeleteManyAsync(ids, cancellationToken);
await _eventPublisher.PublishAsync(new WorkflowBookmarksDeleted(bookmarks), cancellationToken);
await _eventPublisher.PublishAsync(new WorkflowBookmarksDeleted(list), cancellationToken);
}

public async Task SaveBookmarksAsync(IEnumerable<WorkflowBookmark> workflowBookmarks, CancellationToken cancellationToken = default)
public async Task SaveAsync(WorkflowInstance workflowInstance, IEnumerable<Bookmark> bookmarks, CancellationToken cancellationToken = default)
{
var list = workflowBookmarks.ToList();
var bookmarks = list.Select(x => x.ToBookmark()).ToList();
await _bookmarkStore.SaveManyAsync(list, cancellationToken);
await _eventPublisher.PublishAsync(new WorkflowBookmarksSaved(bookmarks), cancellationToken);
var list = bookmarks.ToList();
var workflowBookmarks = list.Select(x => WorkflowBookmark.FromBookmark(x, workflowInstance)).ToList();
await _bookmarkStore.SaveManyAsync(workflowBookmarks, cancellationToken);
await _eventPublisher.PublishAsync(new WorkflowBookmarksSaved(list), cancellationToken);
}

public async Task<Bookmark?> FindByIdAsync(string id, CancellationToken cancellationToken)
{
var workflowBookmark = await _bookmarkStore.FindByIdAsync(id, cancellationToken);
return workflowBookmark?.ToBookmark();
}

public async Task<IEnumerable<Bookmark>> FindManyByWorkflowInstanceIdAsync(string workflowInstanceId, CancellationToken cancellationToken = default)
{
var workflowBookmarks = await _bookmarkStore.FindManyByWorkflowInstanceIdAsync(workflowInstanceId, cancellationToken);
return workflowBookmarks.Select(x => x.ToBookmark());
}

public async Task<IEnumerable<WorkflowBookmark>> FindManyByHashAsync(string bookmarkName, string hash, CancellationToken cancellationToken = default)
{
return await _bookmarkStore.FindManyAsync(bookmarkName, hash, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,178 @@
using Elsa.Common.Services;
using Elsa.Mediator.Services;
using Elsa.Models;
using Elsa.Workflows.Core.Helpers;
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Core.Services;
using Elsa.Workflows.Core.State;
using Elsa.Workflows.Persistence.Entities;
using Elsa.Workflows.Persistence.Services;
using Elsa.Workflows.Runtime.Models;
using Elsa.Workflows.Runtime.Notifications;
using Elsa.Workflows.Runtime.Services;
using Open.Linq.AsyncExtensions;

namespace Elsa.Workflows.Runtime.Implementations;

public class DefaultWorkflowRuntime : IWorkflowRuntime
{
private readonly IWorkflowRunner _workflowRunner;
private readonly IWorkflowDefinitionService _workflowDefinitionService;
private readonly IWorkflowInstanceStore _workflowInstanceStore;
private readonly IBookmarkManager _bookmarkManager;
private readonly IHasher _hasher;
private readonly IEventPublisher _eventPublisher;
private readonly ISystemClock _systemClock;

public DefaultWorkflowRuntime(
IWorkflowRunner workflowRunner,
IWorkflowDefinitionService workflowDefinitionService,
IWorkflowInstanceStore workflowInstanceStore,
IBookmarkManager bookmarkManager,
IHasher hasher,
IEventPublisher eventPublisher,
ISystemClock systemClock)
{
_workflowRunner = workflowRunner;
_workflowDefinitionService = workflowDefinitionService;
_workflowInstanceStore = workflowInstanceStore;
_bookmarkManager = bookmarkManager;
_hasher = hasher;
_eventPublisher = eventPublisher;
_systemClock = systemClock;
}

public async Task<StartWorkflowResult> StartWorkflowAsync(string definitionId, StartWorkflowOptions options, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
var input = options.Input;
var versionOptions = options.VersionOptions;
var workflowDefinition = await _workflowDefinitionService.FindAsync(definitionId, versionOptions, cancellationToken);

if (workflowDefinition == null)
throw new Exception("Specified workflow definition and version does not exist");

var workflow = await _workflowDefinitionService.MaterializeWorkflowAsync(workflowDefinition, cancellationToken);
var workflowResult = await _workflowRunner.RunAsync(workflow, input, cancellationToken);
var workflowState = workflowResult.WorkflowState;
var finished = workflowResult.WorkflowState.Status == WorkflowStatus.Finished;

var workflowInstance = await SaveWorkflowInstanceAsync(workflowDefinition, workflowState, cancellationToken);
await UpdateBookmarksAsync(workflowInstance, new List<Bookmark>(), workflowResult.Bookmarks, cancellationToken);

return new StartWorkflowResult(workflowInstance.Id);
}

public async Task<ResumeWorkflowResult> ResumeWorkflowAsync(string instanceId, string bookmarkId, ResumeWorkflowOptions options, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
var workflowInstance = await _workflowInstanceStore.FindByIdAsync(instanceId, cancellationToken);

if (workflowInstance == null)
throw new Exception($"Workflow instance {instanceId} not found");

var workflowDefinition = await _workflowDefinitionService.FindAsync(workflowInstance.DefinitionId, VersionOptions.SpecificVersion(workflowInstance.Version), cancellationToken);

if (workflowDefinition == null)
throw new Exception("Specified workflow definition and version does not exist");

var input = options.Input;

var existingBookmarks = await _bookmarkManager.FindManyByWorkflowInstanceIdAsync(workflowInstance.Id, cancellationToken).ToList();
var bookmark = existingBookmarks.FirstOrDefault(x => x.Id == bookmarkId);

if (bookmark == null)
throw new Exception("Bookmark not found");

var workflow = await _workflowDefinitionService.MaterializeWorkflowAsync(workflowDefinition, cancellationToken);
var workflowState = workflowInstance.WorkflowState;
var workflowResult = await _workflowRunner.RunAsync(workflow, workflowState, bookmark, input, cancellationToken);
var finished = workflowResult.WorkflowState.Status == WorkflowStatus.Finished;

workflowInstance = await SaveWorkflowInstanceAsync(workflowDefinition, workflowState, cancellationToken);
await UpdateBookmarksAsync(workflowInstance, existingBookmarks, workflowResult.Bookmarks, cancellationToken);

return new ResumeWorkflowResult();
}

public async Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(string bookmarkName, object bookmarkPayload, TriggerWorkflowsOptions options, CancellationToken cancellationToken = default)
{
var hash = _hasher.Hash(bookmarkPayload);
var bookmarks = await _bookmarkManager.FindManyByHashAsync(bookmarkName, hash, cancellationToken);

foreach (var bookmark in bookmarks)
{
var workflowInstanceId = bookmark.WorkflowInstanceId;
var resumeResult = await ResumeWorkflowAsync(workflowInstanceId, bookmark.Id, new ResumeWorkflowOptions(options.Input), cancellationToken);
}

return new TriggerWorkflowsResult();
}

private async Task<WorkflowInstance> SaveWorkflowInstanceAsync(WorkflowDefinition workflowDefinition, WorkflowState workflowState, CancellationToken cancellationToken)
{
var workflowInstance = FromWorkflowState(workflowState, workflowDefinition);
await _workflowInstanceStore.SaveAsync(workflowInstance, cancellationToken);
return workflowInstance;
}

private WorkflowInstance FromWorkflowState(WorkflowState workflowState, WorkflowDefinition workflowDefinition)
{
var workflowInstance = new WorkflowInstance
{
Id = workflowState.Id,
DefinitionId = workflowDefinition.DefinitionId,
DefinitionVersionId = workflowDefinition.Id,
Version = workflowDefinition.Version,
WorkflowState = workflowState,
Status = workflowState.Status,
SubStatus = workflowState.SubStatus,
CorrelationId = workflowState.CorrelationId,
Name = null,
};

// Update timestamps.
var now = _systemClock.UtcNow;

if (workflowInstance.Status == WorkflowStatus.Finished)
{
switch (workflowInstance.SubStatus)
{
case WorkflowSubStatus.Cancelled:
workflowInstance.CancelledAt = now;
break;
case WorkflowSubStatus.Faulted:
workflowInstance.FaultedAt = now;
break;
case WorkflowSubStatus.Finished:
workflowInstance.FinishedAt = now;
break;
}
}

return workflowInstance;
}

private async Task UpdateBookmarksAsync(WorkflowInstance workflowInstance, ICollection<Bookmark> previousBookmarks, ICollection<Bookmark> newBookmarks, CancellationToken cancellationToken)
{
await RemoveBookmarksAsync(previousBookmarks, cancellationToken);
await StoreBookmarksAsync(workflowInstance, newBookmarks, cancellationToken);
await PublishChangedBookmarksAsync(workflowInstance.WorkflowState, previousBookmarks, newBookmarks, cancellationToken);
}

private async Task StoreBookmarksAsync(WorkflowInstance workflowInstance, ICollection<Bookmark> bookmarks, CancellationToken cancellationToken)
{
await _bookmarkManager.SaveAsync(workflowInstance, bookmarks, cancellationToken);
}

private async Task RemoveBookmarksAsync(IEnumerable<Bookmark> bookmarks, CancellationToken cancellationToken)
{
await _bookmarkManager.DeleteAsync(bookmarks, cancellationToken);
}

public Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(object bookmarkPayload, TriggerWorkflowsOptions options, CancellationToken cancellationToken = default)
private async Task PublishChangedBookmarksAsync(WorkflowState workflowState, ICollection<Bookmark> originalBookmarks, ICollection<Bookmark> updatedBookmarks, CancellationToken cancellationToken)
{
throw new NotImplementedException();
var diff = Diff.For(originalBookmarks, updatedBookmarks);
var removedBookmarks = diff.Removed;
var createdBookmarks = diff.Added;
await _eventPublisher.PublishAsync(new WorkflowBookmarksIndexed(new IndexedWorkflowBookmarks(workflowState, createdBookmarks, removedBookmarks)), cancellationToken);
}
}
Loading

0 comments on commit bfefd3f

Please sign in to comment.