Skip to content

Commit

Permalink
Merge branch '3.0.1' into v3.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
sfmskywalker committed Jan 6, 2024
2 parents 7f1b5be + 3b48636 commit 91ac5e8
Show file tree
Hide file tree
Showing 36 changed files with 508 additions and 113 deletions.
27 changes: 27 additions & 0 deletions src/bundles/Elsa.Server.Web/MyEndpoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Elsa.Abstractions;
using Elsa.Workflows.Runtime.Contracts;

namespace Elsa.Server.Web;

public class MyEndpoint : ElsaEndpointWithoutRequest
{
private readonly IEventPublisher _eventPublisher;

public MyEndpoint(IEventPublisher eventPublisher)
{
_eventPublisher = eventPublisher;
}

public override void Configure()
{
Get("/my-event-workflow");
AllowAnonymous();
}

public override async Task HandleAsync(CancellationToken ct)
{
Console.WriteLine("Publishing MyEvent");
var results = await _eventPublisher.PublishAsync("MyEvent", cancellationToken: ct);
Console.WriteLine($"Affected workflows: {results.Count}");
}
}
39 changes: 39 additions & 0 deletions src/bundles/Elsa.Server.Web/MyEventWorkflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Elsa.Workflows;
using Elsa.Workflows.Activities;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Runtime.Activities;

namespace Elsa.Server.Web;

public class OnMyEventWorkflow : WorkflowBase
{
protected override void Build(IWorkflowBuilder builder)
{
builder.Version = 1;
builder.Id = "OnMyEventWorkflow";
builder.Root = new Sequence
{
Activities =
{
new Event("MyEvent")
{
CanStartWorkflow = true
},
new Inline(async () =>
{
// IEventPublisher.PublishAsync returns before this executes
await SomeCallAsync();
}),
new WriteLine("End of workflow"),
new Finish()
}
};
}

private async Task SomeCallAsync()
{
Console.WriteLine("Hello from OnMyEventWorkflow");
await Task.Delay(1000);
Console.WriteLine("Goodbye from OnMyEventWorkflow");
}
}
2 changes: 1 addition & 1 deletion src/bundles/Elsa.Server.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
const bool useMongoDb = false;
const bool useSqlServer = false;
const bool useDapper = false;
const bool useProtoActor = true;
const bool useProtoActor = false;
const bool useHangfire = false;
const bool useQuartz = true;
const bool useMassTransit = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public static IServiceCollection AddElsaClient(this IServiceCollection services,
{
var builderOptions = new ElsaClientBuilderOptions();
configureClient.Invoke(builderOptions);

builderOptions.ConfigureHttpClientBuilder += builder => builder.AddHttpMessageHandler(sp => (DelegatingHandler)sp.GetRequiredService(builderOptions.AuthenticationHandler));

services.AddScoped(builderOptions.AuthenticationHandler);
Expand All @@ -63,7 +62,19 @@ public static IServiceCollection AddElsaClient(this IServiceCollection services,
options.ConfigureHttpClient = builderOptions.ConfigureHttpClient;
options.ApiKey = builderOptions.ApiKey;
});

var builderOptionsWithoutRetryPolicy = new ElsaClientBuilderOptions
{
ApiKey = builderOptions.ApiKey,
AuthenticationHandler = builderOptions.AuthenticationHandler,
BaseAddress = builderOptions.BaseAddress,
ConfigureHttpClient = builderOptions.ConfigureHttpClient,
ConfigureHttpClientBuilder = builderOptions.ConfigureHttpClientBuilder,
ConfigureRetryPolicy = null
};

services.AddApi<IWorkflowDefinitionsApi>(builderOptions);
services.AddApi<IExecuteWorkflowApi>(builderOptionsWithoutRetryPolicy);
services.AddApi<IWorkflowInstancesApi>(builderOptions);
services.AddApi<IActivityDescriptorsApi>(builderOptions);
services.AddApi<IActivityDescriptorOptionsApi>(builderOptions);
Expand All @@ -89,8 +100,20 @@ public static IServiceCollection AddElsaClient(this IServiceCollection services,
public static void AddApi<T>(this IServiceCollection services, ElsaClientBuilderOptions? httpClientBuilderOptions = default) where T : class
{
var builder = services.AddRefitClient<T>(CreateRefitSettings, typeof(T).Name).ConfigureHttpClient(ConfigureElsaApiHttpClient);
httpClientBuilderOptions?.ConfigureHttpClientBuilder?.Invoke(builder);
builder.AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))));
httpClientBuilderOptions?.ConfigureHttpClientBuilder(builder);
httpClientBuilderOptions?.ConfigureRetryPolicy?.Invoke(builder);
}

/// <summary>
/// Adds a refit client for the specified API type.
/// </summary>
/// <param name="services">The service collection.</param>
/// <param name="httpClientBuilderOptions">An options object that can be used to configure the HTTP client builder.</param>
/// <typeparam name="T">The type representing the API.</typeparam>
public static void AddApiWithoutRetryPolicy<T>(this IServiceCollection services, ElsaClientBuilderOptions? httpClientBuilderOptions = default) where T : class
{
var builder = services.AddRefitClient<T>(CreateRefitSettings, typeof(T).Name).ConfigureHttpClient(ConfigureElsaApiHttpClient);
httpClientBuilderOptions?.ConfigureHttpClientBuilder(builder);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Elsa.Api.Client.HttpMessageHandlers;
using Microsoft.Extensions.DependencyInjection;
using Polly;

namespace Elsa.Api.Client.Options;

Expand Down Expand Up @@ -33,4 +34,9 @@ public class ElsaClientBuilderOptions
/// Gets or sets a delegate that can be used to configure the HTTP client builder.
/// </summary>
public Action<IHttpClientBuilder> ConfigureHttpClientBuilder { get; set; } = _ => { };

/// <summary>
/// Gets or sets a delegate that can be used to configure the retry policy.
/// </summary>
public Action<IHttpClientBuilder>? ConfigureRetryPolicy { get; set; } = builder => builder.AddTransientHttpErrorPolicy(p => p.WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt))));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Elsa.Api.Client.Resources.WorkflowDefinitions.Requests;
using JetBrains.Annotations;
using Refit;

namespace Elsa.Api.Client.Resources.WorkflowDefinitions.Contracts;

/// <summary>
/// Represents a client for the workflow definitions API.
/// </summary>
[PublicAPI]
public interface IExecuteWorkflowApi
{
/// <summary>
/// Executes a workflow definition.
/// </summary>
/// <param name="definitionId">The definition ID of the workflow definition to execute.</param>
/// <param name="request">An optional request containing options for executing the workflow definition.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>A response containing information about the workflow instance that was created.</returns>
[Post("/workflow-definitions/{definitionId}/execute")]
Task<HttpResponseMessage> ExecuteAsync(string definitionId, ExecuteWorkflowDefinitionRequest? request, CancellationToken cancellationToken = default);

/// <summary>
/// Dispatches a request to execute the specified workflow definition.
/// </summary>
/// <param name="definitionId">The definition ID of the workflow definition to dispatch request.</param>
/// <param name="request">An optional request containing options for dispatching a request to execute the specified workflow definition.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>A response containing information about the workflow instance that was created.</returns>
[Post("/workflow-definitions/{definitionId}/dispatch")]
Task<HttpResponseMessage> DispatchAsync(string definitionId, DispatchWorkflowDefinitionRequest? request, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,24 +196,4 @@ public interface IWorkflowDefinitionsApi
/// <param name="cancellationToken">An optional cancellation token.</param>
[Post("/workflow-definitions/{definitionId}/revert/{version}")]
Task RevertVersionAsync(string definitionId, int version, CancellationToken cancellationToken = default);

/// <summary>
/// Executes a workflow definition.
/// </summary>
/// <param name="definitionId">The definition ID of the workflow definition to execute.</param>
/// <param name="request">An optional request containing options for executing the workflow definition.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>A response containing information about the workflow instance that was created.</returns>
[Post("/workflow-definitions/{definitionId}/execute")]
Task<HttpResponseMessage> ExecuteAsync(string definitionId, ExecuteWorkflowDefinitionRequest? request, CancellationToken cancellationToken = default);

/// <summary>
/// Dispatches a request to execute the specified workflow definition.
/// </summary>
/// <param name="definitionId">The definition ID of the workflow definition to dispatch request.</param>
/// <param name="request">An optional request containing options for dispatching a request to execute the specified workflow definition.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>A response containing information about the workflow instance that was created.</returns>
[Post("/workflow-definitions/{definitionId}/dispatch")]
Task<HttpResponseMessage> DispatchAsync(string definitionId, DispatchWorkflowDefinitionRequest? request, CancellationToken cancellationToken = default);
}
6 changes: 3 additions & 3 deletions src/common/Elsa.Api.Common/Elsa.Api.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageReference Include="FastEndpoints" Version="5.20.1.7-beta"/>
<PackageReference Include="FastEndpoints.Security" Version="5.20.1.7-beta"/>
<PackageReference Include="FastEndpoints.Swagger" Version="5.20.1.7-beta"/>
<PackageReference Include="FastEndpoints" Version="5.21.2"/>
<PackageReference Include="FastEndpoints.Security" Version="5.21.2"/>
<PackageReference Include="FastEndpoints.Swagger" Version="5.21.2"/>
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 0 additions & 2 deletions src/modules/Elsa.Http/Activities/FlowSendHttpRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ protected override async ValueTask HandleResponseAsync(ActivityExecutionContext
outcomes.Add(outcome);

outcomes.Add("Done");

context.JournalData["StatusCode"] = statusCode;
await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray());
}

Expand Down
17 changes: 17 additions & 0 deletions src/modules/Elsa.Http/Activities/SendHttpRequestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,24 @@ protected SendHttpRequestBase(string? source = default, int? line = default) : b
)]
public Input<HttpHeaders?> RequestHeaders { get; set; } = new(new HttpHeaders());

/// <summary>
/// The HTTP response status code
/// </summary>
[Output(Description = "The HTTP response status code")]
public Output<int> StatusCode { get; set; } = default!;

/// <summary>
/// The parsed content, if any.
/// </summary>
[Output(Description = "The parsed content, if any.")]
public Output<object?> ParsedContent { get; set; } = default!;

/// <summary>
/// The response headers that were received.
/// </summary>
[Output(Description = "The response headers that were received.")]
public Output<HttpHeaders?> ResponseHeaders { get; set; } = default!;

/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
Expand Down Expand Up @@ -115,8 +127,13 @@ private async Task TrySendAsync(ActivityExecutionContext context)
{
var response = await httpClient.SendAsync(request, cancellationToken);
var parsedContent = await ParseContentAsync(context, response.Content);
var statusCode = (int)response.StatusCode;
var responseHeaders = new HttpHeaders(response.Headers);

context.Set(Result, response);
context.Set(ParsedContent, parsedContent);
context.Set(StatusCode, statusCode);
context.Set(ResponseHeaders, responseHeaders);

await HandleResponseAsync(context, response);
}
Expand Down
20 changes: 20 additions & 0 deletions src/modules/Elsa.Http/Models/HttpHeaders.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Net.Http.Headers;
using System.Text.Json.Serialization;
using Elsa.Extensions;
using Elsa.Http.Serialization;
Expand All @@ -10,6 +11,25 @@ namespace Elsa.Http.Models;
[JsonConverter(typeof(HttpHeadersConverter))]
public class HttpHeaders : Dictionary<string, string[]>
{
/// <inheritdoc />
public HttpHeaders()
{
}

/// <inheritdoc />
public HttpHeaders(IDictionary<string, string[]> source)
{
foreach (var item in source)
Add(item.Key, item.Value);
}

/// <inheritdoc />
public HttpHeaders(HttpResponseHeaders source)
{
foreach (var item in source)
Add(item.Key, item.Value.ToArray());
}

/// <summary>
/// Gets the content type.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ await _bus.Publish(new DispatchWorkflowInstance(request.InstanceId)
ActivityInstanceId = request.ActivityInstanceId,
ActivityHash = request.ActivityHash,
Input = request.Input,
Properties = request.Properties,
CorrelationId = request.CorrelationId
}, cancellationToken);
return new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async ValueTask InvokeAsync(ActivityExecutionContext context)
}

// Check if this is a background execution.
var isBackgroundExecution = context.TransientProperties.GetValueOrDefault<object, bool>(BackgroundActivityCollectorMiddleware.IsBackgroundExecution);
var isBackgroundExecution = context.GetIsBackgroundExecution();

// Is the activity configured to load the context?
foreach (var providerType in providerTypes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,29 @@ public async ValueTask ScheduleActivityAsync(IActivity? activity, ActivityExecut
/// <param name="options">The options used to schedule the activity.</param>
public async ValueTask ScheduleActivityAsync(ActivityNode? activityNode, ActivityExecutionContext? owner = default, ScheduleWorkOptions? options = default)
{
if (this.GetIsBackgroundExecution())
{
var scheduledActivity = new ScheduledActivity
{
ActivityNodeId = activityNode?.NodeId,
OwnerActivityInstanceId = owner?.Id,
Options = options != null ? new ScheduledActivityOptions
{
CompletionCallback = options?.CompletionCallback?.Method.Name,
Tag = options?.Tag,
ExistingActivityInstanceId = options?.ExistingActivityExecutionContext?.Id,
PreventDuplicateScheduling = options?.PreventDuplicateScheduling ?? false,
Variables = options?.Variables?.ToList(),
Input = options?.Input
} : default
};

var scheduledActivities = this.GetBackgroundScheduledActivities().ToList();
scheduledActivities.Add(scheduledActivity);
this.SetBackgroundScheduledActivities(scheduledActivities);
return;
}

var completionCallback = options?.CompletionCallback;
owner ??= this;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,16 @@ public static async ValueTask CompleteActivityAsync(this ActivityCompletedContex
/// </summary>
public static async ValueTask CompleteActivityAsync(this ActivityExecutionContext context, object? result = default)
{
var outcomes = result as Outcomes;

// If the activity is executing in the background, simply capture the result and return.
if (context.GetIsBackgroundExecution())
{
if (outcomes != null)
context.SetBackgroundOutcomes(outcomes.Names);
return;
}

// If the activity is not running, do nothing.
if (context.Status != ActivityStatus.Running)
return;
Expand All @@ -470,7 +480,7 @@ public static async ValueTask CompleteActivityAsync(this ActivityExecutionContex
context.Status = ActivityStatus.Completed;

// Record the outcomes, if any.
if (result is Outcomes outcomes)
if (outcomes != null)
context.JournalData["Outcomes"] = outcomes.Names;

// Record the output, if any.
Expand Down
Loading

0 comments on commit 91ac5e8

Please sign in to comment.