[toc]
项目开发常用脚手架
- 分布式锁
- 基于Redis
- 分布式缓存
- 基于Redis
- 分布式Id
- 基于Snowfake
- 分布式追踪 Opentracing
- 基于Jaeger
- 消息总线
- 消息队列
- 基于Rabbitmq
- 基于Kafka
- 消息可靠性保证
- 基于MySql
- 基于SqlServer
- 消息队列
- 健康检查
- Mongodb 健康检查
- MySql 健康检查
- SqlServer 健康检查
- Redis 健康检查
- Rabbitmq 健康检查
- Kafka 健康检查
- 负载均衡
- 随机负载均衡
- 轮训负载均衡
- 配置中心
- 基于Apollo配置中心
- 基于Nacos配置中心
- 服务注册
- 基于Consul服务注册和发现
- 基于Nacos服务注册和发现
- 服务调用
- 基于HTTP弹性客户端(支持:服务发现、负载均衡、超时、重试、熔断)
- 基于HTTP非弹性客户端(支持:服务发现、负载均衡)
- Canal 数据集成
- 输出到控制台
- 输出到Rabbitmq(待实现)
- 输出到Kafka(待实现)
步骤1:安装Nuget包
Install-Package Hummingbird.Extensions.DistributedLock -Version 1.15.0
步骤2:配置连接信息
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddHummingbird(hb =>
{
hb.AddDistributedLock(option =>
{
option.WithDb(0);
option.WithKeyPrefix("");
option.WithPassword("123456");
option.WithServerList("127.0.0.1:6379");
option.WithSsl(false);
});
});
}
}
步骤3:测试分布式锁
using Microsoft.AspNetCore.Mvc;
using System;
using System.Threading.Tasks;
[Route("api/[controller]")]
public class DistributedLockController : Controller
{ private readonly IDistributedLock distributedLock;
public DistributedLockController(IDistributedLock distributedLock)
{
this.distributedLock = distributedLock;
}
[HttpGet]
[Route("Test")]
public async Task<string> Test()
{
var lockName = "name";
var lockToken = Guid.NewGuid().ToString("N");
try
{
if (distributedLock.Enter(lockName, lockToken, TimeSpan.FromSeconds(30)))
{
// do something
return "ok";
}
else
{
return "error";
}
}
finally
{
distributedLock.Exit(lockName, lockToken);
}
}
}
步骤1:安装Nuget包
Install-Package Hummingbird.Extensions.Cacheing -Version 1.15.0
步骤2:设置缓存连接信息
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddHummingbird(hb =>
{
hb.AddCacheing(option =>
{
option.WithDb(0);
option.WithKeyPrefix("");
option.WithPassword("123456");
option.WithReadServerList("192.168.109.44:6379");
option.WithWriteServerList("192.168.109.44:6379");
option.WithSsl(false);
})
});
}
}
步骤3:测试Redis缓存
using Hummingbird.Extensions.Cacheing;
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
[Route("api/[controller]")]
public class CacheingController : Controller
{
private readonly ICacheManager cacheManager;
public CacheingController(ICacheManager cacheManager)
{
this.cacheManager = cacheManager;
}
[HttpGet]
[Route("Test")]
public async Task<string> Test()
{
var cacheKey = "cacheKey";
var cacheValue = cacheManager.StringGet<string>(cacheKey);
if(cacheValue == null)
{
cacheValue = "value";
cacheManager.StringSet(cacheKey, cacheValue);
}
return cacheValue;
}
步骤1: 安装Nuget包
Install-Package Hummingbird.Extensions.UidGenerator -Version 1.15.5
Install-Package Hummingbird.Extensions.UidGenerator.ConsulWorkIdStrategy -Version 1.15.7
步骤2:配置使用Snowfake算法生产唯一Id
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddHummingbird(hb =>
{
hb.AddSnowflakeUniqueIdGenerator(workIdBuilder =>
{
workIdBuilder.CenterId = 0; // 设置CenterId
workIdBuilder.AddConsulWorkIdCreateStrategy("Example"); //设置使用Consul创建WorkId
})
});
}
}
步骤3:测试Id生成
using Hummingbird.Extensions.UidGenerator;
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
[Route("api/[controller]")]
public class UniqueIdController : Controller
{
private readonly IUniqueIdGenerator uniqueIdGenerator;
public UniqueIdController(IUniqueIdGenerator uniqueIdGenerator)
{
this.uniqueIdGenerator = uniqueIdGenerator;
}
[HttpGet]
[Route("Test")]
public async Task<long> Test()
{
return uniqueIdGenerator.NewId();
}
}
步骤1:安装Nuget包
Install-Package Hummingbird.Extensions.OpenTracing -Version 1.15.0
Install-Package Hummingbird.Extensions.OpenTracking.Jaeger -Version 1.15.0
步骤2: 创建tracing.json 配置
{
"Tracing": {
"Open": false,
"SerivceName": "SERVICE_EXAMPLE",
"FlushIntervalSeconds": 15,
"SamplerType": "const",
"LogSpans": true,
"AgentPort": "5775", //代理端口
"AgentHost": "dev.jaeger-agent.service.consul", //代理地址
"EndPoint": "https://dev.jaeger-collector.service.consul:14268/api/traces"
}
}
步骤3:添加tracing.json 配置依赖
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.ConfigureAppConfiguration((builderContext, config) =>
{
config.SetBasePath(Directory.GetCurrentDirectory());
config.AddJsonFile("tracing.json");
config.AddEnvironmentVariables();
})
.ConfigureLogging((hostingContext, logging) =>
{
logging.ClearProviders();
})
.Build();
}
步骤3: 配置OpenTracing基于Jaeger实现
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddHummingbird(hb =>
{
hb.AddOpenTracing(builder => {
builder.AddJaeger(Configuration.GetSection("Tracing"));
})
});
}
}
步骤4:测试手动埋点日志
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
[Route("api/[controller]")]
public class OpenTracingController : Controller
{
[HttpGet]
[Route("Test")]
public async Task Test()
{
using (Hummingbird.Extensions.Tracing.Tracer tracer = new Hummingbird.Extensions.Tracing.Tracer("Test"))
{
tracer.SetTag("tag1", "value1");
tracer.SetError();
tracer.Log("key1", "value1");
}
}
}
步骤1:安装Nuget包
Install-Package Hummingbird.Extensions.EventBus -Version 1.15.0
Install-Package Hummingbird.Extensions.EventBus.RabbitMQ -Version 1.15.3
Install-Package Hummingbird.Extensions.EventBus.MySqlLogging -Version 1.15.3
步骤2:创建消息消费端,消息处理程序
using Hummingbird.Extensions.EventBus.Abstractions;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
public class TestEvent
{
public string EventType { get; set; }
}
public class TestEventHandler1 : IEventHandler<TestEvent>
{
public Task<bool> Handle(TestEvent @event, Dictionary<string, object> headers, CancellationToken cancellationToken)
{
//执行业务操作1并返回操作结果
return Task.FromResult(true);
}
}
public class TestEventHandler2 : IEventHandler<TestEvent>
{
public Task<bool> Handle(TestEvent @event, Dictionary<string, object> headers, CancellationToken cancellationToken)
{
//执行业务操作2并返回操作结果
return Task.FromResult(true);
}
}
步骤2:创建消息生产端,消息发送程序
using Hummingbird.Extensions.EventBus.Abstractions;
using Hummingbird.Extensions.EventBus.Models;
using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapper;
using System.Linq;
using System.Threading;
[Route("api/[controller]")]
public class MQPublisherTestController : Controller
{
private readonly IEventLogger eventLogger;
private readonly IEventBus eventBus;
public MQPublisherTestController(
IEventLogger eventLogger,
IEventBus eventBus)
{
this.eventLogger = eventLogger;
this.eventBus = eventBus;
}
/// <summary>
/// 无本地事务发布消息,消息直接写入队列
/// </summary>
[HttpGet]
[Route("Test1")]
public async Task<string> Test1()
{
var events = new List<EventLogEntry>() {
new EventLogEntry("TestEvent",new Events.TestEvent() {
EventType="Test1"
}),
new EventLogEntry("TestEvent",new {
EventType="Test1"
}),
};
var ret= await eventBus.PublishAsync(events);
return ret.ToString();
}
/// <summary>
/// 有本地事务发布消息,消息落盘到数据库确保事务完整性
/// </summary>
/// <returns></returns>
[HttpGet]
[Route("Test2")]
public async Task<string> Test2()
{
var connectionString = "Server=localhost;Port=63307;Database=test; User=root;Password=123456;pooling=True;minpoolsize=1;maxpoolsize=100;connectiontimeout=180";
using (var sqlConnection = new MySqlConnection(connectionString))
{
await sqlConnection.OpenAsync();
var sqlTran = await sqlConnection.BeginTransactionAsync();
var events = new List<EventLogEntry>() {
new EventLogEntry("TestEvent",new Events.TestEvent() {
EventType="Test2"
}),
new EventLogEntry("TestEvent",new {
EventType="Test2"
}),
};
//保存消息至业务数据库,保证写消息和业务操作在一个事务
await eventLogger.SaveEventAsync(events, sqlTran);
var ret = await sqlConnection.ExecuteAsync("you sql code");
return ret.ToString();
}
}
/// <summary>
/// 有本地事务发布消息,消息落盘到数据库,从数据库重新取出消息发送到队列
/// </summary>
/// <returns></returns>
[HttpGet]
[Route("Test3")]
public async Task Test3()
{
//获取1000条没有发布的事件
var unPublishedEventList = eventLogger.GetUnPublishedEventList(1000);
//通过消息总线发布消息
var ret = await eventBus.PublishAsync(unPublishedEventList);
if (ret)
{
await eventLogger.MarkEventAsPublishedAsync(unPublishedEventList.Select(a => a.EventId).ToList(), CancellationToken.None);
}
else
{
await eventLogger.MarkEventAsPublishedFailedAsync(unPublishedEventList.Select(a => a.EventId).ToList(), CancellationToken.None);
}
}
}
步骤3: 配置使用Rabbitmq消息队列和使用Mysql消息持久化
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddHummingbird(hb =>
{
hb.AddEventBus((builder) =>
{
//使用Rabbitmq 消息队列
builder.AddRabbitmq(factory =>
{
factory.WithEndPoint("192.168.109.2,192.168.109.3", "5672"));
factory.WithAuth("guest", "guest");
factory.WithExchange("/");
factory.WithReceiver(PreFetch: 10, ReceiverMaxConnections: 1, ReveiverMaxDegreeOfParallelism: 1);
factory.WithSender(10);
});
//使用Kafka 消息队列
//builder.AddKafka(option =>
//{
// option.WithSenderConfig(new Confluent.Kafka.ProducerConfig()
// {
// EnableDeliveryReports = true,
// BootstrapServers = "192.168.78.29:9092,192.168.78.30:9092,192.168.78.31:9092",
// // Debug = "msg" // Debug = "broker,topic,msg"
// });
// option.WithReceiverConfig(new Confluent.Kafka.ConsumerConfig()
// {
// // Debug= "consumer,cgrp,topic,fetch",
// GroupId = "test-consumer-group",
// BootstrapServers = "192.168.78.29:9092,192.168.78.30:9092,192.168.78.31:9092",
// });
// option.WithReceiver(1, 1);
// option.WithSender(10, 3, 1000 * 5, 50);
//});
// 基于MySql 数据库 进行消息持久化,当存在分布式事务问题时
builder.AddMySqlEventLogging(o => {
o.WithEndpoint("Server=localhost;Port=63307;Database=test; User=root;Password=123456;pooling=True;minpoolsize=1;maxpoolsize=100;connectiontimeout=180");
});
// 基于SqlServer 数据库 进行消息持久化,当存在分布式事务问题时
//builder.AddSqlServerEventLogging(a =>
//{
// a.WithEndpoint("Data Source=localhost,63341;Initial Catalog=test;User Id=sa;Password=123456");
//});
})
});
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
var logger = app.ApplicationServices.GetRequiredService<ILogger<IEventLogger>>();
app.UseHummingbird(humming =>
{
humming.UseEventBus(sp =>
{
sp.UseSubscriber(eventbus =>
{
eventbus.Register<TestEvent, TestEventHandler1>("TestEventHandler1", "TestEvent");
eventbus.Register<TestEvent, TestEventHandler2>("TestEventHandler2", "TestEvent");
//订阅消息
eventbus.Subscribe((Messages) =>
{
foreach (var message in Messages)
{
logger.LogDebug($"ACK: queue {message.QueueName} route={message.RouteKey} messageId:{message.MessageId}");
}
}, async (obj) =>
{
foreach (var message in obj.Messages)
{
logger.LogError($"NAck: queue {message.QueueName} route={message.RouteKey} messageId:{message.MessageId}");
}
//消息消费失败执行以下代码
if (obj.Exception != null)
{
logger.LogError(obj.Exception, obj.Exception.Message);
}
// 消息等待5秒后重试,最大重试次数3次
var events = obj.Messages.Select(message => message.WaitAndRetry(a => 5,3)).ToList();
// 消息写到重试队列
var ret = !(await eventBus.PublishAsync(events));
return ret;
});
});
});
});
}
}
步骤1: 安装Nuget包
Install-Package Hummingbird.Extensions.HealthChecks -Version 1.15.0
Install-Package Hummingbird.Extensions.HealthChecks.Redis -Version 1.15.0
Install-Package Hummingbird.Extensions.HealthChecks.Rabbitmq -Version 1.15.0
Install-Package Hummingbird.Extensions.HealthChecks.MySql -Version 1.15.0
Install-Package Hummingbird.Extensions.HealthChecks.SqlServer -Version 1.15.0
步骤2: 配置健康检查Endpoint
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.UseHealthChecks("/healthcheck")
.ConfigureAppConfiguration((builderContext, config) =>
{
config.SetBasePath(Directory.GetCurrentDirectory());
config.AddEnvironmentVariables();
})
.ConfigureLogging((hostingContext, logging) =>
{
logging.ClearProviders();
})
.Build();
}
步骤3: 配置监控检查项
public class Startup
{
public IConfiguration Configuration { get; }
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddHealthChecks(checks =>
{
checks.WithDefaultCacheDuration(TimeSpan.FromSeconds(5));
checks.AddMySqlCheck("mysql", "Server=localhost;Port=63307;Database=test; User=root;Password=123456;pooling=True;minpoolsize=1;maxpoolsize=100;connectiontimeout=180;SslMode=None");
checks.AddSqlCheck("sqlserver", "Data Source=localhost,63341;Initial Catalog=test;User Id=sa;Password=123456");
checks.AddRedisCheck("redis", "localhost:6379,password=123456,allowAdmin=true,ssl=false,abortConnect=false,connectTimeout=5000");
checks.AddRabbitMQCheck("rabbitmq", factory =>
{
factory.WithEndPoint("192.168.109.2,192.168.109.3","5672"));
factory.WithAuth("guest", "guest");
factory.WithExchange("/");
});
});
}
}
步骤1: 安装Nuget包
Install-Package Hummingbird.DynamicRoute -Version 1.15.1
Install-Package Hummingbird.LoadBalancers -Version 1.15.0
Install-Package Hummingbird.Extensions.DynamicRoute.Consul -Version 1.15.5
Install-Package Hummingbird.Extensions.Resilience.Http -Version 1.15.0
步骤2:配置 appsettings.json
{
"SERVICE_REGISTRY_ADDRESS": "localhost", // 注册中心地址
"SERVICE_REGISTRY_PORT": "8500", //注册中心端口
"SERVICE_SELF_REGISTER": true, //自注册开关打开
"SERVICE_NAME": "SERVICE_EXAMPLE", //服务名称
"SERVICE_ADDRESS": "",
"SERVICE_PORT": "80",
"SERVICE_TAGS": "test",
"SERVICE_REGION": "DC1",
"SERVICE_80_CHECK_HTTP": "/healthcheck",
"SERVICE_80_CHECK_INTERVAL": "15",
"SERVICE_80_CHECK_TIMEOUT": "15",
"SERVICE_CHECK_TCP": null,
"SERVICE_CHECK_SCRIPT": null,
"SERVICE_CHECK_TTL": "15",
"SERVICE_CHECK_INTERVAL": "5",
"SERVICE_CHECK_TIMEOUT": "5"
}
步骤3:添加appsettings.json 配置依赖
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.ConfigureAppConfiguration((builderContext, config) =>
{
config.SetBasePath(Directory.GetCurrentDirectory());
config.AddJsonFile("appsettings.json");
config.AddEnvironmentVariables();
})
.ConfigureLogging((hostingContext, logging) =>
{
logging.ClearProviders();
})
.Build();
}
步骤4:服务注册到Consul并配置弹性HTTP客户端
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddHummingbird(hummingbird =>
{
hummingbird
// 服务注册到Consul
.AddConsulDynamicRoute(Configuration, s =>
{
s.AddTags("version=v1");
})
// 设置弹性HTTP客户端(服务发现、超时、重试、熔断)
.AddResilientHttpClient((orign, option) =>
{
var setting = Configuration.GetSection("HttpClient");
if (!string.IsNullOrEmpty(orign))
{
var orginSetting = Configuration.GetSection($"HttpClient:{orign.ToUpper()}");
if(orginSetting.Exists())
{
setting = orginSetting;
}
}
option.DurationSecondsOfBreak = int.Parse(setting["DurationSecondsOfBreak"]);
option.ExceptionsAllowedBeforeBreaking = int.Parse(setting["ExceptionsAllowedBeforeBreaking"]);
option.RetryCount = int.Parse(setting["RetryCount"]);
option.TimeoutMillseconds = int.Parse(setting["TimeoutMillseconds"]);
});
});
}
}
步骤5:测试HTTP Client
using Hummingbird.Extensions.Resilience.Http;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
using System.Threading.Tasks;
[Route("api/[controller]")]
public class HttpClientTestController : Controller
{
private readonly IHttpClient httpClient;
public HttpClientTestController(IHttpClient httpClient)
{
this.httpClient = httpClient;
}
[HttpGet]
[Route("Test1")]
public async Task<string> Test1()
{
return await httpClient.GetStringAsync("https://localhost:5001/healthcheck");
}
[HttpGet]
[Route("Test2")]
public async Task<string> Test2()
{
return await (await httpClient.PostAsync(
uri: "https://{example}/healthcheck",
item: new { },
authorizationMethod: null,
authorizationToken: null,
dictionary: null,
cancellationToken: CancellationToken.None)).Content.ReadAsStringAsync();
}
}
步骤1: 安装Nuget包
Install-Package Hummingbird.DynamicRoute -Version 1.17.3
Install-Package Hummingbird.LoadBalancers -Version 1.17.3
Install-Package Hummingbird.Extensions.DynamicRoute.Nacos -Version 1.17.6
Install-Package Hummingbird.Extensions.Resilience.Http -Version 1.17.3
步骤2:配置 appsettings.json
{
"Nacos": {
"EndPoint": "",
"ServerAddresses": [ "https://localhost:8848" ],
"DefaultTimeOut": 15000,
"Namespace": "public",
"ListenInterval": 1000,
"ServiceName": "example",
"GroupName": "DEFAULT_GROUP",
"ClusterName": "DEFAULT",
"Ip": "",
"PreferredNetworks": "",
"Port": 0,
"Weight": 100,
"RegisterEnabled": true,
"InstanceEnabled": true,
"Ephemeral": true,
"Secure": false,
"AccessKey": "",
"SecretKey": "",
"UserName": "",
"Password": "",
"ConfigUseRpc": true,
"NamingUseRpc": true,
"NamingLoadCacheAtStart": "",
"LBStrategy": "WeightRandom", //WeightRandom WeightRoundRobin
"Metadata": {
"debug": "true",
"dev": ""
}
}
}
步骤3:添加appsettings.json 配置依赖
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.ConfigureAppConfiguration((builderContext, config) =>
{
config.SetBasePath(Directory.GetCurrentDirectory());
config.AddJsonFile("appsettings.json");
config.AddEnvironmentVariables();
})
.ConfigureLogging((hostingContext, logging) =>
{
logging.ClearProviders();
})
.Build();
}
步骤4:服务注册到Nacos并配置弹性HTTP客户端
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddHummingbird(hummingbird =>
{
hummingbird
// 服务注册到Nacos
.AddNacosDynamicRoute(Configuration, s =>
{
s.AddTags("version=v1");
})
// 设置弹性HTTP客户端(服务发现、超时、重试、熔断)
.AddResilientHttpClient((orign, option) =>
{
var setting = Configuration.GetSection("HttpClient");
if (!string.IsNullOrEmpty(orign))
{
var orginSetting = Configuration.GetSection($"HttpClient:{orign.ToUpper()}");
if(orginSetting.Exists())
{
setting = orginSetting;
}
}
option.DurationSecondsOfBreak = int.Parse(setting["DurationSecondsOfBreak"]);
option.ExceptionsAllowedBeforeBreaking = int.Parse(setting["ExceptionsAllowedBeforeBreaking"]);
option.RetryCount = int.Parse(setting["RetryCount"]);
option.TimeoutMillseconds = int.Parse(setting["TimeoutMillseconds"]);
});
});
}
}
步骤5:测试HTTP Client
using Hummingbird.Extensions.Resilience.Http;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
using System.Threading.Tasks;
[Route("api/[controller]")]
public class HttpClientTestController : Controller
{
private readonly IHttpClient httpClient;
public HttpClientTestController(IHttpClient httpClient)
{
this.httpClient = httpClient;
}
[HttpGet]
[Route("Test1")]
public async Task<string> Test1()
{
return await httpClient.GetStringAsync("https://localhost:5001/healthcheck");
}
[HttpGet]
[Route("Test2")]
public async Task<string> Test2()
{
return await (await httpClient.PostAsync(
uri: "https://{example}/healthcheck",
item: new { },
authorizationMethod: null,
authorizationToken: null,
dictionary: null,
cancellationToken: CancellationToken.None)).Content.ReadAsStringAsync();
}
}
步骤1: 安装Nuget包
Install-Package Hummingbird.Extensions.Canal -Version 1.17.3
步骤2:配置 canal.json, binlog日志输出到控制台
{
"Canal": {
"Subscribes": [
{
"Filter": ".*\\..*",
"BatchSize": 1024,
"Format": "Hummingbird.Extensions.Canal.Formatters.CanalJson.Formatter,Hummingbird.Extensions.Canal", //MaxwellJsonFormatter,CanalJsonFormatter
"Connector": "Hummingbird.Extensions.Canal.Connectors.ConsoleConnector,Hummingbird.Extensions.Canal",
"ConnectionInfo": {
"Address": "localhost",
"Port": 11111,
"Destination": "test1",
"UserName": "",
"Passsword": ""
}
}
]
}
步骤3:添加appsettings.json 配置依赖
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.ConfigureAppConfiguration((builderContext, config) =>
{
config.SetBasePath(Directory.GetCurrentDirectory());
config.AddJsonFile("canal.json");
config.AddEnvironmentVariables();
})
.Build();
}
步骤4: 实现自己的binlog处理
public class ConsoleSubscripter : ISubscripter
{
public bool Process(CanalEventEntry[] entrys)
{
foreach(var entry in entrys)
{
Console.WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(entry));
}
//ack
return true;
}
}
4.1 Consul
4.2 Apollo
4.3 Mysql
4.4 Redis
4.5 Elasticsearch
4.6 Kafka
4.7 Rabbitmq
4.8 Grafana
4.9 Influxdb
4.10 Jaeger
4.11 Canal
4.12 Nacos
5.1 项目模板
wechat:genius-ming email:[email protected]