Github:https://github.com/dotnetcore/CAP
开源协议:MIT
CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。
你可以轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。
CAP 以 NuGet 包的形式提供,对项目无任何入侵,你仍然可以以你喜爱的方式来构建分布式系统。
CAP 具有 Event Bus 的所有功能,并且CAP提供了更加简化的方式来处理EventBus中的发布/订阅。
CAP 具有消息持久化的功能,也就是当你的服务进行重启或者宕机时,她可以保证消息的可靠性。
CAP 实现了分布式事务中的最终一致性,你不用再去处理这些琐碎的细节。
CAP 提供了基于 Microsoft DI 的 API 服务,她可以和你的 ASP.NET Core 系统进行无缝结合,并且能够和你的业务代码集成支持强一致性的事务处理。
CAP 是开源免费的。CAP基于MIT协议开源,你可以免费的在你的私人或者商业项目中使用,不会有人向你收取任何费用。
目前, CAP 同时支持使用 RabbitMQ 或 Kafka 进行底层之间的消息发送,你不需要具备 RabbitMQ 或者 Kafka 的使用经验,仍然可以轻松的集成到项目中。
CAP 目前支持使用 Sql Server,MySql,PostgreSql 数据库的项目。
CAP 同时支持使用 EntityFrameworkCore 和 Dapper 的项目,你可以根据需要选择不同的配置方式。
下面是CAP在系统中的一个不完全示意图:
图中实线部分代表用户代码,虚线部分代表CAP内部实现。
下面,我们看一下 CAP 怎么集成到项目中:你可以运行下面的命令来安装CAP NuGet 包:
PM>Install-Package DotNetCore.CAP
根据底层消息队列,你可以选择引入不同的包:
//如果你使用的是 Kafka
PM>Install-Package DotNetCore.CAP.Kafka
//如果你使用的是 RabbitMQ
PM>Install-Package DotNetCore.CAP.RabbitMQ
CAP 目前支持使用 My SqlServer 的项目,你需要引入:
PM>Install-Package DotNetCore.CAP.MySqlServer
在 Startup.cs 文件中,添加如下配置:
public void ConfigureServices(IServiceCollection services) { ...... services.AddCap(x => { // 如果你的 SqlServer 使用的 EF 进行数据操作,你需要添加如下配置: // 注意: 你不需要再次配置 x.UseSqlServer(""") x.UseEntityFramework<AppDbContext>(); // 如果你使用的Dapper,你需要添加如下配置: x.UseMySqlServer("数据库连接字符串"); //如果你使用的 Kafka 作为MQ,你需要添加如下配置: x.UseKafka("localhost:9092"); }); } public void Configure(IApplicationBuilder app) { ..... // 添加 CAP, 2.3版本后已移除 app.UseCap(); }
在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 进行消息发布:
public class PublishController : Controller { private readonly ICapPublisher _publisher; public PublishController(ICapPublisher publisher) { _publisher = publisher; } [Route("~/checkAccountWithTrans")] public async Task<IActionResult> PublishMessageWithTransaction([FromServices]AppDbContext dbContext) { using (var trans = dbContext.Database.BeginTransaction()) { //指定发送的消息标题(供订阅)和内容 await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); // 你的业务代码。 trans.Commit(); } return Ok(); } }
在 Controller 中: 如果是在Controller中,直接添加[CapSubscribe("")] 来订阅相关消息。
public class PublishController : Controller { [NoAction] [CapSubscribe("xxx.services.account.check")] public async Task CheckReceivedMessage(Person person) { Console.WriteLine(person.Name); Console.WriteLine(person.Age); return Task.CompletedTask; } }在 xxxService中:如果你的方法没有位于Controller 中,那么你订阅的类需要继承ICapSubscribe,然后添加[CapSubscribe("")]标记:
namespace xxx.Service { public interface ISubscriberService { public void CheckReceivedMessage(Person person); } public class SubscriberService: ISubscriberService, ICapSubscribe { [CapSubscribe("xxx.services.account.check")] public void CheckReceivedMessage(Person person) { } } }然后在Startup.cs中的 ConfigureServices() 中注入你的ISubscriberService类
public void ConfigureServices(IServiceCollection services) { services.AddTransient<ISubscriberService,SubscriberService>(); }
我在centos服务器上采用docker部署,命令如下:
//下载zookeeper docker pull wurstmeister/zookeeper
//下载kafka docker pull wurstmeister/kafka:2.11-0.11.0.3
//启动zookeeper docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper
//启动kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=192.168.161.163:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.161.163 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:2.11-0.11.0.3
部署完毕后就进入下一步运行啦。
第四步:运行项目,运行成功后,我嗯可以在数据库中发现cap会自动在数据库中创建两张表,一张是 发布信息表、一张是接收信息表。
发现 表中有数据存在:
数据体现法发送接收成功。
我们再来看看cap有提供的UI界面,发现里面有一个我们用consul注册的服务器。
参考文档:http://cap.dotnetcore.xyz/user-guide/zh/getting-started/quick-start/