Skip to content

Commit

Permalink
Optional rollover strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
grkngrn committed Jan 10, 2023
1 parent ce2434f commit 02ff020
Show file tree
Hide file tree
Showing 20 changed files with 233 additions and 158 deletions.
27 changes: 27 additions & 0 deletions src/modules/Elsa.Elasticsearch/Common/PersistanceFeatureBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Elasticsearch.Net;
using Elsa.Elasticsearch.Extensions;
using Elsa.Elasticsearch.HostedServices;
using Elsa.Elasticsearch.Models;
using Elsa.Elasticsearch.Options;
using Elsa.Elasticsearch.Scheduling;
using Elsa.Elasticsearch.Services;
using Elsa.Features.Abstractions;
using Elsa.Features.Services;
using Microsoft.Extensions.DependencyInjection;
using Nest;

namespace Elsa.Elasticsearch.Common;

public abstract class ElasticPersistanceFeatureBase : FeatureBase
{
public ElasticPersistanceFeatureBase(IModule module) : base(module)
{
}

protected void AddStore<TModel, TStore>() where TModel : class where TStore : class
{
Services
.AddSingleton<ElasticStore<TModel>>()
.AddSingleton<TStore>();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Elasticsearch.Net;
using Elsa.Elasticsearch.Common;
using Elsa.Elasticsearch.Implementations.RolloverStrategies;
using Elsa.Elasticsearch.Models;
using Elsa.Elasticsearch.Options;
using Elsa.Elasticsearch.Services;
Expand Down Expand Up @@ -36,9 +35,9 @@ public static ConnectionSettings ConfigureMapping(this ConnectionSettings settin
return settings;
}

public static void ApplyRolloverStrategy(this ElasticClient client, IDictionary<Type,string> aliasConfig, IndexRolloverStrategy strategy)
public static void ConfigureAliasNaming(this ElasticClient client, IDictionary<Type,string> aliasConfig, IndexRolloverStrategy strategy)
{
var strategyInstance = (IRolloverStrategy)Activator.CreateInstance(strategy.Value, args: client)!;
strategyInstance.Apply(Utils.GetElasticDocumentTypes(), aliasConfig);
var namingStrategy = (IIndexNamingStrategy)Activator.CreateInstance(strategy.IndexNamingStrategy, args: client)!;
namingStrategy.Apply(Utils.GetElasticDocumentTypes(), aliasConfig);
}
}
31 changes: 31 additions & 0 deletions src/modules/Elsa.Elasticsearch/Extensions/ModuleExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Elsa.Elasticsearch.Common;
using Elsa.Elasticsearch.Features;
using Elsa.Elasticsearch.Models;
using Elsa.Elasticsearch.Options;
using Elsa.Features.Services;

namespace Elsa.Elasticsearch.Extensions;

public static class ModuleExtensions
{
/// <summary>
/// Enables the <see cref="ElasticsearchFeature"/> feature.
/// </summary>
public static IModule UseElasticsearch(
this IModule module,
ElasticsearchOptions options,
IndexRolloverStrategy? rolloverStrategy = default,
IDictionary<string,string>? indexConfig = default,
Action<ElasticsearchFeature>? configure = default)
{
configure += f =>
{
f.Options = options;
f.IndexRolloverStrategy = rolloverStrategy;
f.IndexConfig = Utils.ResolveAliasConfig(f.IndexConfig, options.IndexConfig, indexConfig);
};

module.Configure(configure);
return module;
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
using Elasticsearch.Net;
using Elsa.Elasticsearch.Extensions;
using Elsa.Elasticsearch.HostedServices;
using Elsa.Elasticsearch.Models;
using Elsa.Elasticsearch.Options;
using Elsa.Elasticsearch.Scheduling;
using Elsa.Elasticsearch.Services;
using Elsa.Features.Abstractions;
using Elsa.Features.Services;
using Microsoft.Extensions.DependencyInjection;
using Nest;

namespace Elsa.Elasticsearch.Common;
namespace Elsa.Elasticsearch.Features;

public abstract class ElasticFeatureBase : FeatureBase
public class ElasticsearchFeature : FeatureBase
{
public ElasticFeatureBase(IModule module) : base(module)
public ElasticsearchFeature(IModule module) : base(module)
{
}

Expand All @@ -24,18 +22,24 @@ public ElasticFeatureBase(IModule module) : base(module)

public override void ConfigureHostedServices()
{
Module.ConfigureHostedService<ConfigureElasticsearchHostedService>(-1);
Module.ConfigureHostedService<ConfigureMappingHostedService>(-1);

if (IndexRolloverStrategy != null)
{
Module.ConfigureHostedService<ConfigureIndexRolloverHostedService>(-1);
}
}

public override void Apply()
{
if (Services.Any(x => x.ServiceType == typeof(ElasticClient))) return;

var elasticClient = new ElasticClient(GetSettings());

if (IndexRolloverStrategy != null)
{
elasticClient.ApplyRolloverStrategy(IndexConfig, IndexRolloverStrategy!);
elasticClient.ConfigureAliasNaming(IndexConfig, IndexRolloverStrategy);

var typeInstance = (IIndexRolloverStrategy) Activator.CreateInstance(IndexRolloverStrategy.Value, args: elasticClient)!;
Services.AddSingleton<IIndexRolloverStrategy>(_ => typeInstance);
}

Services.AddSingleton(elasticClient);
Expand All @@ -47,11 +51,4 @@ private ConnectionSettings GetSettings()
.ConfigureAuthentication(Options)
.ConfigureMapping(IndexConfig);
}

protected void AddStore<TModel, TStore>() where TModel : class where TStore : class
{
Services
.AddSingleton<ElasticStore<TModel>>()
.AddSingleton<TStore>();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Elsa.Elasticsearch.Scheduling;
using Elsa.Jobs.Schedules;
using Elsa.Jobs.Services;
using Microsoft.Extensions.Hosting;

namespace Elsa.Elasticsearch.HostedServices;

public class ConfigureIndexRolloverHostedService : IHostedService
{
private readonly IJobScheduler _jobScheduler;

public ConfigureIndexRolloverHostedService(IJobScheduler jobScheduler)
{
_jobScheduler = jobScheduler;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
var job = new ConfigureIndexRolloverJob();
var schedule = new CronSchedule
{
// At the beginning of every month
//CronExpression = "0 0 1 * *"
CronExpression = "*/5 * * * *"
};

await _jobScheduler.ScheduleAsync(job, GetType().Name, schedule, cancellationToken: cancellationToken);
}

public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Elsa.Workflows.Management.Entities;
using Microsoft.Extensions.Hosting;
using Nest;

namespace Elsa.Elasticsearch.HostedServices;

public class ConfigureMappingHostedService : IHostedService
{
private readonly ElasticClient _elasticClient;

public ConfigureMappingHostedService(ElasticClient elasticClient)
{
_elasticClient = elasticClient;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
await _elasticClient.Indices.PutMappingAsync<WorkflowInstance>(
descriptor => descriptor
.Properties(p => p
.Flattened(d => d
.Name(p => p.WorkflowState.Properties))),
cancellationToken);
}

public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using Elsa.Elasticsearch.Common;
using Elsa.Elasticsearch.Services;
using Nest;

namespace Elsa.Elasticsearch.Implementations.IndexNamingStrategies;

public class NamingWithYearAndMonth : IIndexNamingStrategy
{
private readonly ElasticClient _client;

public NamingWithYearAndMonth(ElasticClient client)
{
_client = client;
}

public void Apply(IEnumerable<Type> typesToConfigure, IDictionary<Type, string> aliasConfig)
{
foreach (var type in typesToConfigure)
{
var aliasName = aliasConfig[type];
var indexName = Utils.GenerateIndexName(aliasName);

var indexExists = _client.Indices.Exists(indexName).Exists;

if (indexExists) continue;

var response = _client.Indices.Create(indexName, s => s
.Aliases(a => a.Alias(aliasName))
.Map(m => m.AutoMap(type)));

if (response.IsValid) continue;
throw response.OriginalException;
}
}
}
17 changes: 17 additions & 0 deletions src/modules/Elsa.Elasticsearch/Models/IndexNamingStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Elsa.Elasticsearch.Implementations.IndexNamingStrategies;

namespace Elsa.Elasticsearch.Models;

public class IndexNamingStrategy
{
private IndexNamingStrategy(Type value) { Value = value; }

public Type Value { get; private set; }

public static IndexNamingStrategy NamingWithYearAndMonth => new (typeof(NamingWithYearAndMonth));

public override string ToString()
{
return Value.Name;
}
}
17 changes: 10 additions & 7 deletions src/modules/Elsa.Elasticsearch/Models/IndexRolloverStrategy.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
using System.Reflection.Emit;
using Elsa.Elasticsearch.Implementations.IndexNamingStrategies;
using Elsa.Elasticsearch.Implementations.RolloverStrategies;
using Elsa.Elasticsearch.Services;

namespace Elsa.Elasticsearch.Models;

public class IndexRolloverStrategy
{
private IndexRolloverStrategy(Type value) { Value = value; }
private IndexRolloverStrategy(Type value, Type indexNamingStrategy)
{
Value = value;
IndexNamingStrategy = indexNamingStrategy;
}

public Type Value { get; private set; }
public Type IndexNamingStrategy { get; private set; }

public static IndexRolloverStrategy RolloverOnMonthlyBasis => new (typeof(RolloverOnMonthlyBasis));

public override string ToString()
{
return Value.Name;
}
public static IndexRolloverStrategy RolloverOnMonthlyBasis => new (typeof(RolloverOnMonthlyBasis),typeof(NamingWithYearAndMonth));
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Elsa.Elasticsearch.Modules.Management;

[DependsOn(typeof(WorkflowManagementFeature))]
public class ElasticWorkflowInstanceFeature : ElasticFeatureBase
public class ElasticWorkflowInstanceFeature : ElasticPersistanceFeatureBase
{
public ElasticWorkflowInstanceFeature(IModule module) : base(module)
{
Expand Down
17 changes: 1 addition & 16 deletions src/modules/Elsa.Elasticsearch/Modules/Management/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using Elsa.Elasticsearch.Common;
using Elsa.Elasticsearch.Models;
using Elsa.Elasticsearch.Options;
using Elsa.Workflows.Management.Features;

namespace Elsa.Elasticsearch.Modules.Management;
Expand All @@ -10,20 +7,8 @@ public static class Extensions
/// <summary>
/// Configures the <see cref="WorkflowInstanceFeature"/> to use the <see cref="ElasticWorkflowInstanceFeature"/>.
/// </summary>
public static WorkflowInstanceFeature UseElasticsearch(
this WorkflowInstanceFeature feature,
ElasticsearchOptions options,
IndexRolloverStrategy? rolloverStrategy = default,
IDictionary<string,string>? indexConfig = default,
Action<ElasticWorkflowInstanceFeature>? configure = default)
public static WorkflowInstanceFeature UseElasticsearch(this WorkflowInstanceFeature feature, Action<ElasticWorkflowInstanceFeature>? configure = default)
{
configure += f =>
{
f.Options = options;
f.IndexRolloverStrategy = rolloverStrategy;
f.IndexConfig = Utils.ResolveAliasConfig(f.IndexConfig, options.IndexConfig, indexConfig);
};

feature.Module.Configure(configure);
return feature;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Elsa.Elasticsearch.Modules.Runtime;

[DependsOn(typeof(WorkflowRuntimeFeature))]
public class ElasticExecutionLogRecordFeature : ElasticFeatureBase
public class ElasticExecutionLogRecordFeature : ElasticPersistanceFeatureBase
{
public ElasticExecutionLogRecordFeature(IModule module) : base(module)
{
Expand Down
Loading

0 comments on commit 02ff020

Please sign in to comment.