Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V3 default runtime #3285

Merged
merged 2 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/bundles/Elsa.WorkflowServer.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using Elsa.Labels.EntityFrameworkCore.Sqlite;
using Elsa.Labels.Extensions;
using Elsa.Liquid.Extensions;
using Elsa.ProtoActor.Extensions;
using Elsa.Scheduling.Extensions;
using Elsa.WorkflowContexts.Extensions;
using Elsa.Workflows.Api.Extensions;
Expand All @@ -32,8 +31,6 @@
using FastEndpoints;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.Data.Sqlite;
using Proto.Persistence.Sqlite;
using Event = Elsa.Workflows.Core.Activities.Event;

var builder = WebApplication.CreateBuilder(args);
Expand Down Expand Up @@ -68,7 +65,7 @@
identity.CreateDefaultUser = true;
identity.IdentityOptions = options => identitySection.Bind(options);
})
.UseRuntime(runtime => runtime.UseProtoActor(proto => proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(dbConnectionString))))
//.UseRuntime(runtime => runtime.UseProtoActor(proto => proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(dbConnectionString))))
.UseJobActivities()
.UseScheduling()
.UseWorkflowPersistence(p => p.UseEntityFrameworkCore(ef => ef.UseSqlite(dbConnectionString)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ public async Task<ResumeWorkflowResult> ResumeWorkflowAsync(string instanceId, s
return new ResumeWorkflowResult();
}

public async Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(object bookmarkPayload, TriggerWorkflowsOptions options, CancellationToken cancellationToken = default)
public async Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(string bookmarkName, object bookmarkPayload, TriggerWorkflowsOptions options, CancellationToken cancellationToken = default)
{
var hash = _hasher.Hash(bookmarkPayload);
var client = _cluster.GetBookmarkGrain(hash);
var bookmarksResponse = await client.Resolve(new ResolveBookmarksRequest(), cancellationToken);
var request = new ResolveBookmarksRequest() { BookmarkName = bookmarkName };
var bookmarksResponse = await client.Resolve(request, cancellationToken);
var bookmarks = bookmarksResponse!.Bookmarks;

foreach (var bookmark in bookmarks)
{
var workflowInstanceId = bookmark.WorkflowInstanceId;
var resumeResult = await ResumeWorkflowAsync(workflowInstanceId, bookmark.BookmarkId, new ResumeWorkflowOptions(options.Input), cancellationToken);
var resumeResult = await ResumeWorkflowAsync(workflowInstanceId, bookmark.BookmarkId, new ResumeWorkflowOptions(options.Input), cancellationToken);
}

return new TriggerWorkflowsResult();
}
}
1 change: 1 addition & 0 deletions src/modules/Elsa.ProtoActor/Protos/Messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ message RemoveBookmarksByWorkflowRequest {
}

message ResolveBookmarksRequest {
string BookmarkName = 1;
}

message ResolveBookmarksResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Threading;
using System.Threading.Tasks;
using Elsa.Abstractions;
using Elsa.Workflows.Core.Activities;
using Elsa.Workflows.Core.Helpers;
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Core.Services;
using Elsa.Workflows.Runtime.Services;
Expand All @@ -27,7 +29,8 @@ public override void Configure()
public override async Task HandleAsync(Request request, CancellationToken cancellationToken)
{
var eventBookmark = new EventBookmarkData(request.EventName);
var result = await _workflowRuntime.TriggerWorkflowsAsync(eventBookmark, new TriggerWorkflowsOptions(), cancellationToken);
var bookmarkName = ActivityTypeNameHelper.GenerateTypeName<Event>();
var result = await _workflowRuntime.TriggerWorkflowsAsync(bookmarkName, eventBookmark, new TriggerWorkflowsOptions(), cancellationToken);

if (!HttpContext.Response.HasStarted)
{
Expand Down
14 changes: 10 additions & 4 deletions src/modules/Elsa.Workflows.Core/Activities/Composite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Elsa.Workflows.Core.Activities;
/// Represents a composite activity that has a single <see cref="Root"/> activity. Like a workflow, but without workflow-level properties.
/// </summary>
[Activity("Elsa", "Workflows", "Execute a root activity that you can configure yourself")]
public class Composite : Activity
public class Composite : ActivityBase
{
/// <summary>
/// The activity to schedule when this activity executes.
Expand All @@ -22,7 +22,10 @@ protected override void Execute(ActivityExecutionContext context)
context.ScheduleActivity(Root, OnCompletedAsync);
}

protected virtual ValueTask OnCompletedAsync(ActivityExecutionContext context, ActivityExecutionContext childContext) => ValueTask.CompletedTask;
protected virtual async ValueTask OnCompletedAsync(ActivityExecutionContext context, ActivityExecutionContext childContext)
{
await context.CompleteActivityAsync();
}

protected static Inline From(Func<ActivityExecutionContext, ValueTask> activity) => new(activity);
protected static Inline From(Func<ValueTask> activity) => new(activity);
Expand All @@ -37,7 +40,7 @@ protected override void Execute(ActivityExecutionContext context)
/// <summary>
/// Represents a composite activity that has a single <see cref="Root"/> activity and returns a result.
/// </summary>
public class Composite<T> : Activity<T>
public class Composite<T> : ActivityBase
{
/// <summary>
/// The activity to schedule when this activity executes.
Expand All @@ -50,7 +53,10 @@ protected override void Execute(ActivityExecutionContext context)
context.ScheduleActivity(Root, OnCompletedAsync);
}

protected virtual ValueTask OnCompletedAsync(ActivityExecutionContext context, ActivityExecutionContext childContext) => ValueTask.CompletedTask;
protected virtual async ValueTask OnCompletedAsync(ActivityExecutionContext context, ActivityExecutionContext childContext)
{
await context.CompleteActivityAsync();
}

protected static Inline From(Func<ActivityExecutionContext, ValueTask> activity) => new(activity);
protected static Inline From(Func<ValueTask> activity) => new(activity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Elsa.Workflows.Core.ActivityNodeResolvers;
public class OutboundActivityPortResolver : IActivityPortResolver
{
public int Priority => -1;
public bool GetSupportsActivity(IActivity activity) => activity is Activity;
public bool GetSupportsActivity(IActivity activity) => true;

public ValueTask<IEnumerable<IActivity>> GetPortsAsync(IActivity activity, CancellationToken cancellationToken = default) =>
new(GetSinglePorts(activity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public async Task<RunWorkflowResult> RunAsync(WorkflowExecutionContext workflowE
ExecuteActivityDelegate? executeActivityDelegate,
CancellationToken cancellationToken)
{
var root = workflow.Root;
var root = workflow;

// Build graph.
var graph = await _activityWalker.WalkAsync(root, cancellationToken);
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.Workflows.Core/Models/Bookmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ namespace Elsa.Workflows.Core.Models;
public record Bookmark(
string Id,
string Name,
string Hash,
string? Hash,
string? Data,
string ActivityId,
string ActivityInstanceId,
Expand Down
2 changes: 1 addition & 1 deletion src/modules/Elsa.Workflows.Core/Models/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public Workflow()
{
}

public static Workflow FromActivity(IActivity root) => new(root);
public static Workflow FromActivity(IActivity root) => root is Workflow workflow ? workflow : new(root);

/// <summary>
/// Creates a new memory register initialized with this workflow's variables.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Elsa.Mediator.Services;
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Persistence.Entities;
using Elsa.Workflows.Persistence.Services;
using Elsa.Workflows.Runtime.Notifications;
Expand All @@ -17,20 +18,36 @@ public BookmarkManager(IWorkflowBookmarkStore bookmarkStore, IEventPublisher eve
_eventPublisher = eventPublisher;
}

public async Task DeleteBookmarksAsync(IEnumerable<WorkflowBookmark> workflowBookmarks, CancellationToken cancellationToken = default)
public async Task DeleteAsync(IEnumerable<Bookmark> bookmarks, CancellationToken cancellationToken = default)
{
var list = workflowBookmarks.ToList();
var list = bookmarks.ToList();
var ids = list.Select(x => x.Id).ToList();
var bookmarks = list.Select(x => x.ToBookmark()).ToList();
await _bookmarkStore.DeleteManyAsync(ids, cancellationToken);
await _eventPublisher.PublishAsync(new WorkflowBookmarksDeleted(bookmarks), cancellationToken);
await _eventPublisher.PublishAsync(new WorkflowBookmarksDeleted(list), cancellationToken);
}

public async Task SaveBookmarksAsync(IEnumerable<WorkflowBookmark> workflowBookmarks, CancellationToken cancellationToken = default)
public async Task SaveAsync(WorkflowInstance workflowInstance, IEnumerable<Bookmark> bookmarks, CancellationToken cancellationToken = default)
{
var list = workflowBookmarks.ToList();
var bookmarks = list.Select(x => x.ToBookmark()).ToList();
await _bookmarkStore.SaveManyAsync(list, cancellationToken);
await _eventPublisher.PublishAsync(new WorkflowBookmarksSaved(bookmarks), cancellationToken);
var list = bookmarks.ToList();
var workflowBookmarks = list.Select(x => WorkflowBookmark.FromBookmark(x, workflowInstance)).ToList();
await _bookmarkStore.SaveManyAsync(workflowBookmarks, cancellationToken);
await _eventPublisher.PublishAsync(new WorkflowBookmarksSaved(list), cancellationToken);
}

public async Task<Bookmark?> FindByIdAsync(string id, CancellationToken cancellationToken)
{
var workflowBookmark = await _bookmarkStore.FindByIdAsync(id, cancellationToken);
return workflowBookmark?.ToBookmark();
}

public async Task<IEnumerable<Bookmark>> FindManyByWorkflowInstanceIdAsync(string workflowInstanceId, CancellationToken cancellationToken = default)
{
var workflowBookmarks = await _bookmarkStore.FindManyByWorkflowInstanceIdAsync(workflowInstanceId, cancellationToken);
return workflowBookmarks.Select(x => x.ToBookmark());
}

public async Task<IEnumerable<WorkflowBookmark>> FindManyByHashAsync(string bookmarkName, string hash, CancellationToken cancellationToken = default)
{
return await _bookmarkStore.FindManyAsync(bookmarkName, hash, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,178 @@
using Elsa.Common.Services;
using Elsa.Mediator.Services;
using Elsa.Models;
using Elsa.Workflows.Core.Helpers;
using Elsa.Workflows.Core.Models;
using Elsa.Workflows.Core.Services;
using Elsa.Workflows.Core.State;
using Elsa.Workflows.Persistence.Entities;
using Elsa.Workflows.Persistence.Services;
using Elsa.Workflows.Runtime.Models;
using Elsa.Workflows.Runtime.Notifications;
using Elsa.Workflows.Runtime.Services;
using Open.Linq.AsyncExtensions;

namespace Elsa.Workflows.Runtime.Implementations;

public class DefaultWorkflowRuntime : IWorkflowRuntime
{
private readonly IWorkflowRunner _workflowRunner;
private readonly IWorkflowDefinitionService _workflowDefinitionService;
private readonly IWorkflowInstanceStore _workflowInstanceStore;
private readonly IBookmarkManager _bookmarkManager;
private readonly IHasher _hasher;
private readonly IEventPublisher _eventPublisher;
private readonly ISystemClock _systemClock;

public DefaultWorkflowRuntime(
IWorkflowRunner workflowRunner,
IWorkflowDefinitionService workflowDefinitionService,
IWorkflowInstanceStore workflowInstanceStore,
IBookmarkManager bookmarkManager,
IHasher hasher,
IEventPublisher eventPublisher,
ISystemClock systemClock)
{
_workflowRunner = workflowRunner;
_workflowDefinitionService = workflowDefinitionService;
_workflowInstanceStore = workflowInstanceStore;
_bookmarkManager = bookmarkManager;
_hasher = hasher;
_eventPublisher = eventPublisher;
_systemClock = systemClock;
}

public async Task<StartWorkflowResult> StartWorkflowAsync(string definitionId, StartWorkflowOptions options, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
var input = options.Input;
var versionOptions = options.VersionOptions;
var workflowDefinition = await _workflowDefinitionService.FindAsync(definitionId, versionOptions, cancellationToken);

if (workflowDefinition == null)
throw new Exception("Specified workflow definition and version does not exist");

var workflow = await _workflowDefinitionService.MaterializeWorkflowAsync(workflowDefinition, cancellationToken);
var workflowResult = await _workflowRunner.RunAsync(workflow, input, cancellationToken);
var workflowState = workflowResult.WorkflowState;
var finished = workflowResult.WorkflowState.Status == WorkflowStatus.Finished;

var workflowInstance = await SaveWorkflowInstanceAsync(workflowDefinition, workflowState, cancellationToken);
await UpdateBookmarksAsync(workflowInstance, new List<Bookmark>(), workflowResult.Bookmarks, cancellationToken);

return new StartWorkflowResult(workflowInstance.Id);
}

public async Task<ResumeWorkflowResult> ResumeWorkflowAsync(string instanceId, string bookmarkId, ResumeWorkflowOptions options, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
var workflowInstance = await _workflowInstanceStore.FindByIdAsync(instanceId, cancellationToken);

if (workflowInstance == null)
throw new Exception($"Workflow instance {instanceId} not found");

var workflowDefinition = await _workflowDefinitionService.FindAsync(workflowInstance.DefinitionId, VersionOptions.SpecificVersion(workflowInstance.Version), cancellationToken);

if (workflowDefinition == null)
throw new Exception("Specified workflow definition and version does not exist");

var input = options.Input;

var existingBookmarks = await _bookmarkManager.FindManyByWorkflowInstanceIdAsync(workflowInstance.Id, cancellationToken).ToList();
var bookmark = existingBookmarks.FirstOrDefault(x => x.Id == bookmarkId);

if (bookmark == null)
throw new Exception("Bookmark not found");

var workflow = await _workflowDefinitionService.MaterializeWorkflowAsync(workflowDefinition, cancellationToken);
var workflowState = workflowInstance.WorkflowState;
var workflowResult = await _workflowRunner.RunAsync(workflow, workflowState, bookmark, input, cancellationToken);
var finished = workflowResult.WorkflowState.Status == WorkflowStatus.Finished;

workflowInstance = await SaveWorkflowInstanceAsync(workflowDefinition, workflowState, cancellationToken);
await UpdateBookmarksAsync(workflowInstance, existingBookmarks, workflowResult.Bookmarks, cancellationToken);

return new ResumeWorkflowResult();
}

public async Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(string bookmarkName, object bookmarkPayload, TriggerWorkflowsOptions options, CancellationToken cancellationToken = default)
{
var hash = _hasher.Hash(bookmarkPayload);
var bookmarks = await _bookmarkManager.FindManyByHashAsync(bookmarkName, hash, cancellationToken);

foreach (var bookmark in bookmarks)
{
var workflowInstanceId = bookmark.WorkflowInstanceId;
var resumeResult = await ResumeWorkflowAsync(workflowInstanceId, bookmark.Id, new ResumeWorkflowOptions(options.Input), cancellationToken);
}

return new TriggerWorkflowsResult();
}

private async Task<WorkflowInstance> SaveWorkflowInstanceAsync(WorkflowDefinition workflowDefinition, WorkflowState workflowState, CancellationToken cancellationToken)
{
var workflowInstance = FromWorkflowState(workflowState, workflowDefinition);
await _workflowInstanceStore.SaveAsync(workflowInstance, cancellationToken);
return workflowInstance;
}

private WorkflowInstance FromWorkflowState(WorkflowState workflowState, WorkflowDefinition workflowDefinition)
{
var workflowInstance = new WorkflowInstance
{
Id = workflowState.Id,
DefinitionId = workflowDefinition.DefinitionId,
DefinitionVersionId = workflowDefinition.Id,
Version = workflowDefinition.Version,
WorkflowState = workflowState,
Status = workflowState.Status,
SubStatus = workflowState.SubStatus,
CorrelationId = workflowState.CorrelationId,
Name = null,
};

// Update timestamps.
var now = _systemClock.UtcNow;

if (workflowInstance.Status == WorkflowStatus.Finished)
{
switch (workflowInstance.SubStatus)
{
case WorkflowSubStatus.Cancelled:
workflowInstance.CancelledAt = now;
break;
case WorkflowSubStatus.Faulted:
workflowInstance.FaultedAt = now;
break;
case WorkflowSubStatus.Finished:
workflowInstance.FinishedAt = now;
break;
}
}

return workflowInstance;
}

private async Task UpdateBookmarksAsync(WorkflowInstance workflowInstance, ICollection<Bookmark> previousBookmarks, ICollection<Bookmark> newBookmarks, CancellationToken cancellationToken)
{
await RemoveBookmarksAsync(previousBookmarks, cancellationToken);
await StoreBookmarksAsync(workflowInstance, newBookmarks, cancellationToken);
await PublishChangedBookmarksAsync(workflowInstance.WorkflowState, previousBookmarks, newBookmarks, cancellationToken);
}

private async Task StoreBookmarksAsync(WorkflowInstance workflowInstance, ICollection<Bookmark> bookmarks, CancellationToken cancellationToken)
{
await _bookmarkManager.SaveAsync(workflowInstance, bookmarks, cancellationToken);
}

private async Task RemoveBookmarksAsync(IEnumerable<Bookmark> bookmarks, CancellationToken cancellationToken)
{
await _bookmarkManager.DeleteAsync(bookmarks, cancellationToken);
}

public Task<TriggerWorkflowsResult> TriggerWorkflowsAsync(object bookmarkPayload, TriggerWorkflowsOptions options, CancellationToken cancellationToken = default)
private async Task PublishChangedBookmarksAsync(WorkflowState workflowState, ICollection<Bookmark> originalBookmarks, ICollection<Bookmark> updatedBookmarks, CancellationToken cancellationToken)
{
throw new NotImplementedException();
var diff = Diff.For(originalBookmarks, updatedBookmarks);
var removedBookmarks = diff.Removed;
var createdBookmarks = diff.Added;
await _eventPublisher.PublishAsync(new WorkflowBookmarksIndexed(new IndexedWorkflowBookmarks(workflowState, createdBookmarks, removedBookmarks)), cancellationToken);
}
}
Loading