Skip to content

Commit

Permalink
Elastic updates (#3623)
Browse files Browse the repository at this point in the history
* Incremental work

* Incremental work

* Incremental work

* Merge remote-tracking branch 'origin/v3' into v3-elasticsearch-updates

* Incremental work

* Incremental work

* Fix listing with no query

* Restore missing class

* Remove rollover hosted service and expose index configurations

* Rename hosted service
  • Loading branch information
sfmskywalker committed Jan 13, 2023
1 parent dad33e6 commit 096206d
Show file tree
Hide file tree
Showing 66 changed files with 869 additions and 454 deletions.
7 changes: 7 additions & 0 deletions Elsa.sln
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.ProtoActor.Cluster.Azu
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.AzureServiceBusActivities", "src\samples\aspnet\Elsa.Samples.AzureServiceBusActivities\Elsa.Samples.AzureServiceBusActivities.csproj", "{C25CFDCF-8E06-4DD8-A83E-FF7EF9FDA02E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.ElasticsearchStorage", "src\samples\aspnet\Elsa.Samples.ElasticsearchStorage\Elsa.Samples.ElasticsearchStorage.csproj", "{4A54379F-4775-41B6-9752-FF300288916A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -404,6 +406,10 @@ Global
{C25CFDCF-8E06-4DD8-A83E-FF7EF9FDA02E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C25CFDCF-8E06-4DD8-A83E-FF7EF9FDA02E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C25CFDCF-8E06-4DD8-A83E-FF7EF9FDA02E}.Release|Any CPU.Build.0 = Release|Any CPU
{4A54379F-4775-41B6-9752-FF300288916A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4A54379F-4775-41B6-9752-FF300288916A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4A54379F-4775-41B6-9752-FF300288916A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4A54379F-4775-41B6-9752-FF300288916A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{155227F0-A33B-40AA-A4B4-06F813EB921B} = {61017E64-6D00-49CB-9E81-5002DC8F7D5F}
Expand Down Expand Up @@ -473,5 +479,6 @@ Global
{A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5}
{9F79BBEC-02CE-4F76-AED4-BC191DA3054B} = {5BA4A8FA-F7F4-45B3-AEC8-8886D35AAC79}
{C25CFDCF-8E06-4DD8-A83E-FF7EF9FDA02E} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5}
{4A54379F-4775-41B6-9752-FF300288916A} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5}
EndGlobalSection
EndGlobal
33 changes: 32 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,40 @@ services:
ports:
- "15672:15672"
- "5672:5672"

elasticsearch:
image: "docker.elastic.co/elasticsearch/elasticsearch:8.5.0"
environment:
- xpack.security.enabled=false
- discovery.type=single-node
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
cap_add:
- IPC_LOCK
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
ports:
- "9200:9200"
- "9300:9300"

kibana:
image: "docker.elastic.co/kibana/kibana:8.5.0"
environment:
- ELASTICSEARCH_HOSTS=http:https://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
- elasticsearch

networks:
consul:

volumes:
mongodb_data:
mongodb_data:
elasticsearch-data:
driver: local
7 changes: 1 addition & 6 deletions src/bundles/Elsa.WorkflowServer.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,13 @@
identity.TokenOptions = identityTokenOptions;
})
.UseDefaultAuthentication()
.UseWorkflowManagement(management =>
{
management.UseDefaultManagement(m => m.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)));
management.UseWorkflowInstances(w => w.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)));
})
.UseWorkflowManagement(management => management.UseEntityFrameworkCore(m => m.UseSqlite(sqliteConnectionString)))
.UseWorkflowRuntime(runtime =>
{
runtime.UseProtoActor(proto =>
{
proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(sqliteConnectionString));
});
runtime.UseDefaultRuntime(df => df.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)));
runtime.UseExecutionLogRecords(e => e.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)));
runtime.UseAsyncWorkflowStateExporter();
runtime.UseMassTransitDispatcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class ActivityNameFormatter {
public static readonly SnakeCaseStrategy: ActivityNameStrategy = context => snakeCase(ActivityNameFormatter.DefaultStrategy(context));
public static readonly KebabCaseStrategy: ActivityNameStrategy = context => kebabCase(ActivityNameFormatter.DefaultStrategy(context));

public strategy: ActivityNameStrategy = ActivityNameFormatter.CamelCaseStrategy;
public strategy: ActivityNameStrategy = ActivityNameFormatter.PascalCaseStrategy;

public format(context: ActivityNameFormatterContext): string {
return this.strategy(context);
Expand Down
9 changes: 0 additions & 9 deletions src/modules/Elsa.Elasticsearch/Common/ElasticConfiguration.cs

This file was deleted.

73 changes: 47 additions & 26 deletions src/modules/Elsa.Elasticsearch/Common/ElasticStore.cs
Original file line number Diff line number Diff line change
@@ -1,73 +1,94 @@
using System.Collections.ObjectModel;
using Elastic.Clients.Elasticsearch;
using Elsa.Common.Models;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Exception = System.Exception;

namespace Elsa.Elasticsearch.Common;

/// <summary>
/// A thin wrapper around <see cref="ElasticsearchClient"/> for easy re-usability.
/// </summary>
/// <typeparam name="T">The document type.</typeparam>
[PublicAPI]
public class ElasticStore<T> where T : class
{
private readonly ElasticsearchClient _elasticClient;
private readonly ILogger _logger;

/// <summary>
/// Constructor.
/// </summary>
public ElasticStore(ElasticsearchClient elasticClient, ILogger<ElasticStore<T>> logger)
{
_elasticClient = elasticClient;
_logger = logger;
}

/// <summary>
/// Searches the index using the specified search descriptor.
/// </summary>
public async Task<Page<T>> SearchAsync(Action<SearchRequestDescriptor<T>> search, PageArgs? pageArgs, CancellationToken cancellationToken)
{
if (pageArgs != default)
if (pageArgs?.Page != null && pageArgs?.PageSize != null)
{
search += s => s.From(pageArgs.Offset).Size(pageArgs.Limit);
}

var response = await _elasticClient.SearchAsync(search, cancellationToken);

if (response.IsSuccess())
return new Page<T>(response.Hits.Select(hit => hit.Source).ToList()!, response.Total);

_logger.LogError("Failed to search data in Elasticsearch: {message}", response.ElasticsearchServerError.ToString());
return new Page<T>(new Collection<T>(), 0);

if (!response.IsSuccess())
throw new Exception(response.DebugInformation);

return new Page<T>(response.Hits.Select(hit => hit.Source).ToList()!, response.Total);
}

public async Task SaveAsync(T model, CancellationToken cancellationToken)
/// <summary>
/// Stores the specified document in the index.
/// </summary>
public async Task SaveAsync(T document, CancellationToken cancellationToken)
{
var response = await _elasticClient.IndexAsync(model, cancellationToken);
var response = await _elasticClient.IndexAsync(document, cancellationToken);

if (response.IsSuccess()) return;

throw new Exception($"Failed to save data in Elasticsearch: {response.ElasticsearchServerError}");
if (!response.IsSuccess())
throw new Exception($"Failed to save data in Elasticsearch: {response.ElasticsearchServerError}");
}

/// <summary>
/// Stores the specified documents in the index.
/// </summary>
public async Task SaveManyAsync(IEnumerable<T> documents, CancellationToken cancellationToken)
{
var response = await _elasticClient.IndexManyAsync(documents, cancellationToken);

if (response.IsSuccess()) return;

throw new Exception($"Failed to save data in Elasticsearch: {response.ElasticsearchServerError}");
if (!response.IsSuccess())
throw new Exception($"Failed to save data in Elasticsearch: {response.ElasticsearchServerError}");
}

public async Task<long> DeleteManyAsync(IEnumerable<T> list, CancellationToken cancellationToken)
/// <summary>
/// Deletes the specified set of documents from the index.
/// </summary>
public async Task<long> DeleteManyAsync(IEnumerable<T> documents, CancellationToken cancellationToken)
{
var response = await _elasticClient.BulkAsync(b => b.DeleteMany(list), cancellationToken);
if (response.IsSuccess()) return response.Items.Count(i => i.IsValid);

_logger.LogError("Failed to bulk delete data in Elasticsearch: {message}", response.ElasticsearchServerError.ToString());
return 0;
var response = await _elasticClient.BulkAsync(b => b.DeleteMany(documents), cancellationToken);

if (!response.IsSuccess())
throw new Exception(response.DebugInformation);

return response.Items.Count(i => i.IsValid);
}

/// <summary>
/// Deletes the documents matching the specified query.
/// </summary>
public async Task<long> DeleteByQueryAsync(Action<DeleteByQueryRequestDescriptor<T>> query, CancellationToken cancellationToken)
{
var response = await _elasticClient.DeleteByQueryAsync(Indices.All, query, cancellationToken);

if (response.IsSuccess()) return response.Deleted ?? 0;

_logger.LogError("Failed to delete data in Elasticsearch: {message}", response.ElasticsearchServerError.ToString());
return 0;
if (!response.IsSuccess())
throw new Exception(response.DebugInformation);

return response.Deleted ?? 0;
}
}
23 changes: 23 additions & 0 deletions src/modules/Elsa.Elasticsearch/Common/IndexConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Elastic.Clients.Elasticsearch;
using Elsa.Elasticsearch.Services;
using Elsa.Elasticsearch.Strategies;

namespace Elsa.Elasticsearch.Common;

/// <summary>
/// A convenience base class for document type configurations.
/// </summary>
public abstract class IndexConfiguration<T> : IIndexConfiguration<T>
{
/// <inheritdoc />
public Type DocumentType => typeof(T);

/// <inheritdoc />
public virtual IIndexNamingStrategy IndexNamingStrategy => new DefaultNaming();

/// <inheritdoc />
public abstract void ConfigureClientSettings(ElasticsearchClientSettings settings);

/// <inheritdoc />
public virtual ValueTask ConfigureClientAsync(ElasticsearchClient client, CancellationToken cancellationToken) => ValueTask.CompletedTask;
}
17 changes: 16 additions & 1 deletion src/modules/Elsa.Elasticsearch/Common/PersistenceFeatureBase.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
using Elsa.Elasticsearch.Services;
using Elsa.Features.Abstractions;
using Elsa.Features.Services;
using Microsoft.Extensions.DependencyInjection;

namespace Elsa.Elasticsearch.Common;

/// <summary>
/// Base class for features that configure Elasticsearch persistence.
/// </summary>
public abstract class ElasticPersistenceFeatureBase : FeatureBase
{
public ElasticPersistenceFeatureBase(IModule module) : base(module)
/// <inheritdoc />
protected ElasticPersistenceFeatureBase(IModule module) : base(module)
{
}

/// <summary>
/// Registers an <see cref="ElasticStore{T}"/>.
/// </summary>
/// <typeparam name="TModel">The entity type of the store.</typeparam>
/// <typeparam name="TStore">The type of the store.</typeparam>
protected void AddStore<TModel, TStore>() where TModel : class where TStore : class
{
Services
.AddSingleton<ElasticStore<TModel>>()
.AddSingleton<TStore>();
}

/// <summary>
/// Registers an <see cref="IIndexConfiguration"/>.
/// </summary>
protected void AddIndexConfiguration<TDocument>(Func<IServiceProvider, IIndexConfiguration<TDocument>> configuration) => Services.AddSingleton<IIndexConfiguration>(configuration);
}
28 changes: 0 additions & 28 deletions src/modules/Elsa.Elasticsearch/Common/Utils.cs

This file was deleted.

43 changes: 1 addition & 42 deletions src/modules/Elsa.Elasticsearch/Extensions/ElasticExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,59 +1,18 @@
using Elastic.Clients.Elasticsearch;
using Elastic.Clients.Elasticsearch.IndexManagement;
using Elastic.Transport;
using Elsa.Elasticsearch.Common;
using Elsa.Elasticsearch.Models;
using Elsa.Elasticsearch.Options;
using Elsa.Elasticsearch.Services;

namespace Elsa.Elasticsearch.Extensions;

public static class ElasticExtensions
internal static class ElasticExtensions
{
public static ElasticsearchClientSettings ConfigureAuthentication(this ElasticsearchClientSettings settings, ElasticsearchOptions options)
{
if (!string.IsNullOrEmpty(options.ApiKey))
{
settings.Authentication(new ApiKey(options.ApiKey));
}
else if (!string.IsNullOrEmpty(options.Username) && !string.IsNullOrEmpty(options.Password))
{
settings.Authentication(new BasicAuthentication(options.Username, options.Password));
}

return settings;
}

public static ElasticsearchClientSettings ConfigureMapping(this ElasticsearchClientSettings settings, IDictionary<Type,string> indexConfig)
{
foreach (var config in Utils.GetElasticConfigurationTypes())
{
var configInstance = (IElasticConfiguration)Activator.CreateInstance(config)!;
configInstance.Apply(settings, indexConfig);
}

return settings;
}

public static void ConfigureAliases(this ElasticsearchClient client, IDictionary<Type,string> aliasConfig, IndexRolloverStrategy strategy)
{
var namingStrategy = (IIndexNamingStrategy)Activator.CreateInstance(strategy.IndexNamingStrategy)!;

foreach (var type in Utils.GetElasticDocumentTypes())
{
var aliasName = aliasConfig[type];
var indexName = namingStrategy.GenerateName(aliasName);

var indexExists = client.Indices.Exists(indexName).Exists;
if (indexExists) continue;

var response = client.Indices.Create(indexName, c => c
.Aliases(a => a.Add(aliasName, new Alias {IsWriteIndex = true})));

if (response.IsValidResponse) continue;
response.TryGetOriginalException(out var exception);
if(exception != null)
throw exception;
}
}
}
Loading

0 comments on commit 096206d

Please sign in to comment.