Syncing MySQL to Elasticsearch with Canal
Versions

- MySQL: 8.0
- Elasticsearch: 7.9.2
- Canal: v1.1.5
- Canal-Adapter: v1.1.5

For Canal, refer to the official website:
MySQL setup

Add the settings below to the MySQL configuration file my.cnf. vi/vim is probably not available inside the container, so work around it with docker cp:

```bash
# enter the container
docker exec -it [id] /bin/bash
# create mysqld.cnf locally with vim and edit the MySQL configuration there
vim mysqld.cnf
```
```ini
# enable binlog
log-bin=mysql-bin
# use ROW format
binlog-format=ROW
# server_id must be defined for MySQL replication and must not clash with Canal's slaveId
server_id=1
```
```bash
docker cp /root/mysqld.cnf <container-id>:/etc/mysql/
```
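Binlog settings in mysqld.cnf are only read at startup, so restart the MySQL container after copying the file in (the container id here is a placeholder):

```bash
# restart MySQL so the new mysqld.cnf is loaded
docker restart <container-id>
```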
Check that binlog is enabled:

```sql
# check whether binlog is enabled
show variables like 'log_bin%';
# check the binlog format
show variables like 'binlog_format%';
# show the binlog file currently being written to
SHOW MASTER STATUS;
# reset (clears existing binlogs)
reset master;
```
Add a new user:

```sql
# create the user and set its password
CREATE USER canal IDENTIFIED BY 'canal';
# grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# reload privileges
FLUSH PRIVILEGES;
```
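Optionally (not covered in the article), confirm the account and its grants before moving on:

```sql
-- sanity check: the canal user should hold SELECT, REPLICATION SLAVE, REPLICATION CLIENT
SHOW GRANTS FOR 'canal'@'%';
```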
Install Elasticsearch

```bash
# pull the elasticsearch image
docker pull elasticsearch:7.9.2
# start elasticsearch
docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx1g" elasticsearch:7.9.2
# enter the elasticsearch container
docker exec -it elasticsearch /bin/bash
# install the ik analyzer (a failed download on the server may mean it ran out of memory)
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.9.2/elasticsearch-analysis-ik-7.9.2.zip
```
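Newly installed plugins are only loaded on restart; back on the host, restart ES and confirm it responds:

```bash
# restart so the ik plugin is loaded, then confirm ES answers on port 9200
docker restart elasticsearch
curl http://localhost:9200
```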
ES cross-origin configuration

```bash
docker exec -it elasticsearch /bin/sh
vi config/elasticsearch.yml
```

```yaml
cluster.name: "docker-cluster"
network.host: 0.0.0.0
# cross-origin
http.cors.allow-origin: "*"
http.cors.enabled: true
```
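Changes to elasticsearch.yml only apply after a restart, so restart the container once the edit is saved:

```bash
docker restart elasticsearch
```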
Directory mounts

```bash
# grant permissions, otherwise startup fails
chmod 777 /home/haha/mydata/elasticsearch/data
```
If the elasticsearch container from the earlier step is still around, stop and remove it first, since the same name is reused here. Then start ES with the plugin and data directories mounted:

```bash
docker run -d --name elasticsearch --restart=always -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx1g" \
  -v /home/haha/mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
  -v /home/haha/mydata/elasticsearch/data:/usr/share/elasticsearch/data \
  elasticsearch:7.9.2
```
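Because the mounted plugins directory on the host starts out empty, the ik analyzer installed earlier is not present in this new container. A sketch of reinstalling it (same release URL as above):

```bash
# reinstall ik inside the freshly mounted container, then restart to load it
docker exec -it elasticsearch ./bin/elasticsearch-plugin install \
  https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.9.2/elasticsearch-analysis-ik-7.9.2.zip
docker restart elasticsearch
```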
Install Elasticsearch-Head (works around the ES cross-origin connection problem)

If not already done above, enable cross-origin access in elasticsearch.yml:

```bash
docker exec -it elasticsearch /bin/sh
vi config/elasticsearch.yml
```

```yaml
cluster.name: "docker-cluster"
network.host: 0.0.0.0
# cross-origin
http.cors.allow-origin: "*"
http.cors.enabled: true
```

Start es-head
```bash
docker run -d \
  --name=elasticsearch-head \
  -p 9100:9100 \
  mobz/elasticsearch-head:5-alpine
```

Fix es-head being unable to create indices
Enter the elasticsearch-head container:
```bash
docker exec -it elasticsearch-head /bin/sh
vi _site/vendor.js
```
On line 6886, change `contentType: "application/x-www-form-urlencoded",` to `contentType: "application/json;charset=UTF-8",`.

On line 7574, change `var inspectData = s.contentType === "application/x-www-form-urlencoded" &&` to `var inspectData = s.contentType === "application/json;charset=UTF-8" &&`.
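As an alternative to editing the file by hand, the same two edits can be made with sed from the host; this sketch assumes vendor.js sits under _site/ in the container's working directory, as the vi command above implies:

```bash
# line 6886: switch the request content type to JSON
docker exec elasticsearch-head sed -i '6886s|application/x-www-form-urlencoded|application/json;charset=UTF-8|' _site/vendor.js
# line 7574: keep the inspectData check consistent with the new content type
docker exec elasticsearch-head sed -i '7574s|application/x-www-form-urlencoded|application/json;charset=UTF-8|' _site/vendor.js
```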
Restart the container; clearing the browser cache can also help:
```bash
exit
docker restart elasticsearch-head
```

Install Canal
For reference:
A 2 GB server can't quite handle all of this.

Install the image:

```bash
# pull the image
$ docker pull canal/canal-server:v1.1.5
# start a temporary container, just to copy the default config out of it
$ docker run --name canal -p 11111:11111 -d canal/canal-server:v1.1.5
# copy the config out so it can be mounted back in
$ docker cp canal:/home/admin /home/haha/docker-canal
# remove the container
$ docker stop canal
$ docker rm canal
# start a new container with the config mounted
$ docker run --name canal -p 11111:11111 -v /home/haha/docker-canal:/home/admin -d canal/canal-server:v1.1.5
```
Edit the configuration file:

```bash
# go to the mounted directory
$ cd /home/haha/docker-canal
# go to the directory containing the instance config
$ cd canal-server/conf/example/
# edit the file
$ vim instance.properties
```
Change slaveId (e.g. slaveId=10), and remember it must differ from the MySQL server-id.
Change the database address:

```properties
canal.instance.mysql.slaveId=0
canal.instance.master.address=192.168.80.80:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456789
```
The full instance.properties for reference:

```properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# database address
canal.instance.master.address=192.168.118.130:3306
# binlog file currently being written to (see the MySQL commands in the setup section above)
canal.instance.master.journal.name=binlog.000001
# current write offset
canal.instance.master.position=156
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# MySQL user; no need to change this if you created the canal user earlier
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
```
Issue:

```bash
# the mounted directory needs permissions, otherwise access inside the container is denied
chmod 777 /home/haha/docker-canal/
```
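After changing instance.properties and fixing permissions, restart canal-server and watch its logs; the example log path below assumes the /home/haha/docker-canal mount used above:

```bash
# restart so the new instance.properties is picked up
docker restart canal
# container output
docker logs -f canal
# instance log, visible through the mounted directory
tail -f /home/haha/docker-canal/canal-server/logs/example/example.log
```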
Create the target ES index:

```json
PUT /area_city
{
  "mappings": {
    "properties": {
      "id":            { "type": "long" },
      "pid":           { "type": "integer" },
      "deep":          { "type": "integer" },
      "name":          { "type": "text" },
      "pinyin_prefix": { "type": "text" },
      "pinyin":        { "type": "text" },
      "ext_id":        { "type": "text" },
      "ext_name":      { "type": "text" }
    }
  }
}
```

Install Canal-Adapter

Install the image:
```bash
# pull the image
$ docker pull slpcat/canal-adapter:v1.1.5
# start a temporary container, used to copy out the config for mounting
$ docker run --name canal-adapter -p 8081:8081 -d slpcat/canal-adapter:v1.1.5
# create the directory that will hold the mounted files
$ mkdir /home/haha/docker-canal-adapter
# copy the files out
$ docker cp canal-adapter:/opt/canal-adapter /home/haha/docker-canal-adapter
# remove the container
$ docker stop canal-adapter
$ docker rm canal-adapter
# recommended: add permissions
$ chmod 777 /home/haha/docker-canal-adapter
# start a new container with the config mounted
$ docker run --name canal-adapter -p 8081:8081 -v /home/haha/docker-canal-adapter/canal-adapter:/opt/canal-adapter -d slpcat/canal-adapter:v1.1.5
```

Configure the canal-adapter files
The files to modify are application.yml in the conf directory and mytest_user.yml in the es7 directory.

Edit application.yml:
```bash
# go to the mounted directory
$ cd /home/haha/docker-canal-adapter/canal-adapter
$ cd conf/
$ vim application.yml
```

```yaml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # set this to the canal-server (deployer) address; the deployer's default port is 11111
    # the adapter pulls data from canal here for real-time sync
    canal.tcp.server.host: 192.168.80.80:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      # change this to your database connection info
      url: jdbc:mysql://192.168.80.80:3306/estest?useUnicode=true
      username: root
      password: 123456789
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es7 # uses the es7 config folder (there is also es6)
#        key: fgnKey
        hosts: 192.168.80.80:9200 # 127.0.0.1:9200 for rest mode; ES cluster address(es), comma separated
        properties:
          mode: rest # transport or rest
#          security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch # ES cluster name
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
```
Modify the mytest_user.yml file

In the es7 folder:

```yaml
dataSourceKey: defaultDS # key of the source data source; matches an entry under srcDataSources in application.yml
destination: example     # canal instance name, or MQ topic
groupId: g1              # groupId for MQ mode; only data for this groupId is synced
esMapping:
  _index: area_city      # ES index name
  _id: _id               # ES _id; if this is not set, pk below must be set and ES assigns ids automatically
  upsert: true
  pk: id                 # if _id is not wanted, one property must be designated as the primary key
  # SQL mapping
  sql: "SELECT id AS _id, pid, deep, name, pinyin_prefix, pinyin, ext_id, ext_name FROM area_city"
  # objFields:
  #   _labels: array:;
  etlCondition: "where a.c_time>={}" # condition parameter for ETL
  commitBatch: 3000 # commit batch size
```
In the sql mapping, the id column must be aliased as _id (as above), otherwise the sync fails.

Do not wrap column names in backticks in the sql statement, or it will cause problems; this one tormented me for a long time.
Once editing is finished, restart the container:
```bash
$ docker restart canal-adapter
```
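It can help to watch the adapter log after the restart and confirm it connects to both canal-server and ES without errors:

```bash
docker logs -f canal-adapter
```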
Testing

Full data sync

Use Postman or Xshell (curl):
```bash
curl http://localhost:8081/etl/es7/mytest_user.yml -X POST
```
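Once the table and index below exist and hold data, a quick way to confirm the full sync reached ES (standard ES APIs, not from the article):

```bash
# document count and a sample document in the area_city index
curl "http://localhost:9200/area_city/_count?pretty"
curl "http://localhost:9200/area_city/_search?pretty&size=1"
```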
Create the table

```sql
CREATE TABLE `area_city` (
  `id` bigint(20) NOT NULL COMMENT 'City ID',
  `pid` int(11) NOT NULL COMMENT 'Parent ID',
  `deep` int(11) NOT NULL COMMENT 'Depth in the hierarchy; 0: province, 1: city, 2: district, 3: town',
  `name` varchar(255) NOT NULL COMMENT 'City name',
  `pinyin_prefix` varchar(255) NOT NULL COMMENT 'Pinyin prefix of name',
  `pinyin` varchar(255) NOT NULL COMMENT 'Full pinyin of name',
  `ext_id` varchar(50) NOT NULL COMMENT 'Original ID from the data source; 0 for manually added rows',
  `ext_name` varchar(255) NOT NULL COMMENT 'Original, unabridged name from the data source'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
The table must be created in the database configured in application.yml (estest in the example above).
Create the ES index (same mapping as shown earlier); the index name must match the _index configured in mytest_user.yml:

```json
PUT /area_city
{
  "mappings": {
    "properties": {
      "id":            { "type": "long" },
      "pid":           { "type": "integer" },
      "deep":          { "type": "integer" },
      "name":          { "type": "text" },
      "pinyin_prefix": { "type": "text" },
      "pinyin":        { "type": "text" },
      "ext_id":        { "type": "text" },
      "ext_name":      { "type": "text" }
    }
  }
}
```
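To exercise the incremental path, insert a row in MySQL and check that it shows up in ES. The row values are made-up sample data, the container id is a placeholder, and the root/123456789 credentials are the ones configured in application.yml:

```bash
# insert a test city into the estest database watched by canal
docker exec -it <mysql-container-id> mysql -uroot -p123456789 estest -e \
  "INSERT INTO area_city (id, pid, deep, name, pinyin_prefix, pinyin, ext_id, ext_name) \
   VALUES (110000, 0, 0, '北京', 'b', 'beijing', '110000', '北京市');"

# the document id equals the MySQL id because the sql mapping aliases id AS _id
curl "http://localhost:9200/area_city/_doc/110000?pretty"
```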