From 091161dd9a21330141c31dd481073e48c20ab628 Mon Sep 17 00:00:00 2001 From: Sipke Schoorstra Date: Tue, 10 Jan 2023 20:31:55 +0100 Subject: [PATCH] Add temporary custom version of Azure Container Apps cluster provider for Proto Actor --- Elsa.sln | 7 + .../Elsa.WorkflowServer.Web.csproj | 2 + .../Elsa.WorkflowServer.Web/Program.cs | 17 +- .../ArmClientUtils.cs | 121 +++++++++++ .../AzureContainerAppsProvider.cs | 192 ++++++++++++++++++ .../ConfigUtils.cs | 54 +++++ ...otoActor.Cluster.AzureContainerApps.csproj | 20 ++ .../FodyWeavers.xml | 3 + .../ResourceTagLabels.cs | 14 ++ 9 files changed, 429 insertions(+), 1 deletion(-) create mode 100644 src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ArmClientUtils.cs create mode 100644 src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs create mode 100644 src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ConfigUtils.cs create mode 100644 src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/Elsa.ProtoActor.Cluster.AzureContainerApps.csproj create mode 100644 src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/FodyWeavers.xml create mode 100644 src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ResourceTagLabels.cs diff --git a/Elsa.sln b/Elsa.sln index 07c79be7e0..1f64c5fee3 100644 --- a/Elsa.sln +++ b/Elsa.sln @@ -152,6 +152,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.Webhooks.Exter EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.Samples.MassTransitActivities", "src\samples\aspnet\Elsa.Samples.MassTransitActivities\Elsa.Samples.MassTransitActivities.csproj", "{A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elsa.ProtoActor.Cluster.AzureContainerApps", "src\modules\Elsa.ProtoActor.Cluster.AzureContainerApps\Elsa.ProtoActor.Cluster.AzureContainerApps.csproj", "{9F79BBEC-02CE-4F76-AED4-BC191DA3054B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -386,6 +388,10 @@ Global {A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB}.Debug|Any CPU.Build.0 = Debug|Any CPU {A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB}.Release|Any CPU.ActiveCfg = Release|Any CPU {A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB}.Release|Any CPU.Build.0 = Release|Any CPU + {9F79BBEC-02CE-4F76-AED4-BC191DA3054B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9F79BBEC-02CE-4F76-AED4-BC191DA3054B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9F79BBEC-02CE-4F76-AED4-BC191DA3054B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9F79BBEC-02CE-4F76-AED4-BC191DA3054B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {155227F0-A33B-40AA-A4B4-06F813EB921B} = {61017E64-6D00-49CB-9E81-5002DC8F7D5F} @@ -452,5 +458,6 @@ Global {130A7A00-A9AF-4EA8-8107-BBEA07F166DF} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5} {036D287A-33E1-4B28-BE13-14AEA16BC91F} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5} {A1A8AD89-C9C4-41BF-BBCB-EF0544A28BFB} = {56C2FFB8-EA54-45B5-A095-4A78142EB4B5} + {9F79BBEC-02CE-4F76-AED4-BC191DA3054B} = {5BA4A8FA-F7F4-45B3-AEC8-8886D35AAC79} EndGlobalSection EndGlobal diff --git a/src/bundles/Elsa.WorkflowServer.Web/Elsa.WorkflowServer.Web.csproj b/src/bundles/Elsa.WorkflowServer.Web/Elsa.WorkflowServer.Web.csproj index 154154a0e6..e18fdcd4cb 100644 --- a/src/bundles/Elsa.WorkflowServer.Web/Elsa.WorkflowServer.Web.csproj +++ b/src/bundles/Elsa.WorkflowServer.Web/Elsa.WorkflowServer.Web.csproj @@ -16,6 +16,7 @@ + @@ -27,6 +28,7 @@ + diff --git a/src/bundles/Elsa.WorkflowServer.Web/Program.cs b/src/bundles/Elsa.WorkflowServer.Web/Program.cs index 4a9c145bd2..2d23d868a2 100644 --- a/src/bundles/Elsa.WorkflowServer.Web/Program.cs +++ b/src/bundles/Elsa.WorkflowServer.Web/Program.cs @@ -1,3 +1,5 @@ +using Azure.Identity; +using Azure.ResourceManager; using Elsa.EntityFrameworkCore.Extensions; using Elsa.Extensions; using Elsa.Identity.Options; @@ -6,7 +8,9 @@ using Elsa.EntityFrameworkCore.Modules.Management; using Elsa.EntityFrameworkCore.Modules.Runtime; using Microsoft.Data.Sqlite; +using Elsa.ProtoActor.Cluster.AzureContainerApps; using Proto.Persistence.Sqlite; +using Proto.Remote.GrpcNet; var builder = WebApplication.CreateBuilder(args); var services = builder.Services; @@ -32,7 +36,12 @@ .UseWorkflowManagement(management => management.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString))) .UseWorkflowRuntime(runtime => { - runtime.UseProtoActor(proto => proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(sqliteConnectionString))); + runtime.UseProtoActor(proto => + { + proto.PersistenceProvider = _ => new SqliteProvider(new SqliteConnectionStringBuilder(sqliteConnectionString)); + proto.ClusterProvider = sp => sp.GetRequiredService(); + proto.RemoteConfig = sp => GrpcNetRemoteConfig.BindToAllInterfaces(); + }); runtime.UseEntityFrameworkCore(ef => ef.UseSqlite(sqliteConnectionString)); runtime.UseAsyncWorkflowStateExporter(); runtime.UseMassTransitDispatcher(); @@ -48,6 +57,12 @@ .UseHttp() ); +services.AddSingleton(sp => +{ + var armClient = new ArmClient(new DefaultAzureCredential()); + return new AzureContainerAppsProvider(armClient, "CyberdyneMortgage-dev", "skynet-workflow-server-v3-dev", "skynet-workflow-server-v3-dev--vk8lzb5", "skynet-workflow-server-v3-dev--vk8lzb5-685f9f56f6-m9fb5"); +}); + services.AddHandlersFrom(); services.AddHealthChecks(); services.AddCors(cors => cors.AddDefaultPolicy(policy => policy.AllowAnyHeader().AllowAnyMethod().AllowAnyOrigin())); diff --git a/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ArmClientUtils.cs b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ArmClientUtils.cs new file mode 100644 index 0000000000..9466485a63 --- /dev/null +++ b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ArmClientUtils.cs @@ -0,0 +1,121 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Azure; +using Azure.ResourceManager; +using Azure.ResourceManager.AppContainers; +using Azure.ResourceManager.Resources; +using Azure.ResourceManager.Resources.Models; +using Microsoft.Extensions.Logging; +using Proto; +using Proto.Cluster; + +namespace Elsa.ProtoActor.Cluster.AzureContainerApps; + +public static class ArmClientUtils +{ + private static readonly ILogger Logger = Log.CreateLogger(nameof(ArmClientUtils)); + + public static async Task GetClusterMembers(this ArmClient client, string resourceGroupName, string containerAppName) + { + var members = new List(); + + var containerApp = await (await client.GetResourceGroupByName(resourceGroupName)).Value.GetContainerAppAsync(containerAppName); + + if (containerApp is null || !containerApp.HasValue) + { + Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} is not found", containerApp, resourceGroupName); + return members.ToArray(); + } + + var containerAppRevisions = GetActiveRevisionsWithTraffic(containerApp).ToList(); + if (!containerAppRevisions.Any()) + { + Logger.LogError("Container App: {ContainerApp} in resource group: {ResourceGroup} does not contain any active revisions with traffic", containerAppName, resourceGroupName); + return members.ToArray(); + } + + var replicasWithTraffic = containerAppRevisions.SelectMany(r => r.GetContainerAppReplicas()); + + var allTags = (await containerApp.Value.GetTagResource().GetAsync()).Value.Data.TagValues; + + foreach (var replica in replicasWithTraffic) + { + var replicaNameTag = allTags.FirstOrDefault(kvp => kvp.Value == replica.Data.Name); + if (replicaNameTag.Key == null) + { + Logger.LogWarning("Skipping Replica with name: {Name}, no Proto Tags found", replica.Data.Name); + continue; + } + + var replicaNameTagPrefix = replicaNameTag.Key.Replace(ResourceTagLabels.LabelReplicaNameWithoutPrefix, string.Empty); + var currentReplicaTags = allTags.Where(kvp => kvp.Key.StartsWith(replicaNameTagPrefix)).ToDictionary(x => x.Key, x => x.Value); + + var memberId = currentReplicaTags.FirstOrDefault(kvp => kvp.Key.ToString().Contains(ResourceTagLabels.LabelMemberIdWithoutPrefix)).Value; + + var kinds = currentReplicaTags + .Where(kvp => kvp.Key.StartsWith(ResourceTagLabels.LabelKind(memberId))) + .Select(kvp => kvp.Key[(ResourceTagLabels.LabelKind(memberId).Length + 1)..]) + .ToArray(); + + var member = new Member + { + Id = currentReplicaTags[ResourceTagLabels.LabelMemberId(memberId)], + Port = int.Parse(currentReplicaTags[ResourceTagLabels.LabelPort(memberId)]), + Host = currentReplicaTags[ResourceTagLabels.LabelHost(memberId)], + Kinds = { kinds } + }; + + members.Add(member); + } + + return members.ToArray(); + } + + public static async Task AddMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, Dictionary newTags) + { + var resourceTag = new Tag(); + foreach (var tag in newTags) + { + resourceTag.TagValues.Add(tag); + } + + var resourceGroup = await client.GetResourceGroupByName(resourceGroupName); + var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName); + var tagResource = containerApp.Value.GetTagResource(); + + var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues; + foreach (var tag in existingTags) + { + resourceTag.TagValues.Add(tag); + } + + await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag)); + } + + public static async Task ClearMemberTags(this ArmClient client, string resourceGroupName, string containerAppName, string memberId) + { + var resourceGroup = await client.GetResourceGroupByName(resourceGroupName); + var containerApp = await resourceGroup.Value.GetContainerAppAsync(containerAppName); + var tagResource = containerApp.Value.GetTagResource(); + + var resourceTag = new Tag(); + var existingTags = (await tagResource.GetAsync()).Value.Data.TagValues; + + foreach (var tag in existingTags) + { + if (!tag.Key.StartsWith(ResourceTagLabels.LabelPrefix(memberId))) + { + resourceTag.TagValues.Add(tag); + } + } + + await tagResource.CreateOrUpdateAsync(WaitUntil.Completed, new TagResourceData(resourceTag)); + } + + public static async Task> GetResourceGroupByName(this ArmClient client, string resourceGroupName) => + await (await client.GetDefaultSubscriptionAsync()).GetResourceGroups().GetAsync(resourceGroupName); + + private static IEnumerable GetActiveRevisionsWithTraffic(ContainerAppResource containerApp) => + containerApp.GetContainerAppRevisions().Where(r => r.HasData && (r.Data.IsActive ?? false) && r.Data.TrafficWeight > 0); +} \ No newline at end of file diff --git a/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs new file mode 100644 index 0000000000..d66326c7b2 --- /dev/null +++ b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/AzureContainerAppsProvider.cs @@ -0,0 +1,192 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Azure.ResourceManager; +using Azure.ResourceManager.AppContainers; +using JetBrains.Annotations; +using Microsoft.Extensions.Logging; +using Proto; +using Proto.Cluster; +using Proto.Utils; + +namespace Elsa.ProtoActor.Cluster.AzureContainerApps; + +[PublicAPI] +public class AzureContainerAppsProvider : IClusterProvider +{ + private readonly ArmClient _client; + private readonly string _resourceGroup; + private readonly string _containerAppName; + private readonly string _revisionName; + private readonly string _replicaName; + private readonly string _advertisedHost; + + private string _memberId = null!; + private string _address = null!; + private global::Proto.Cluster.Cluster _cluster = null!; + private string _clusterName = null!; + private string[] _kinds = null!; + private int _port; + + private static readonly ILogger Logger = Log.CreateLogger(); + private static readonly TimeSpan PollIntervalInSeconds = TimeSpan.FromSeconds(5); + + /// + /// Use this constructor to create a new instance. + /// + /// An existing instance that you need to bring yourself. + /// The resource group name containing your Azure Container App. + /// The name of the container app. If not specified, the CONTAINER_APP_NAME environment variable is used. + /// The revision of the container app. If not specified, the CONTAINER_APP_REVISION environment variable is used. + /// The replica name of the container app. If not specified, the HOSTNAME environment variable is used. + /// The host or IP address of the container app. If not specified, will take the smallest local IP address (e.g. 127.0.0.1). + public AzureContainerAppsProvider( + ArmClient client, + string resourceGroup, + [CanBeNull] string containerAppName = default, + [CanBeNull] string revision = default, + [CanBeNull] string replicaName = default, + [CanBeNull] string advertisedHost = default) + { + _client = client; + _resourceGroup = resourceGroup; + _containerAppName = containerAppName ?? Environment.GetEnvironmentVariable("CONTAINER_APP_NAME"); + _revisionName = revision ?? Environment.GetEnvironmentVariable("CONTAINER_APP_REVISION"); + _replicaName = replicaName ?? Environment.GetEnvironmentVariable("HOSTNAME"); + _advertisedHost = advertisedHost; + + if (string.IsNullOrEmpty(_advertisedHost)) + _advertisedHost = ConfigUtils.FindIpAddress().ToString(); + } + + public async Task StartMemberAsync(global::Proto.Cluster.Cluster cluster) + { + var clusterName = cluster.Config.ClusterName; + var (host, port) = cluster.System.GetAddress(); + var kinds = cluster.GetClusterKinds(); + _cluster = cluster; + _clusterName = clusterName; + _memberId = cluster.System.Id; + _port = port; + _kinds = kinds; + _address = $"{host}:{port}"; + + await RegisterMemberAsync(); + StartClusterMonitor(); + } + + public Task StartClientAsync(global::Proto.Cluster.Cluster cluster) + { + var clusterName = cluster.Config.ClusterName; + var (_, port) = cluster.System.GetAddress(); + _cluster = cluster; + _clusterName = clusterName; + _memberId = cluster.System.Id; + _port = port; + _kinds = Array.Empty(); + + StartClusterMonitor(); + return Task.CompletedTask; + } + + public async Task ShutdownAsync(bool graceful) => await DeregisterMemberAsync(); + + private async Task RegisterMemberAsync() + { + await Retry.Try(RegisterMemberInner, retryCount: Retry.Forever, onError: OnError, onFailed: OnFailed); + + static void OnError(int attempt, Exception exception) => Logger.LogWarning(exception, "Failed to register service"); + static void OnFailed(Exception exception) => Logger.LogError(exception, "Failed to register service"); + } + + private async Task RegisterMemberInner() + { + var resourceGroup = await _client.GetResourceGroupByName(_resourceGroup); + var containerApp = await resourceGroup.Value.GetContainerAppAsync(_containerAppName); + var revision = await containerApp.Value.GetContainerAppRevisionAsync(_revisionName); + + if (revision.Value.Data.TrafficWeight.GetValueOrDefault(0) == 0) + return; + + Logger.LogInformation( + "[Cluster][AzureContainerAppsProvider] Registering service {ReplicaName} on {IpAddress}", + _replicaName, + _address); + + var tags = new Dictionary + { + [ResourceTagLabels.LabelCluster(_memberId)] = _clusterName, + [ResourceTagLabels.LabelHost(_memberId)] = _advertisedHost, + [ResourceTagLabels.LabelPort(_memberId)] = _port.ToString(), + [ResourceTagLabels.LabelMemberId(_memberId)] = _memberId, + [ResourceTagLabels.LabelReplicaName(_memberId)] = _replicaName + }; + + foreach (var kind in _kinds) + { + var labelKey = $"{ResourceTagLabels.LabelKind(_memberId)}-{kind}"; + tags.TryAdd(labelKey, "true"); + } + + try + { + await _client.AddMemberTags(_resourceGroup, _containerAppName, tags); + } + catch (Exception x) + { + Logger.LogError(x, "Failed to update metadata"); + } + } + + private void StartClusterMonitor() => + _ = SafeTask.Run(async () => + { + while (!_cluster.System.Shutdown.IsCancellationRequested) + { + Logger.LogInformation("Calling ACS API"); + + try + { + var members = await _client.GetClusterMembers(_resourceGroup, _containerAppName); + + if (members.Any()) + { + Logger.LogInformation("Got members {Members}", members.Length); + _cluster.MemberList.UpdateClusterTopology(members); + } + else + { + Logger.LogWarning("Failed to get members from Azure Container Apps"); + } + } + catch (Exception x) + { + Logger.LogError(x, "Failed to get members from Azure Container Apps"); + } + + await Task.Delay(PollIntervalInSeconds); + } + } + ); + + private async Task DeregisterMemberAsync() + { + await Retry.Try(DeregisterMemberInner, onError: OnError, onFailed: OnFailed); + + static void OnError(int attempt, Exception exception) => + Logger.LogWarning(exception, "Failed to deregister service"); + + static void OnFailed(Exception exception) => Logger.LogError(exception, "Failed to deregister service"); + } + + private async Task DeregisterMemberInner() + { + Logger.LogInformation( + "[Cluster][AzureContainerAppsProvider] Unregistering member {ReplicaName} on {IpAddress}", + _replicaName, + _address); + + await _client.ClearMemberTags(_resourceGroup, _containerAppName, _memberId); + } +} \ No newline at end of file diff --git a/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ConfigUtils.cs b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ConfigUtils.cs new file mode 100644 index 0000000000..19ef925f38 --- /dev/null +++ b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ConfigUtils.cs @@ -0,0 +1,54 @@ +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.NetworkInformation; +using System.Net.Sockets; + +namespace Elsa.ProtoActor.Cluster.AzureContainerApps; + +public static class ConfigUtils +{ + internal static IPAddress FindIpAddress(AddressFamily family = AddressFamily.InterNetwork) + { + var addressCandidates = NetworkInterface.GetAllNetworkInterfaces() + .Where(nif => nif.OperationalStatus == OperationalStatus.Up) + .SelectMany(nif => nif.GetIPProperties().UnicastAddresses.Select(a => a.Address)) + .Where(addr => addr.AddressFamily == family && !IPAddress.IsLoopback(addr)) + .ToList(); + + return PickSmallestIpAddress(addressCandidates); + } + + private static IPAddress PickSmallestIpAddress(IEnumerable candidates) + { + IPAddress result = null!; + foreach (var addr in candidates) + { + if (CompareIpAddresses(addr, result)) + result = addr; + } + + return result; + + static bool CompareIpAddresses(IPAddress lhs, IPAddress rhs) + { + if (rhs == null) + return true; + + var lbytes = lhs.GetAddressBytes(); + var rbytes = rhs.GetAddressBytes(); + + if (lbytes.Length != rbytes.Length) return lbytes.Length < rbytes.Length; + + for (var i = 0; i < lbytes.Length; i++) + { + if (lbytes[i] != rbytes[i]) + { + return lbytes[i] < rbytes[i]; + } + } + + return false; + } + } +} \ No newline at end of file diff --git a/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/Elsa.ProtoActor.Cluster.AzureContainerApps.csproj b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/Elsa.ProtoActor.Cluster.AzureContainerApps.csproj new file mode 100644 index 0000000000..ef4b5de96b --- /dev/null +++ b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/Elsa.ProtoActor.Cluster.AzureContainerApps.csproj @@ -0,0 +1,20 @@ + + + + + + + 10 + netcoreapp3.1;net6.0;net7.0 + + This is a temporary stand-in until Proto.Actor provides an updated version of Proto.Cluster.AzureContainerApps. + + elsa module runtime protoactor cluster azure container apps + + + + + + + + diff --git a/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/FodyWeavers.xml b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/FodyWeavers.xml new file mode 100644 index 0000000000..00e1d9a1c1 --- /dev/null +++ b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/FodyWeavers.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ResourceTagLabels.cs b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ResourceTagLabels.cs new file mode 100644 index 0000000000..151e2d3dc3 --- /dev/null +++ b/src/modules/Elsa.ProtoActor.Cluster.AzureContainerApps/ResourceTagLabels.cs @@ -0,0 +1,14 @@ +namespace Elsa.ProtoActor.Cluster.AzureContainerApps; + +public static class ResourceTagLabels +{ + public static string LabelPrefix(string memberId) => $"proto.cluster-{memberId}|"; + public static string LabelHost(string memberId) => LabelPrefix(memberId) + "host"; + public static string LabelPort(string memberId) => LabelPrefix(memberId) + "port"; + public static string LabelKind(string memberId) => LabelPrefix(memberId) + "kind"; + public static string LabelCluster(string memberId) => LabelPrefix(memberId) + "cluster"; + public static string LabelMemberId(string memberId) => LabelPrefix(memberId) + LabelMemberIdWithoutPrefix; + public const string LabelMemberIdWithoutPrefix = "memberId"; + public static string LabelReplicaName(string memberId) => LabelPrefix(memberId) + LabelReplicaNameWithoutPrefix; + public const string LabelReplicaNameWithoutPrefix = "replicaName"; +} \ No newline at end of file