本文共 17226 字,大约阅读时间需要 57 分钟。
我们都知道一个系统最重要的是数据,数据是保存在数据库里。但是很多时候不单止要保存在数据库中,还要同步保存到Elastic Search、HBase、Redis等等。
这时我注意到阿里开源的框架Canal,他可以很方便地同步数据库的增量数据到其他的存储应用。
在很多业务情况下,我们都会在系统中引入ElasticSearch搜索引擎作为做全文检索的优化方案,引入redis缓存作为缓存优化查询显示。
如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新ElasticSearch的代码,同步更新缓存的代码。
这种数据同步的代码跟业务代码耦合性非常高,并且使得代码的可读性降低,我们能不能把这些数据同步的代码抽出来形成一个独立的模块呢?肯定是可以的。
下面我会以一个CMS文章管理为例来演示canal+RocketMQ实现MySQL与ElasticSearch,redis数据同步。
SpringBoot、canal、RocketMQ、MySQL、ElasticSearch,redis
介绍一下canal,其他的自行学习。
2.1 canal定义
canal
[kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费.。
2.2 canal工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
canal能做什么
以下参考canal官网。与其问canal能做什么,不如说数据同步有什么作用。
但是canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:
数据库镜像
数据库实时备份 索引构建和实时维护 业务cache(缓存)刷新 带业务逻辑的增量数据处理2.3 架构
说明:
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1…n个instance) instance模块:eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作) eventStore (数据存储) metaManager (增量订阅&消费信息管理器) 到这里我们对canal有了一个初步的认识,接下我们就进入实战环节。3.环境准备
3.1 MySQL 配置 对于自建 MySQL , 需要先开启 Binlog写入功能,配置binlog-format为ROW 模式,my.cnf 中配置如下[mysqld]
log-bin=mysql-bin # 开启 binlogbinlog-format=ROW # 选择 ROW 模式server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复**注意:**针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权canal 连接 MySQL 账号具有作为 MySQL slave的权限, 如果已有账户可直接 使用grant 命令授权。
#创建用户名和密码都为canal
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;3.2 canal的安装和配置 3.2.1 canal.admin安装和配置 canal提供web ui 进行Server管理、Instance管理。
3.2.1.1 下载 canal.admin, 访问 页面 , 选择需要的包下载,
wget https://github.com/alibaba/canal/releases/download/canal-XXX/canal.admin-XXX.tar.gz 3.2.1.2 解压完成我们先配置canal.admin之后。通过web ui来配置 cancal server,这样使用界面操作非常的方便。
3.2.1.3 配置修改
vi conf/application.ymlserver: port: 8089spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1canal: adminUser: admin adminPasswd: admin3.2.1.4 初始化元数据库 初始化元数据库
# 导入初始化SQL
> source conf/canal_manager.sql 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化canal_manager.sql默认会在conf目录下
3.2.1.5 启动
sh bin/startup.sh 3.2.1.6 启动成功,使用浏览器输入http://ip:8089/ 会跳转到登录界面使用用户名:admin 密码为:123456 登录
这时候我们的canal.admin就搭建成功了。 3.2.2 下载 canal.deployer, 访问 页面 , 选择需要的包下载解压完成
进入conf 目录。
我们先对canal.properties 不做任何修改。使用canal_local.properties的配置覆盖canal.properties
# register ipcanal.register.ip =# canal admin configcanal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441# admin auto registercanal.admin.register.auto = truecanal.admin.register.cluster =使用如下命令启动canal server
sh bin/startup.sh local
启动成功。同时我们在canal.admin web ui中刷新 server 管理,可以到canal server 已经启动成功。 这时候我们的canal.server 搭建已经成功。3.2.3 在canal admin ui 中配置Instance管理
3.2.3.1 新建 Instance 选择Instance 管理-> 新建Instance 填写 Instance名称:cms_article 选择 选择所属主机集群 选择 载入模板 修改默认信息#mysql serverId
canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的数据库信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,需要改成自己的数据库信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal #改成自己的数据库信息(需要监听的数据库) canal.instance.defaultDatabaseName = cms-manage canal.instance.connectionCharset = UTF-8 #table regex 需要过滤的表 这里数据库的中所有表 canal.instance.filter.regex = .\*\\..\*# MQ 配置 日志数据会发送到cms_article这个topic上
canal.mq.topic=cms_article # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* #单分区处理消息 canal.mq.partition=0 我们这里为了演示之创建一张cms_articla表。 配置好之后,我需要点击保存。此时在Instances 管理中就可以看到此时的实例信息。 3.2.4 修改canal server 的配置文件,选择消息队列处理binlog canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:kafka: https://github.com/apache/kafka
RocketMQ : https://github.com/apache/rocketmq 本案例以RocketMQ为例我们仍然使用web ui 界面操作。点击 server 管理 - > 点击配置
修改配置文件
# ...# 可选项: tcp(默认), kafka, RocketMQcanal.serverMode = RocketMQ# ...# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 192.168.0.200:9078canal.mq.retries = 0# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限canal.mq.batchSize = 16384canal.mq.maxRequestSize = 1048576# flatMessage模式下请将该值改大, 建议50-200canal.mq.lingerMs = 1canal.mq.bufferMemory = 33554432# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)canal.mq.canalBatchSize = 50# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时canal.mq.canalGetTimeout = 100# 是否为flat json格式对象canal.mq.flatMessage = falsecanal.mq.compressionType = nonecanal.mq.acks = all# kafka消息投递是否使用事务canal.mq.transaction = false修改好之后保存。会自动重启。
此时我们就可以在rocketmq的控制台看到一个cms_article topic已经自动创建了。
3.2.5 配置ElasticSearch启动
我们使用 elasticsearch-head 连接是可以看到节点信息。一会我们就使用 elasticsearch-head 查询es中数据。
4.代码实战
4.1 创建一个springboot 项目 4.2 pom.xml文件spring-boot-starter-data-elasticsearch:操作es依赖库 rocketmq-spring-boot-starter:操作rocketmq依赖库 其他就不过多介绍了。大家一看就明白了。rockmq-samples com.lidong.rocketmq 1.0.0 4.0.0 springboot-canal-rocketmq-es 1.8 UTF-8 UTF-8 2.2.5.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test org.springframework.boot spring-boot-starter-data-elasticsearch org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 org.springframework.boot spring-boot-dependencies ${spring-boot.version} pom import org.apache.maven.plugins maven-compiler-plugin 3.8.1 org.springframework.boot spring-boot-maven-plugin 2.3.0.RELEASE com.lidong.RocketmqSyncSamplesApplication repackage repackage
4.3 application的配置
server: port: 8085rocketmq: name-server: localhost:9876spring: data: elasticsearch: cluster-nodes: localhost:9300 cluster-name: my-application repositories: enabled: true
4.4 创建es操作的实体类和仓库类
4.4.1 EsCmsArticle实体类import org.springframework.data.annotation.Id;import org.springframework.data.elasticsearch.annotations.Document;import java.io.Serializable;import java.util.Date;/** * 文章详情 * * String indexName();//索引库的名称,个人建议以项目的名称命名 * String type() default "";//类型,个人建议以实体的名称命名 * short shards() default 5;//默认分区数 * short replicas() default 1;//每个分区默认的备份数 * String refreshInterval() default "1s";//刷新间隔 * String indexStoreType() default "fs";//索引文件存储类型 * **/@Document(indexName = "canal-rocketmq-es", type = "cms-article")public class EsCmsArticle implements Serializable { @Id private Long courseId; /** 标题 */ private String title; /** 摘要 */ private String abstractX; /** 内容 */ private String content; /** 年龄段 */ private String ageRange; /** 图片 */ private String image; /** 查看次数 */ private Long viewNumber; /** 作者 */ private String author; /** 来源 */ private String source; /** 所属分类 */ private Long classId; /** 关键字 */ private String keyWords; /** 描述 */ private String description; /** 文章url */ private String url; /** * 文章状态 */ private Integer status; /** * 创建时间 */ private Date createTime; /** * 修改时间 */ private Date updateTime; public void setCourseId(Long courseId) { this.courseId = courseId; } public Long getCourseId() { return courseId; } public void setTitle(String title) { this.title = title; } public String getTitle() { return title; } public void setAbstractX(String abstractX) { this.abstractX = abstractX; } public String getAbstractX() { return abstractX; } public void setContent(String content) { this.content = content; } public String getContent() { return content; } public void setAgeRange(String ageRange) { this.ageRange = ageRange; } public String getAgeRange() { return ageRange; } public void setImage(String image) { this.image = image; } public String getImage() { return image; } public void setViewNumber(Long viewNumber) { this.viewNumber = viewNumber; } public Long getViewNumber() { return viewNumber; } public void setAuthor(String author) { this.author = author; } public String getAuthor() { return author; } public void setSource(String source) { this.source = source; } public String getSource() { return source; } public void setClassId(Long classId) { this.classId = classId; } public Long getClassId() { return classId; } public void setKeyWords(String keyWords) { this.keyWords = keyWords; } public String getKeyWords() { return keyWords; } public void setDescription(String description) { this.description = description; } public String getDescription() { return description; } public void setUrl(String url) { this.url = url; } public String getUrl() { return url; } public Integer getStatus() { return status; } public void setStatus(Integer status) { this.status = status; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } @Override public String toString() { return "CmsArticle{" + "courseId=" + courseId + ", title='" + title + '\'' + ", abstractX='" + abstractX + '\'' + ", content='" + content + '\'' + ", ageRange='" + ageRange + '\'' + ", image='" + image + '\'' + ", viewNumber=" + viewNumber + ", author='" + author + '\'' + ", source='" + source + '\'' + ", classId=" + classId + ", keyWords='" + keyWords + '\'' + ", description='" + description + '\'' + ", url='" + url + '\'' + ", status=" + status + ", createTime=" + createTime + ", updateTime=" + updateTime + '}'; }}4.4.2 CmsArticleRepository 仓库类
import com.alibaba.fastjson.JSON;import com.lidong.canal.bean.CanalBean;import com.lidong.canal.bean.CmsArticle;import com.lidong.canal.es.entity.EsCmsArticle;import com.lidong.canal.es.repository.CmsArticleRepository;import org.apache.rocketmq.spring.annotation.ConsumeMode;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.BeanUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Component@RocketMQMessageListener( topic = "cms_article", consumerGroup = "cms-article", selectorExpression = "*", consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING, consumeThreadMax = 1)public class SpringConsumer implements RocketMQListener4.6 SpringBootApplication启动类{ private Logger logger = LoggerFactory.getLogger(SpringConsumer.class.getSimpleName()); @Autowired CmsArticleRepository cmsArticleRepository; /** * 实现方式很简单吧,但是你也看见了代码中就没有消息能够消费是否成功后的确认方式,因为实现的onMessage()方法是个void的,还好看过原始的rocketmq的消费者实现方式,也就是rocketmq-client.jar的实现,它是MessageListener.java类来实现消息监听接收的,而它有2个继承接口类MessageListenerConcurrently.java和MessageListenerOrderly.java,这样就好找了,直接收一下这2个接口的实现类,乖乖,果然找到了在rocket-spring-boot的jar里面,就是DefaultRocketMQListenerContainer.java这个类,看下其中一个实现 * * * @param msg */ @Override public void onMessage(String msg) { System.out.println("接收到消息 -> " + msg); CanalBean canalBean = JSON.parseObject(msg, CanalBean.class); String table = canalBean.getTable(); System.out.println(table.toString()); String type = canalBean.getType(); System.out.println(type); List data = canalBean.getData(); data.stream().forEach(tbTest -> { EsCmsArticle esCmsArticle = new EsCmsArticle(); System.out.println(tbTest.toString()); if ("UPDATE".equals(type) && "cms_article".equals(table)) { Optional article = cmsArticleRepository.findById(tbTest.getCourseId()); //删除缓存 //操作es if (article.isPresent()) { EsCmsArticle cmsArticle = article.get(); BeanUtils.copyProperties(tbTest, cmsArticle); cmsArticleRepository.save(cmsArticle); logger.info("id = {} 编辑es成功", cmsArticle.getCourseId()); } else { BeanUtils.copyProperties(tbTest, esCmsArticle); cmsArticleRepository.save(esCmsArticle); logger.info("id = {} 添加es成功", esCmsArticle.getCourseId()); } } else if ("INSERT".equals(type) && "cms_article".equals(table)) { BeanUtils.copyProperties(tbTest, esCmsArticle); //添加缓存 //操作es cmsArticleRepository.save(esCmsArticle); logger.info("id = {} 添加es成功", esCmsArticle.getCourseId()); } }); }}
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class RocketmqToEsSamplesApplication { public static void main(String[] args) { SpringApplication.run(RocketmqToEsSamplesApplication.class, args); }}4.7 CanalBean类 接收mq的数据实体
public class CanalBean implements Serializable { //数据 private Listdata; //数据库名称 private String database; private long es; //递增,从1开始 private int id; //是否是DDL语句 private boolean isDdl; //表结构的字段类型 private MysqlType mysqlType; //UPDATE语句,旧数据 private List old; //主键名称 private List pkNames; //sql语句 private String sql; private SqlType sqlType; //表名 private String table; private long ts; //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等 private String type;//get set ...
public class MysqlType implements Serializable { private String id; private String commodity_name; private String commodity_price; private String number; private String description;//get set..
public class SqlType implements Serializable { private int id; private int commodity_name; private int commodity_price; private int number; private int description; //get set..}
import java.io.Serializable;import java.util.Date;public class CmsArticle implements Serializable { /** $column.columnComment */ private Long courseId; /** 标题 */ private String title; /** 摘要 */ private String abstractX; /** 内容 */ private String content; /** 年龄段 */ private String ageRange; /** 图片 */ private String image; /** 查看次数 */ private Long viewNumber; /** 作者 */ private String author; /** 来源 */ private String source; /** 所属分类 */ private Long classId; /** 关键字 */ private String keyWords; /** 描述 */ private String description; /** 文章url */ private String url; /** * 文章状态 */ private Integer status; /** * 创建时间 */ private Date createTime; /** * 修改时间 */ private Date updateTime;}
转载地址:http://chxzb.baihongyu.com/