Skip to content

Commit

Permalink
Add compression feature for workflow state data
Browse files Browse the repository at this point in the history
This update adds a compression feature for workflow state data to reduce storage needs. A compression strategy resolver, None and GZip strategies have been implemented. Migration scripts were also updated, and a superfluous file(s) were removed.
  • Loading branch information
sfmskywalker committed Feb 3, 2024
1 parent ff9aa54 commit 0bfebcc
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 21 deletions.
7 changes: 6 additions & 1 deletion Elsa.sln
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "solution", "solution", "{7D
NuGet.Config = NuGet.Config
packages.props = packages.props
README.md = README.md
update-migrations.sh = update-migrations.sh
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{0354F050-3992-4DD4-B0EE-5FBA04AC72B6}"
Expand Down Expand Up @@ -308,6 +307,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.AspNet.Heartbe
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.MassTransit.AzureServiceBus", "src\modules\Elsa.MassTransit.AzureServiceBus\Elsa.MassTransit.AzureServiceBus.csproj", "{AFEB799E-82C3-4D02-9D5C-766BB8DEF004}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "migrations", "migrations", "{C80C8231-D35C-4ACC-9ED6-9F3DB221535E}"
ProjectSection(SolutionItems) = preProject
migrations\efcore-3.1.sh = migrations\efcore-3.1.sh
migrations\efcore-3.0.sh = migrations\efcore-3.0.sh
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down
1 change: 1 addition & 0 deletions Elsa.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<s:Int64 x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/MAX_INITIALIZER_ELEMENTS_ON_LINE/@EntryValue">1</s:Int64>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_SIMPLE_INITIALIZER_ON_SINGLE_LINE/@EntryValue">False</s:Boolean>
<s:Boolean x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_WHILE_ON_NEW_LINE/@EntryValue">True</s:Boolean>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=EF/@EntryIndexedValue">EF</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=downloadables/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=initializable/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=materializers/@EntryIndexedValue">True</s:Boolean>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env zsh

# Define the modules to update
mods=("Runtime")
mods=("Management")
# mods=("Alterations" "Runtime" "Management" "Identity" "Labels")

# Define the list of providers
Expand Down
38 changes: 38 additions & 0 deletions migrations/efcore-3.0.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env zsh

# Define the modules to update
mods=("Management")
# mods=("Alterations" "Runtime" "Management" "Identity" "Labels")

# Define the list of providers
providers=("MySql" "SqlServer" "Sqlite" "PostgreSql")
# providers=("SqlServer")

# Connection strings for each provider
typeset -A connStrings
connStrings=(
MySql "Server=localhost;Port=3306;Database=elsa;User=root;Password=password;"
SqlServer ""
Sqlite ""
PostgreSql ""
)

# Loop through each module
for module in "${mods[@]}"; do
# Loop through each provider
for provider in "${providers[@]}"; do
providerPath="../src/modules/Elsa.EntityFrameworkCore.$provider"
migrationsPath="Migrations/$module"

echo "Updating migrations for $provider..."
echo "Provider path: ${providerPath:?}/${migrationsPath}"
echo "Migrations path: $migrationsPath"
echo "Connection string: ${connStrings[$provider]}"

# 1. Delete the existing migrations folder
rm -rf "${providerPath:?}/${migrationsPath}"

# 2. Run the migrations command
dotnet ef migrations add Initial -c "$module"ElsaDbContext -p "$providerPath" -o "$migrationsPath" -- --connectionString "${connStrings[$provider]}"
done
done
35 changes: 35 additions & 0 deletions migrations/efcore-3.1.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env zsh

# Define the modules to update
mods=("Management")
# mods=("Alterations" "Runtime" "Management" "Identity" "Labels")

# Define the list of providers
providers=("MySql" "SqlServer" "Sqlite" "PostgreSql")
# providers=("SqlServer")

# Connection strings for each provider
typeset -A connStrings
connStrings=(
MySql "Server=localhost;Port=3306;Database=elsa;User=root;Password=password;"
SqlServer ""
Sqlite ""
PostgreSql ""
)

# Loop through each module
for module in "${mods[@]}"; do
# Loop through each provider
for provider in "${providers[@]}"; do
providerPath="../src/modules/Elsa.EntityFrameworkCore.$provider"
migrationsPath="Migrations/$module"

echo "Updating migrations for $provider..."
echo "Provider path: ${providerPath:?}/${migrationsPath}"
echo "Migrations path: $migrationsPath"
echo "Connection string: ${connStrings[$provider]}"

# 1. Run the migrations command
dotnet ef migrations add 3_1 -c "$module"ElsaDbContext -p "$providerPath" -o "$migrationsPath" -- --connectionString "${connStrings[$provider]}"
done
done
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public void Configure(EntityTypeBuilder<WorkflowDefinition> builder)
builder.Ignore(x => x.CustomProperties);
builder.Ignore(x => x.Options);
builder.Property<string>("Data");
builder.Property<string>("DataFormat");
builder.Property<string>("DataCompressionAlgorithm");
builder.Property<bool?>("UsableAsActivity");
builder.Property(x => x.ToolVersion).HasConversion(VersionToStringConverter, StringToVersionConverter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,50 @@
using Elsa.EntityFrameworkCore.Common;
using Elsa.Extensions;
using Elsa.Workflows.Contracts;
using Elsa.Workflows.Management.Compression;
using Elsa.Workflows.Management.Contracts;
using Elsa.Workflows.Management.Entities;
using Elsa.Workflows.Management.Filters;
using Elsa.Workflows.Management.Models;
using Elsa.Workflows.Management.Options;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Open.Linq.AsyncExtensions;

namespace Elsa.EntityFrameworkCore.Modules.Management;

/// <summary>
/// An EF Core implementation of <see cref="IWorkflowInstanceStore"/>.
/// </summary>
[UsedImplicitly]
public class EFCoreWorkflowInstanceStore : IWorkflowInstanceStore
{
private readonly EntityStore<ManagementElsaDbContext, WorkflowInstance> _store;
private readonly IWorkflowStateSerializer _workflowStateSerializer;
private readonly ICompressionStrategyResolver _compressionStrategyResolver;
private readonly IOptions<ManagementOptions> _options;

/// <summary>
/// Constructor.
/// </summary>
public EFCoreWorkflowInstanceStore(EntityStore<ManagementElsaDbContext, WorkflowInstance> store, IWorkflowStateSerializer workflowStateSerializer)
public EFCoreWorkflowInstanceStore(
EntityStore<ManagementElsaDbContext, WorkflowInstance> store,
IWorkflowStateSerializer workflowStateSerializer,
ICompressionStrategyResolver compressionStrategyResolver,
IOptions<ManagementOptions> options)
{
_store = store;
_workflowStateSerializer = workflowStateSerializer;
_compressionStrategyResolver = compressionStrategyResolver;
_options = options;
}

/// <inheritdoc />
public async ValueTask<WorkflowInstance?> FindAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default) =>
await _store.QueryAsync(query => Filter(query, filter), OnLoadAsync, cancellationToken).FirstOrDefault();
public async ValueTask<WorkflowInstance?> FindAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default)
{
return await _store.QueryAsync(query => Filter(query, filter), OnLoadAsync, cancellationToken).FirstOrDefault();
}

/// <inheritdoc />
public async ValueTask<Page<WorkflowInstance>> FindManyAsync(WorkflowInstanceFilter filter, PageArgs pageArgs, CancellationToken cancellationToken = default)
Expand All @@ -49,13 +64,17 @@ public async ValueTask<Page<WorkflowInstance>> FindManyAsync<TOrderBy>(WorkflowI
}

/// <inheritdoc />
public async ValueTask<IEnumerable<WorkflowInstance>> FindManyAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default) =>
await _store.QueryAsync(query => Filter(query, filter), OnLoadAsync, cancellationToken).ToList().AsEnumerable();
public async ValueTask<IEnumerable<WorkflowInstance>> FindManyAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default)
{
return await _store.QueryAsync(query => Filter(query, filter), OnLoadAsync, cancellationToken).ToList().AsEnumerable();
}

/// <inheritdoc />
public async ValueTask<IEnumerable<WorkflowInstance>> FindManyAsync<TOrderBy>(WorkflowInstanceFilter filter, WorkflowInstanceOrder<TOrderBy> order, CancellationToken cancellationToken = default) =>
await _store.QueryAsync(query => Filter(query, filter).OrderBy(order), OnLoadAsync, cancellationToken).ToList().AsEnumerable();

public async ValueTask<IEnumerable<WorkflowInstance>> FindManyAsync<TOrderBy>(WorkflowInstanceFilter filter, WorkflowInstanceOrder<TOrderBy> order, CancellationToken cancellationToken = default)
{
return await _store.QueryAsync(query => Filter(query, filter).OrderBy(order), OnLoadAsync, cancellationToken).ToList().AsEnumerable();
}

/// <inheritdoc />
public async ValueTask<long> CountAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -84,31 +103,46 @@ public async ValueTask<Page<WorkflowInstanceSummary>> SummarizeManyAsync<TOrderB
}

/// <inheritdoc />
public async ValueTask<IEnumerable<WorkflowInstanceSummary>> SummarizeManyAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default) =>
await _store.QueryAsync(query => Filter(query, filter), WorkflowInstanceSummary.FromInstanceExpression(), cancellationToken).ToList().AsEnumerable();
public async ValueTask<IEnumerable<WorkflowInstanceSummary>> SummarizeManyAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default)
{
return await _store.QueryAsync(query => Filter(query, filter), WorkflowInstanceSummary.FromInstanceExpression(), cancellationToken).ToList().AsEnumerable();
}

/// <inheritdoc />
public async ValueTask<IEnumerable<WorkflowInstanceSummary>> SummarizeManyAsync<TOrderBy>(WorkflowInstanceFilter filter, WorkflowInstanceOrder<TOrderBy> order, CancellationToken cancellationToken = default) =>
await _store.QueryAsync(query => Filter(query, filter).OrderBy(order), WorkflowInstanceSummary.FromInstanceExpression(), cancellationToken).ToList().AsEnumerable();
public async ValueTask<IEnumerable<WorkflowInstanceSummary>> SummarizeManyAsync<TOrderBy>(WorkflowInstanceFilter filter, WorkflowInstanceOrder<TOrderBy> order, CancellationToken cancellationToken = default)
{
return await _store.QueryAsync(query => Filter(query, filter).OrderBy(order), WorkflowInstanceSummary.FromInstanceExpression(), cancellationToken).ToList().AsEnumerable();
}

/// <inheritdoc />
public async ValueTask<long> DeleteAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default) =>
await _store.DeleteWhereAsync(query => Filter(query, filter), cancellationToken);
public async ValueTask<long> DeleteAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default)
{
return await _store.DeleteWhereAsync(query => Filter(query, filter), cancellationToken);
}

/// <inheritdoc />
public async ValueTask SaveAsync(WorkflowInstance instance, CancellationToken cancellationToken = default) =>
public async ValueTask SaveAsync(WorkflowInstance instance, CancellationToken cancellationToken = default)
{
await _store.SaveAsync(instance, OnSaveAsync, cancellationToken);
}

/// <inheritdoc />
public async ValueTask SaveManyAsync(IEnumerable<WorkflowInstance> instances, CancellationToken cancellationToken = default) =>
public async ValueTask SaveManyAsync(IEnumerable<WorkflowInstance> instances, CancellationToken cancellationToken = default)
{
await _store.SaveManyAsync(instances, OnSaveAsync, cancellationToken);
}

private async ValueTask OnSaveAsync(ManagementElsaDbContext managementElsaDbContext, WorkflowInstance entity, CancellationToken cancellationToken)
{
var data = entity.WorkflowState;
var json = await _workflowStateSerializer.SerializeAsync(data, cancellationToken);
var compressionAlgorithm = _options.Value.CompressionAlgorithm ?? nameof(None);
var compressionStrategy = _compressionStrategyResolver.Resolve(compressionAlgorithm);
var compressedJson = await compressionStrategy.CompressAsync(json, cancellationToken);

managementElsaDbContext.Entry(entity).Property("Data").CurrentValue = json;
managementElsaDbContext.Entry(entity).Property("Data").CurrentValue = compressedJson;
managementElsaDbContext.Entry(entity).Property("DataFormat").CurrentValue = "Json";
managementElsaDbContext.Entry(entity).Property("DataCompressionAlgorithm").CurrentValue = compressionAlgorithm;
}

private async ValueTask OnLoadAsync(ManagementElsaDbContext managementElsaDbContext, WorkflowInstance? entity, CancellationToken cancellationToken)
Expand All @@ -118,12 +152,20 @@ private async ValueTask OnLoadAsync(ManagementElsaDbContext managementElsaDbCont

var data = entity.WorkflowState;
var json = (string?)managementElsaDbContext.Entry(entity).Property("Data").CurrentValue;
var compressionAlgorithm = (string?)managementElsaDbContext.Entry(entity).Property("DataCompressionAlgorithm").CurrentValue ?? nameof(None);
var compressionStrategy = _compressionStrategyResolver.Resolve(compressionAlgorithm);

if (!string.IsNullOrWhiteSpace(json))
if (!string.IsNullOrWhiteSpace(json))
{
json = await compressionStrategy.DecompressAsync(json, cancellationToken);
data = await _workflowStateSerializer.DeserializeAsync(json, cancellationToken);
}

entity.WorkflowState = data;
}

private static IQueryable<WorkflowInstance> Filter(IQueryable<WorkflowInstance> query, WorkflowInstanceFilter filter) => filter.Apply(query);
private static IQueryable<WorkflowInstance> Filter(IQueryable<WorkflowInstance> query, WorkflowInstanceFilter filter)
{
return filter.Apply(query);
}
}
36 changes: 36 additions & 0 deletions src/modules/Elsa.Workflows.Management/Compression/GZip.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System.IO.Compression;
using System.Text;
using Elsa.Workflows.Management.Contracts;

namespace Elsa.Workflows.Management.Compression;

/// <summary>
/// Represents a GZip compression strategy.
/// </summary>
public class GZip : ICompressionStrategy
{
/// <inheritdoc />
public async ValueTask<string> CompressAsync(string input, CancellationToken cancellationToken)
{
var inputBytes = Encoding.UTF8.GetBytes(input);
using var output = new MemoryStream();
await using var compressionStream = new GZipStream(output, CompressionMode.Compress);
await compressionStream.WriteAsync(inputBytes, 0, inputBytes.Length, cancellationToken);

return Convert.ToBase64String(output.ToArray());
}

/// <inheritdoc />
public async ValueTask<string> DecompressAsync(string input, CancellationToken cancellationToken)
{
var inputBytes = Convert.FromBase64String(input);
using var inputMemoryStream = new MemoryStream(inputBytes);
await using var decompressionStream = new GZipStream(inputMemoryStream, CompressionMode.Decompress);
using var outputMemoryStream = new MemoryStream();

await decompressionStream.CopyToAsync(outputMemoryStream, cancellationToken);
var decompressedBytes = outputMemoryStream.ToArray();

return Encoding.UTF8.GetString(decompressedBytes);
}
}
21 changes: 21 additions & 0 deletions src/modules/Elsa.Workflows.Management/Compression/None.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Elsa.Workflows.Management.Contracts;

namespace Elsa.Workflows.Management.Compression;

/// <summary>
/// Represents a compression strategy that does not compress or decompress the input.
/// </summary>
public class None : ICompressionStrategy
{
/// <inheritdoc />
public ValueTask<string> CompressAsync(string input, CancellationToken cancellationToken)
{
return new ValueTask<string>(input);
}

/// <inheritdoc />
public ValueTask<string> DecompressAsync(string input, CancellationToken cancellationToken)
{
return new ValueTask<string>(input);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Elsa.Workflows.Management.Contracts;

/// <summary>
/// Represents a compression strategy.
/// </summary>
public interface ICompressionStrategy
{
/// <summary>
/// Compresses the input.
/// </summary>
ValueTask<string> CompressAsync(string input, CancellationToken cancellationToken);

/// <summary>
/// Decompresses the input.
/// </summary>
/// <param name="input"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
ValueTask<string> DecompressAsync(string input, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Elsa.Workflows.Management.Contracts;

/// <summary>
/// Resolves a <see cref="ICompressionStrategy"/> from its name.
/// </summary>
public interface ICompressionStrategyResolver
{
/// <summary>
/// Resolves a <see cref="ICompressionStrategy"/> from its name.
/// </summary>
ICompressionStrategy Resolve(string name);
}
Loading

0 comments on commit 0bfebcc

Please sign in to comment.