forked from elsa-workflows/elsa-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
EventPublisher.cs
72 lines (65 loc) · 2.57 KB
/
EventPublisher.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
using Elsa.Mediator;
using Elsa.Mediator.Contracts;
using Elsa.Workflows.Runtime.Activities;
using Elsa.Workflows.Runtime.Bookmarks;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Models;
using Elsa.Workflows.Runtime.Options;
using Elsa.Workflows.Runtime.Results;
namespace Elsa.Workflows.Runtime.Services;
/// <summary>
/// An <see cref="IEventPublisher"/> that wraps each event in a workflow inbox message
/// and submits it through <see cref="IWorkflowInbox"/>.
/// </summary>
public class EventPublisher : IEventPublisher
{
    private readonly IWorkflowInbox _workflowInbox;

    /// <summary>
    /// Initializes a new instance of the <see cref="EventPublisher"/> class.
    /// </summary>
    /// <param name="workflowInbox">The inbox that event messages are submitted to.</param>
    public EventPublisher(IWorkflowInbox workflowInbox)
    {
        _workflowInbox = workflowInbox;
    }

    /// <inheritdoc />
    public async Task<ICollection<WorkflowExecutionResult>> PublishAsync(
        string eventName,
        string? correlationId = default,
        string? workflowInstanceId = default,
        string? activityInstanceId = default,
        object? payload = default,
        CancellationToken cancellationToken = default) =>
        await PublishInternalAsync(
            eventName,
            dispatchAsynchronously: false,
            correlationId: correlationId,
            workflowInstanceId: workflowInstanceId,
            activityInstanceId: activityInstanceId,
            payload: payload,
            cancellationToken: cancellationToken);

    /// <inheritdoc />
    public async Task DispatchAsync(
        string eventName,
        string? correlationId = default,
        string? workflowInstanceId = default,
        string? activityInstanceId = default,
        object? payload = default,
        CancellationToken cancellationToken = default)
    {
        // Same submission path as PublishAsync, but flagged for asynchronous dispatch;
        // the returned execution results are intentionally not surfaced to the caller.
        await PublishInternalAsync(
            eventName,
            dispatchAsynchronously: true,
            correlationId: correlationId,
            workflowInstanceId: workflowInstanceId,
            activityInstanceId: activityInstanceId,
            payload: payload,
            cancellationToken: cancellationToken);
    }

    /// <summary>
    /// Builds the inbox message for the given event and submits it to the workflow inbox.
    /// </summary>
    /// <param name="eventName">The name of the event, used to create the event bookmark payload.</param>
    /// <param name="dispatchAsynchronously">Value forwarded to <see cref="WorkflowInboxMessageDeliveryOptions.DispatchAsynchronously"/>.</param>
    /// <param name="correlationId">Optional correlation ID passed along with the message.</param>
    /// <param name="workflowInstanceId">Optional workflow instance ID passed along with the message.</param>
    /// <param name="activityInstanceId">Optional activity instance ID passed along with the message.</param>
    /// <param name="payload">Optional event payload; an empty dictionary is used when this is <c>null</c>.</param>
    /// <param name="cancellationToken">Token forwarded to the inbox submission.</param>
    /// <returns>The workflow execution results reported by the inbox submission.</returns>
    private async Task<ICollection<WorkflowExecutionResult>> PublishInternalAsync(
        string eventName,
        bool dispatchAsynchronously,
        string? correlationId = default,
        string? workflowInstanceId = default,
        string? activityInstanceId = default,
        object? payload = default,
        CancellationToken cancellationToken = default)
    {
        // The payload travels as workflow input under the Event activity's well-known key.
        var input = new Dictionary<string, object>();
        input[Event.EventPayloadWorkflowInputKey] = payload ?? new Dictionary<string, object>();

        var bookmarkPayload = new EventBookmarkPayload(eventName);
        var inboxMessage = NewWorkflowInboxMessage.For<Event>(
            bookmarkPayload,
            workflowInstanceId,
            correlationId,
            activityInstanceId,
            input);

        var deliveryOptions = new WorkflowInboxMessageDeliveryOptions();
        deliveryOptions.DispatchAsynchronously = dispatchAsynchronously;

        var submission = await _workflowInbox.SubmitAsync(inboxMessage, deliveryOptions, cancellationToken);
        return submission.WorkflowExecutionResults;
    }
}