Skip to content

Commit

Permalink
Add temporary custom version of Azure Container Apps cluster provider…
Browse files Browse the repository at this point in the history
… for Proto Actor
  • Loading branch information
sfmskywalker committed Jan 10, 2023
1 parent 92c6809 commit 091161d
Show file tree
Hide file tree
Showing 9 changed files with 429 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Elsa.sln
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.Webhooks.Exter
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.MassTransitActivities", "src\samples\aspnet\Elsa.Samples.MassTransitActivities\Elsa.Samples.MassTransitActivities.csproj", "{A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.ProtoActor.Cluster.AzureContainerApps", "src\modules\Elsa.ProtoActor.Cluster.AzureContainerApps\Elsa.ProtoActor.Cluster.AzureContainerApps.csproj", "{9F79BBEC-02CE-4F76-AED4-BC191DA3054B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -386,6 +388,10 @@ Global
{A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB}.Release|Any CPU.Build.0 = Release|Any CPU
{9F79BBEC-02CE-4F76-AED4-BC191DA3054B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9F79BBEC-02CE-4F76-AED4-BC191DA3054B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9F79BBEC-02CE-4F76-AED4-BC191DA3054B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9F79BBEC-02CE-4F76-AED4-BC191DA3054B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{155227F0-A33B-40AA-A4B4-06F813EB921B} = {61017E64-6D00-49CB-9E81-5002DC8F7D5F}
Expand Down Expand Up @@ -452,5 +458,6 @@ Global
{130A7A00-A9AF-4EA8-8107-BBEA07F166DF} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5}
{036D287A-33E1-4B28-BE13-14AEA16BC91F} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5}
{A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5}
{9F79BBEC-02CE-4F76-AED4-BC191DA3054B} = {5BA4A8FA-F7F4-45B3-AEC8-8886D35AAC79}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<ProjectReference Include="..\..\modules\Elsa.MassTransit\Elsa.MassTransit.csproj" />
<ProjectReference Include="..\..\modules\Elsa.ProtoActor\Elsa.ProtoActor.csproj" />
<ProjectReference Include="..\..\modules\Elsa.Identity\Elsa.Identity.csproj" />
<ProjectReference Include="..\..\modules\Elsa.ProtoActor.Cluster.AzureContainerApps\Elsa.ProtoActor.Cluster.AzureContainerApps.csproj" />
<ProjectReference Include="..\Elsa\Elsa.csproj" />
<ProjectReference Include="..\..\modules\Elsa.Hangfire\Elsa.Hangfire.csproj" />
<ProjectReference Include="..\..\modules\Elsa.Http\Elsa.Http.csproj" />
Expand All @@ -27,6 +28,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.6.0" />
<PackageReference Include="Proto.Persistence.Sqlite" Version="0.33.0" />
</ItemGroup>

Expand Down
17 changes: 16 additions & 1 deletion src/bundles/Elsa.WorkflowServer.Web/Program.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Azure.Identity;
using Azure.ResourceManager;
using Elsa.EntityFrameworkCore.Extensions;
using Elsa.Extensions;
using Elsa.Identity.Options;
Expand All @@ -6,7 +8,9 @@
using Elsa.EntityFrameworkCore.Modules.Management;
using Elsa.EntityFrameworkCore.Modules.Runtime;
using Microsoft.Data.Sqlite;
using Elsa.ProtoActor.Cluster.AzureContainerApps;
using Proto.Persistence.Sqlite;
using Proto.Remote.GrpcNet;

var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;
Expand All @@ -32,7 +36,12 @@
.UseWorkflowManagement(management => management.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)))
.UseWorkflowRuntime(runtime =>
{
runtime.UseProtoActor(proto => proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(sqliteConnectionString)));
runtime.UseProtoActor(proto =>
{
proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(sqliteConnectionString));
proto.ClusterProvider = sp => sp.GetRequiredService<AzureContainerAppsProvider>();
proto.RemoteConfig = sp => GrpcNetRemoteConfig.BindToAllInterfaces();
});
runtime.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString));
runtime.UseAsyncWorkflowStateExporter();
runtime.UseMassTransitDispatcher();
Expand All @@ -48,6 +57,12 @@
.UseHttp()
);

services.AddSingleton(sp =>
{
var armClient = new ArmClient(new DefaultAzureCredential());
return new AzureContainerAppsProvider(armClient, "CyberdyneMortgage-dev", "skynet-workflow-server-v3-dev", "skynet-workflow-server-v3-dev--vk8lzb5", "skynet-workflow-server-v3-dev--vk8lzb5-685f9f56f6-m9fb5");
});

services.AddHandlersFrom<Program>();
services.AddHealthChecks();
services.AddCors(cors => cors.AddDefaultPolicy(policy => policy.AllowAnyHeader().AllowAnyMethod().AllowAnyOrigin()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure;
using Azure.ResourceManager;
using Azure.ResourceManager.AppContainers;
using Azure.ResourceManager.Resources;
using Azure.ResourceManager.Resources.Models;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;

namespace Elsa.ProtoActor.Cluster.AzureContainerApps;

public static class ArmClientUtils
{
private static readonly ILogger Logger = Log.CreateLogger(nameof(ArmClientUtils));

public static async Task<Member[]> GetClusterMembers(this ArmClient client, string resourceGroupName, string containerAppName)
{
var members = new List<Member>();

var containerApp = await (await client.GetResourceGroupByName(resourceGroupName)).Value.GetContainerAppAsync(containerAppName);

if (containerApp is null || !containerApp.HasValue)
{
Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} is not found", containerApp, resourceGroupName);
return members.ToArray();
}

var containerAppRevisions = GetActiveRevisionsWithTraffic(containerApp).ToList();
if (!containerAppRevisions.Any())
{
Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} does not contain any active revisions with traffic", containerAppName, resourceGroupName);
return members.ToArray();
}

var replicasWithTraffic = containerAppRevisions.SelectMany(r => r.GetContainerAppReplicas());

var allTags = (await containerApp.Value.GetTagResource().GetAsync()).Value.Data.TagValues;

foreach (var replica in replicasWithTraffic)
{
var replicaNameTag = allTags.FirstOrDefault(kvp => kvp.Value == replica.Data.Name);
if (replicaNameTag.Key == null)
{
Logger.LogWarning("Skipping Replica with name: {Name}, no Proto Tags found", replica.Data.Name);
continue;
}

var replicaNameTagPrefix = replicaNameTag.Key.Replace(ResourceTagLabels.LabelReplicaNameWithoutPrefix, string.Empty);
var currentReplicaTags = allTags.Where(kvp => kvp.Key.StartsWith(replicaNameTagPrefix)).ToDictionary(x => x.Key, x => x.Value);

var memberId = currentReplicaTags.FirstOrDefault(kvp => kvp.Key.ToString().Contains(ResourceTagLabels.LabelMemberIdWithoutPrefix)).Value;

var kinds = currentReplicaTags
.Where(kvp => kvp.Key.StartsWith(ResourceTagLabels.LabelKind(memberId)))
.Select(kvp => kvp.Key[(ResourceTagLabels.LabelKind(memberId).Length + 1)..])
.ToArray();

var member = new Member
{
Id = currentReplicaTags[ResourceTagLabels.LabelMemberId(memberId)],
Port = int.Parse(currentReplicaTags[ResourceTagLabels.LabelPort(memberId)]),
Host = currentReplicaTags[ResourceTagLabels.LabelHost(memberId)],
Kinds = { kinds }
};

members.Add(member);
}

return members.ToArray();
}

public static async Task AddMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, Dictionary<string, string> newTags)
{
var resourceTag = new Tag();
foreach (var tag in newTags)
{
resourceTag.TagValues.Add(tag);
}

var resourceGroup = await client.GetResourceGroupByName(resourceGroupName);
var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName);
var tagResource = containerApp.Value.GetTagResource();

var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues;
foreach (var tag in existingTags)
{
resourceTag.TagValues.Add(tag);
}

await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag));
}

public static async Task ClearMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, string memberId)
{
var resourceGroup = await client.GetResourceGroupByName(resourceGroupName);
var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName);
var tagResource = containerApp.Value.GetTagResource();

var resourceTag = new Tag();
var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues;

foreach (var tag in existingTags)
{
if (!tag.Key.StartsWith(ResourceTagLabels.LabelPrefix(memberId)))
{
resourceTag.TagValues.Add(tag);
}
}

await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag));
}

public static async Task<Response<ResourceGroupResource>> GetResourceGroupByName(this ArmClient client, string resourceGroupName) =>
await (await client.GetDefaultSubscriptionAsync()).GetResourceGroups().GetAsync(resourceGroupName);

private static IEnumerable<ContainerAppRevisionResource> GetActiveRevisionsWithTraffic(ContainerAppResource containerApp) =>
containerApp.GetContainerAppRevisions().Where(r => r.HasData && (r.Data.IsActive ?? false) && r.Data.TrafficWeight > 0);
}
Loading

0 comments on commit 091161d

Please sign in to comment.