基于1.1.5-alpha版本,具体源码笔记可以参考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal
本文将对canal的server模块进行分析,跟之前一样,我们带着几个问题来看源码:
CanalServer有几种使用方式?
控制台Admin、客户端client是如何与CanalServer交互的?
CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?
Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?
server模块内的结构如下:
主要分为了三个包:
admin包:
这个包的CanalAdmin接口定义了canalServer上暴露给canal-admin控制台使用的一些服务接口。
上一篇deployer模块解析中提到的CanalAdminController就是实现了CanalAdmin接口(把这个接口的实现放在deployer模块是挺奇怪的)。Admin包中使用了netty作为服务端(CanalAdminWithNetty类中实现),接受控制台Admin的请求,返回当前canalServer的一些运行状态。
server包:
server模块的核心包,本文重点解析的部分,需要了解CanalServerWithEmbedded 和CanalServerWithNetty。
spi包:
定义了canalServer的监控内容 通过spi实现,比如项目中的Prometheus子模块实现了监控能力,我们不展开分析。
CanalServer目前支持两种模式:
serverMode = tcp的Server-Client模式
serverMode = kafak 或 rocketMQ 的 Server-MQ-Client模式
为了大家能充分理解canalServer的结构,这里精心制作了一个canalServer的架构图(如果觉得这图不错,给本文点个赞吧)。
1.1 Server-Client模式
架构如图所示:
我们可以清楚的看到Server模块中各个模块的关系与能力:
CanalServerWithEmbedde维护了具体的instance任务,负责对binlog进行订阅、过滤、缓存,就是之前的文章介绍过的parser-sink-store的方式。
CanalServerWithNetty作为服务端,接收CanalClient的请求,将binlog的消息发送给client。
CanalAdminWithNetty作为admin的服务器,接收控制台Admin的控制操作、查询状态操作等,启停或显示当前CanalServer以及instance的状态。
1.2 Server-MQ-Client模式
架构如图所示:
主体部分与Server-client模式一致,主要区别如下:
不需要CanalServerWithNetty,改为CanalMQProducer投递消息给消息队列。
不使用CanalClient,改为MqClient获取消息队列的消息进行消费。
这种模式相比于Server-client模式
下游解耦,利用消息队列的特性,可以支持多个客户端广播消费、集群消费、重复消费等。
会增加系统的复杂度,增加一些延迟。
具体模式的选择,需要根据具体的使用场景来决定。
admin包和spi包都不属于核心逻辑,因此我们重点关注server包的代码。
我们看到,server包下面分为了embedded包、exception包、netty包和几个接口类。
其中,最顶层的设计就要从CanalServer接口入手。
它的实现类有两个,CanalServerWithEmbedded 和 CanalServerWithNetty。
它们之间的区别官方文档给了一些说明。
那么,对于官方文档中提到的Embedded(嵌入式)的自主开发是怎么使用呢?
跟我们上面提到的Server-Client模式和Server-MQ-Client模式完全不同,采用了一种无server的架构,如下图所示。
我们可以看到,这种模式没有了Canal-Server,直接在自己的应用中引入canal,然后使用CanalServerWithEmbedded进行数据抓取和订阅。
当然,这种方式开发成本有点高,一般也不会去这样使用。
对于CanalServerWithEmbedded 和 CanalServerWithNetty,官方文档里面实际上没有解释的特别到位,只讲了区别,没有讲联系。
这两个实现类除了官方文档中说明的区别之外,还有很大的联系。
可以看看我们上文介绍的架构图,对于Server-Client模式下的模块联系
实际上,真正的执行逻辑是在CanalServerWithEmbedded中的,CanalServerWithNetty中持有了CanalServerWithEmbedded对象,委托embedded进行相关逻辑处理,CanalServerWithNetty更多的作用是充当服务端与CanalClient进行交互。
下面,我们先看看CanalServerWithNetty类。
3.1 单例构建
使用 private构造器 + 静态内部类 来实现一个单例模式,保证了一个CanalServer内部只有一个CanalServerWithNetty。
同时,我们能看到内部持有一个CanalServerWithEmbedded对象,用来处理相关请求,验证了我们上面的说明。
3.2 启动逻辑 start()
源码如下:
主要流程如下:
启动embeddedServer。
创建bootstrap实例,设置netty相关配置。
参数NioServerSocketChannelFactory也是Netty的API,接受2个线程池参数,第一个线程池是Accept线程池,第二个线程池是woker线程池,Accept线程池接收到client连接请求后,会将代表client的对象转发给worker线程池处理。这里属于netty的知识,不熟悉的可以暂时不必深究,简单认为netty使用线程来处理客户端的高并发请求即可。
构造对应的pipeline,包括解码处理、身份验证、创建netty的 seesionHandler(真正处理客户端请求,seesionHandler的实现是核心逻辑)。
pipeline实际上就是netty对客户端请求的处理器链,可以类比JAVA EE编程中Filter的责任链模式,上一个filter处理完成之后交给下一个filter处理,只不过在netty中,不再是filter,而是ChannelHandler。
启动netty,监听port端口,然后客户端对 这个端口的请求可以被接收到
对于 netty的相关知识 ,本文 不深入展开,简单理解 为一个高性能服务器即可,可以监听 端口请求,并 进行相应的处理。
重点在于sessionHandler的处理。
3.3 逻辑分发SessionHandler类
canalServer的处理逻辑显然都在sessionHandler里面,而这个handler在构建时,传入了embeddedServer。
前面我们提过,serverWithNetty的处理逻辑是委派给embeddedServer的,所以这里就非常顺理成章了,让handler维护embeddedServer实例,进行逻辑处理。
sessionHandler继承了netty的SimpleChannelHandler类,重写了messageReceived方法,接收到不同请求后,委托embeddedServer用不同方法进行处理 。
这个方法里面的代码非常冗长,而本质都是委托给embeddedServer去处理,因此,我们看下主干逻辑即可。
可以看到,根据不同的packet类型,最终都是委托给embeddedServer进行处理,这里只是做一个逻辑的判断和分发。
3.4 CanalServerWithNetty小结
到此,我们已经了解了CanalServerWithNetty是如何启动的。
并且,它的主要定位就是充当服务器,接收客户端的请求,然后做消息分发,委托给CanalServerEmbedded进行处理。
下面,我们来看下CanalServerEmbedded的相关实现。
4.1 基本认识
非完全单例模式,这里使用public的构造器,用户还是有机会自己new对象出来的,应用是用来独立引入进行开发的时候使用。
维护了instance的对象容器
继承了CanalServer和CanalService接口
CannalServer接口其实就是就是start()和stop()方法,没有特别的地方,主要是start()配置了一个MigrateMap.makeComputingMap,
当需要某个instance的时候,就会调用apply方法用instanceGenerator创建对应的instance。
我们重点看下CanalService接口定义的方法。
每个方法的入参都带来clientIdentity,这个是客户端的身份标示
目前canal只支持一个客户端对一个instance进行订阅,clientId全部写死为1001,据说以后可能会支持多用户订阅。
了解CanalService定义的方法在CanalServerEmbedded中如何实现,基本也就能看清CanalServerEmbedded的全貌了。
尤其是,你能理解官网wiki中介绍的canal核心功能——异步消费流式api(get/ack/rollback协议) 设计。
4.2 subscribe方法
主要步骤:
根据客户端标识clientIdentity中的destination,找到对应的instance。
通过instance的metaManager记录下当前这个客户端在订阅。
通过instace的metaManage获取当前订阅binlog的position位置。如果是第一次订阅,那么metaManage没有position信息,就从eventStore获取第一个binlog的position,然后更新到metaManager。
通知下订阅关系变化。
这里需要注意一下metaManager,这是一个接口,有多种实现方式,包括基于内存、基于文件、基于内存+zookeeper混合、基于zookeeper等,都在meta模块中,这里就简单了解下概念即可。
MemoryMetaManager:位点信息保存在内存中。
ZookeeperMetaManage:位点信息保存在zk上。
PeriodMixedMetaManager:前面两种的混合,保存在内存中,然后位点信息定期刷新到zk上。
我们在集群模式下,default-instance.xml使用的是基于PeriodMixedMetaManager的实现。
4.3 unsubscribe方法
这个方法比较简单,就不放源码了。
就是找到instance对应的metaManager,然后调用unsubscribe方法取消这个客户端的订阅。
需要注意的是,取消订阅,instance本身仍然是在运行的,可以有新的client来订阅这个instance。
4.4 getWithoutAck方法
先解释几个概念。
我们用的集群版canalServer,默认是使用PeriodMixedMetaManager来管理位点信息,也就是MemoryMetaManager + zookeeperMetaManager。
其中,对于客户端消费instance消息的情况,内部维护了一个对象MemoryClientIdentityBatch进行记录
回到这个方法来说,这个方法用于客户端获取binlog消息,大致流程如下:
根据clientIdentity的destination获取对应的instance
获取到流式数据中的最后一批获取的位置positionRanges(跟batchId有关联,就是上面那个map里面的)
从cananlEventStore里面获取binlog,转化为event。一般是从最后的一个batchId位置开始,如果之前没有batchId,那么就从cursor记录的消费位点开始;如果cursor为空,那只能从eventStore的第一条消息开始。
event转化为entry,并生成新的batchId,组合成message返回给客户端
注意在eventStore获取event的时候,用户可以自己设置batchSize和超时时间timeout。为了尽量提高效率,一般一次获取一批binlog,而不是获取一条。这个批次的大小(batchSize)由客户端指定。同时客户端可以指定超时时间,在超时时间内,如果获取到了batchSize的binlog,会立即返回。如果超时了还没有获取到batchSize指定的binlog个数,也会立即返回。特别的,如果没有设置超时时间,如果没有获取到binlog也立即返回。具体eventStore的获取逻辑,我们下次讲到这个模块再展开。
4.5 get方法
这个方法主要是用于客户端获取binlog消息,与getWithoutAck基本一致。
主要区别在于,客户端获取batch后,自动ack,这样相对来说肯定更快,但是无法保证可靠性。
在项目中看起来暂时没有使用,我们就不展开了。
4.6 ack方法
进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
从metaManager中移除batchId对应的记录
记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始
已经ack的数据,在eventStore中清除
4.7 rollback
rollback有两个方法,回滚所有和回滚指定batchId,不过从源码来看,目前回滚指定指定batchId也是回滚所有。
回滚的本质,就是把所有还没ack的batchId都清空,流式api被get但是还没ack的消息会被重新get。
在第一节的架构模式中我们分析过了,在启动过程中,如果serverMode选择tcp,会启动canalServerWithNetty,如果serverMode选择了mq,就会启动cannalMQStarter。
所以从模块组成来说,canalMQStarter跟canalServerWithNetty是比较相似的。
canalMQStarter也是委托embeddedCanal做处理,同时委托CanalMQProducer把消息投递到mq集群。
canalServerWithNetty也是委托embeddedCanal做处理,然后通过netty来跟canal-client做交互。
如果我们以后应用中要内嵌embeddedCanal,完全可以参照canalMQStarter和canalServerWithNetty的模式来写。
主要组成如下:
工作线程池executorService,对每个instance起一个worker线程。
canalMQWorks,记录了destination(instance的标识)和worker线程的关系。
维持了一个CanalServerWithEmbedded对象。
CanalMQProducer投递mq消息。
5.1 start方法
这个方法就是前面canalStarter类里面的start()方法中,对CanalMQStarter.start()的调用。
具体做了三件事情:
获取CanalServerWithEmbedded的单例对象
对应每个instance启动一个worker线程CanalMQRunnable
注册ShutdownHook,退出时关闭线程池和mqProducer
这里主要看看CanalMQRunnable做了些什么。
5.2 CanalMQRunnable
这是一个内部类,就是看看worker里面做了什么
只有一个worker方法,主要逻辑非常清晰:
给自己创建一个身份标识,作为client。
根据destination获取对应instance,如果没有就sleep,等待产生(比如从别的server那边HA过来一个instance)。
构建一个MQ的destination对象,加载相关mq的配置信息,用作mqProducer的入参。
在embeddedCanal中注册这个订阅客户端。
开始运行,并通过embededCanal进行流式get/ack/rollback协议,进行数据消费。
回到开头的几个问题,相信文中都已经做了解答。
CanalServer有几种使用方式?
可以独立部署(推荐),可以使用Server-Client模式 和 Server-MQ-Client模式两种。
可以内嵌部署开发(embedded,难度较高)。
控制台Admin、客户端client是如何与CanalServer交互的?
控制台Admin通过CanalAdminWithNetty与服务端交互 客户端client通过CanalServerWithNetty与服务端交互。
CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?
CanalServerWithEmbedded是真正核心逻辑(parser-sink-store)处理的地方 。CanalServerWithNetty持有CanalServerWithEmbedded对象,接收client的请求然后转发给CanalServerWithEmbedded对象处理。
Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?
CanalServerWithEmbedded集成了CanalService接口,实现了具体的get/ack/rollback协议。
原创:阿丸笔记(微信公众号:aone_note),欢迎 分享,转载请保留出处。
扫描下方二维码可以关注我哦~