Canal 使用总结 Zookeeper mysql
准备工作:
Canal下载地址: https://github.com/alibaba/canal/releases
Zookeeper 下载: https://zookeeper.apache.org/documentation.html
1. 首先 创建 mysql 授权用户
CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
错误: 启动canal 时报错:
/canal/deployer/CanalLauncher : Unsupported major.minor version 52.0
解决方法:
javac -version
java -version
显示均为1.7.
下载安装1.8版本java jdk:
yum -y install java-1.8.0-openjdk-devel.x86_64
安装完成,验证: java -version 是否为 1.8
如果错误, 可生成新的软链接:
rm /etc/alternatives/javac
rm /etc/alternatives/java
ln -s /usr/lib/jvm/java-1.8.0/bin/java /etc/alternatives/java
ln -s /usr/lib/jvm/java-1.8.0/bin/javac /etc/alternatives/javac
2. 使用Admin版本时, 配置文件将不再生效,
错误:
2020-07-03 22:33:19.338 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /172.20.35.139:3306 has an error, retrying. caused by
3. 当出现 driver.mysql.MysqlConnector.connect 错误时, 请检查 指定的 binlog 与 position 是否有错误.
错误:
2020-07-03 22:56:15.281 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O
error while reading from client socket
2020-07-03 20:03:06.095 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /172.20.35.139:3306 failure
Caused by: java.io.IOException: connect /172.20.35.139:3306 failure
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.
解决方法:
指定binlog 与 position:
canal.instance.master.journal.name 一般在/var/lib/mysql路径下,可以自己决定从哪个binlog开始。
canal.instance.master.position 可以在mysql下通过show master status in 'xxx'(binlog名) limit 0, 10指令来获取你需要的position;
如:
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=4
4. subscribe 使用:
# table regex 设置白名单,如果在instance.properties配置文件中进行该项配置,则在代码中不应该再配置
# connector.subscribe(".*\\..*");,如果还在代码中配置,则配置文件将会失效!!!
canal.instance.filter.regex = .*\\..*
# table black regex 设置黑名单
canal.instance.filter.black.regex =
所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".*\\..*"),不然等于没设置canal.instance.filter.regex。
如果一定要调用CanalConnector.subscribe(".*\\..*"),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。
========================================================
mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
5. 多个相同客户端连接同一个 instance 时:
则通过串连 轮询方式提供数据, client1, client2, client3
6. 使用canal 帐号密码时报错:
./yii canal/example/watcher
unpack(): Type N: not enough input, need 4, have 0
Exception 'Socket\Raw\Exception' with message 'Socket operation failed: Broken pipe (SOCKET_EPIPE)'
暂时无解决方法, 只要开启了 就会报错误,可能是php处理的问题, 此处的 passwd 在mysql 中执行: select password('123456');
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
7. canal.properties 配置文件不支持 热加载, instance 支持热加载处理, 保存即生效
8. 错误:
2020-07-09T08:09:23.498472Z 21 [Note] Aborted connection 21 to db: 'unconnected' user: 'canal' host: '172.20.35.92' (Got an error reading communication packets)
原因, 可能是binlog和position找不到导致的错误,
解决方法: 修改配置文件中的meta.dat, 如: canalServer/conf/example/meta.dat 中的 journalName 和 position
{"clientDatas":[{"clientIdentity":{"clientId":1009,"destination":"example","filter":".*\\..*"},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"172.20.35.139","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000008","position":154,"serverId":1,"timestamp":1594281603000}}}],"destination":"example"}
9. 支持 HA Zookeeper 需开启配置 canal.instance.global.spring.xml = classpath:spring/default-instance.xml, 不然的话只会写入 meta.dat 中
Zookeeper 下载: https://zookeeper.apache.org/documentation.html
错误:
2020-07-17 12:27:56,151 [myid:] - ERROR [main:QuorumPeerMain@98] - Invalid config, exiting abnormally
org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error processing /opt/zookeeper/bin/../conf/zoo.cfg
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:197)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:124)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)
Caused by: java.lang.IllegalArgumentException: myid file is missing
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.checkValidity(QuorumPeerConfig.java:810)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.setupQuorumPeerConfig(QuorumPeerConfig.java:681)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties(QuorumPeerConfig.java:506)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:193)
... 2 more
Invalid config, exiting abnormally
解决方法:
echo 1 > /tmp/zookeeper/data/myid
此处的1为你的配置中的ID值 可以是server.x 中的x值
按照部署和配置,在单台机器上各自完成配置,演示时instance name为example
① 修改主机:192.168.43.110 的canal.properties,加上zookeeper配置,spring配置选择default-instance.xml
canal.zkServers =192.168.43.110:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
② example目录,并修改instance.properties
canal.instance.mysql.slaveId=1234 ##另外一台机器改成1235,保证slaveId不重复即可
canal.instance.master.address=192.168.43.110:3306
注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置
10. Zookeeper 中的节点信息查看:
集群列表:
[zk: 172.20.35.104:2181(CONNECTED) 35] ls /otter/canal/destinations/JgCanalOrder/cluster
[172.20.35.103:11111, 172.20.35.104:11111, 172.20.35.92:11111]
meta 数据库位置信息:
[zk: 172.20.35.104:2181(CONNECTED) 32] get -s /otter/canal/destinations/JgCanalOrder/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"172.20.35.139","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000010","position":256911,"serverId":1,"timestamp":1594816430000}}
客户端只能使用 running 里的IP和端口访问:
[zk: 172.20.35.104:2181(CONNECTED) 37] get /otter/canal/destinations/JgCanalOrder/running
{"active":true,"address":"172.20.35.104:11111"}
11. PHP Zookeeper 扩展安装,
pecl install zookeeper-0.7.2
如报错 libzookeeper, 请先安装 Zookeeper 后再执行 pecl, MacOS 下先执行 brew install zookeeper 后再执行 pecl install zookeeper-0.7.2
12. 当mysql master down机, 如何切换到 slave 上?
请自行修改对应的 meta.dat 或 对应的 zookeeper 中 cursor 接点信息
OR
采用 mysql 的 GTID , 这里 没做测试 暂不知效果
13. 参考mysql表结构
CREATE TABLE `sy_canal_item_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`event_type` tinyint(1) DEFAULT '0' COMMENT 'DML事件类型 1:insert 2:update 3:delete',
`event_name` varchar(10) DEFAULT '' COMMENT 'DML事件名称',
`logfile_name` varchar(20) DEFAULT '' COMMENT 'binlog名称',
`position` varchar(20) DEFAULT '0' COMMENT 'binlog中的位置',
`server_id` int(5) DEFAULT '0' COMMENT 'Mysql 的Master ID',
`db_name` varchar(20) DEFAULT '' COMMENT 'Mysql 数据库名',
`table_name` varchar(20) DEFAULT '' COMMENT 'Mysql 数据库表名',
`unique_id` varchar(30) DEFAULT '0' COMMENT '表主键值',
`origin` longtext COMMENT '原始值',
`current` longtext COMMENT '当前值',
`updated` longtext COMMENT '有变更的字段和值',
`event_state` tinyint(1) DEFAULT '0' COMMENT '处理结果: 0:未处理, 1:已处理',
`event_state_time` int(10) DEFAULT '0' COMMENT '事件状态变更时间',
`create_time` int(10) DEFAULT '0',
PRIMARY KEY (`id`),
KEY `ind_event_type` (`event_type`),
KEY `ind_event_name` (`event_name`),
KEY `ind_position` (`position`),
KEY `ind_logfile_name` (`logfile_name`),
KEY `ind_db_name` (`db_name`),
KEY `ind_table_name` (`table_name`),
KEY `ind_create_time` (`create_time`),
KEY `ind_event_state` (`event_state`),
KEY `ind_unique_id` (`unique_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
14. 参考PHP
# CanalBaseController
<?php namespace console\controllers\canal; use Com\Alibaba\Otter\Canal\Protocol\Entry; use Com\Alibaba\Otter\Canal\Protocol\EntryType; use Com\Alibaba\Otter\Canal\Protocol\EventType; use Com\Alibaba\Otter\Canal\Protocol\RowChange; use common\components\eai\Eai; use common\models\BaseModel; use xingwenge\canal_php\CanalClient; use xingwenge\canal_php\CanalConnectorFactory; use Yii; use yii\console\Controller; class CanalBaseController extends Controller { public function init() { parent::init(); } public function watcher($instance) { try { if (empty(Yii::$app->params['canalClient']) || empty(Yii::$app->params['canalClient'][$instance])) { exit("Canal 未配置实例 {$instance}, 请在 params 中进行配置!"); } $config = Yii::$app->params['canalClient'][$instance]; $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE); # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE); $client->connect($config['server'], $config['port']); $client->checkValid(); $client->subscribe($config['clientid'], $config['instance'], $config['filter']); # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤 while (true) { $message = $client->getWithoutAck($config['size']); // 无需自动响应成功结果 $message_id = $message->getId(); if (!empty($message_id)) { if ($entries = $message->getEntries()) { foreach ($entries as $entry) { $event_list = self::_getData($entry); (!empty($event_list)) && $this->_insertItem($event_list, $config); } } v($message_id); $client->ack($message_id); // 事务处理完毕后 响应成功结果, 方便下次从错误位置调用 } sleep(1); } $client->disConnect(); } catch (\Exception $e) { echo $e->getMessage(), PHP_EOL; } } /** * 写入binglog 事件数据 */ private function _insertItem($event_list, $config) { $columns = array_keys($event_list[0]); foreach ($event_list as &$v) { $v['origin'] = json_encode($v['origin'], JSON_UNESCAPED_UNICODE); $v['current'] = json_encode($v['current'], JSON_UNESCAPED_UNICODE); $v['updated'] = json_encode($v['updated'], JSON_UNESCAPED_UNICODE); } BaseModel::batchInsertItem($event_list, $columns, $config['item_table'], Eai::writer()); } /** * @param Entry $entry * @throws \Exception */ private static function _getData($entry) { switch ($entry->getEntryType()) { case EntryType::TRANSACTIONBEGIN: case EntryType::TRANSACTIONEND: return false; break; } $row_change = new RowChange(); $row_change->mergeFromString($entry->getStoreValue()); $eventType = $row_change->getEventType(); if ($eventType !== EventType::DELETE && $eventType !== EventType::INSERT && $eventType !== EventType::UPDATE) { return false; } $header = $entry->getHeader(); $event_type = $header->getEventType(); $common_data = [ 'event_type' => $event_type, 'event_name' => EventType::name($event_type), 'logfile_name' => $header->getLogfileName(), 'position' => $header->getLogfileoffset(), 'server_id' => $header->getServerId(), 'db_name' => $header->getSchemaName(), 'table_name' => $header->getTableName(), 'origin' => [], # 原始值 'current' => [], # 当前值 'updated' => [], # 有变更的字段和值 'create_time' => time() ]; $new_data = []; foreach ($row_change->getRowDatas() as $rowData) { switch ($eventType) { case EventType::DELETE: $new_data[] = self::_formatInfo($common_data, $rowData->getBeforeColumns()); break; case EventType::INSERT: $new_data[] = self::_formatInfo($common_data, $rowData->getAfterColumns()); break; default: $new_data[] = self::_formatInfo($common_data, $rowData->getAfterColumns(), $rowData->getBeforeColumns()); break; } } // throw new \Exception('12'); p($new_data); return $new_data; } /** * @desc 兼容批量处理的情况 delete from, insert values * @param $common_data * @param $columns * @param null $before_columns * @return array */ private static function _formatInfo($common_data, $columns, $before_columns = null) { $has_change = !empty($before_columns) ? true : false; foreach ($columns as $key => $column) { $field = $column->getName(); if ($common_data['event_type'] == EventType::DELETE) { $common_data['origin'][$field] = $column->getValue(); } else { $common_data['current'][$field] = $column->getValue(); } if ($has_change && $column->getUpdated()) { $common_data['origin'][$field] = $before_columns[$key]->getValue(); $common_data['updated'][$field] = $column->getValue(); } } if ($common_data['event_type'] == EventType::UPDATE) { $common_data['origin'] = array_merge($common_data['current'], $common_data['origin']); } return $common_data; } }
# CanalClientController
<?php namespace console\controllers\canal; class CanalClientController extends CanalBaseController { public function init() { parent::init(); } /** * Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/order */ public function actionOrder() { $this->watcher('order'); } /** * Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/log */ public function actionLog() { $this->watcher('log'); } /** * Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/example */ public function actionExample() { $this->watcher('example'); } }
# 配置参考 return [ 'canalClient' => [ 'order' => [ 'server' => '172.20.35.104', 'port' => 11111, 'instance' => 'JgCanalOrder', 'clientid' => 1001, 'filter' => 'b2b.bd_order', 'size' => 100, 'item_table' => 'sy_canal_item_order',# 记录数据流的表 ], 'example' => [ 'server' => '127.0.0.1', 'port' => 11111, 'instance' => 'example', 'clientid' => 1009, 'filter' => '.*\\..*', 'size' => 100, 'item_table' => 'sy_canal_item_log', # 记录数据流的表 ], ] ];
快速安装:
https://github.com/alibaba/canal/wiki/QuickStart
参考文档:
https://www.jianshu.com/p/6299048fad66
https://blog.csdn.net/weixin_42462138/article/details/86742352
https://blog.csdn.net/varyall/article/details/79208574
https://www.cnblogs.com/f-zhao/p/9111690.html
http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.html
https://www.bookstack.cn/read/canal-v1.1.4/aa1b771944934f89.md
https://pdf.us/2018/08/30/1819.html
相关资料:
1)一般我们会设置连接超时时间,在客户端设置,其API为:
ZOOAPI zhandle_t *zookeeper_init(const char *host,
watcher_fn fn,
int recv_timeout,
const clientid_t * clientid,
void *context, int flags);
功能: 创建一个句柄(handle)和一个响应(response)这个句柄的会话(session)。
参数:
host:zookeeper主机列表,用逗号间隔。
fn:用于监视的回调函数。
clientid:客户端尝试重连的先前会话的ID,如果不需要重连先前的会话,则设置为 0。客户端可以通过调用 zoo_client_id来访问一个已经连接上的并且有效的会话ID,如果clientid对应的会话超时,或者由于某种原因 clientid变为无效了,那么zookeeper_init 将返回一个非法的 zhandle_t,通过 zhandle_t 的状态可以获知 zookeeper_init 调用失败的原因。 (通常为 ZOO_EXPIRED_SESSION_STATE).
context:暂时用不到,忽略。(TODO)
flags:设置为0,zookeeper开发团队保留以后使用。
大量,包括代码里面的注释上都没有说recv_timeout的意思,按字面意思,肯定不是session_timeout,而是多长时间zk创建连接不成功的时间?
2)在服务器端zoo.conf中有相关设置:minSessionTimeout,最小的客户端超时时间,默认值为2个ticktime,单位是毫秒:
minSessionTimeout
最小的客户端session超时时间,默认值为2个tickTime,单位是毫秒
maxSessionTimeout
最大的客户端session超时时间,默认值为20个tickTime,单位是毫秒
3)于是我们最终修改的zoo.conf文件为:
tickTime=1000
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zklogs
clientPort=2181
initLimit=5
syncLimit=2
minSessionTimeout=16000
maxSessionTimeout=30000
server.1=xxxx:2888:3888
server.2=xxxx:2888:3888
server.3=xxxx:2888:3888
注意,在仅配置了minSessionTimeout参数时,zk会启动失败,提示该参数超过了maxSessionTimeout值,这个时候需要在配置文件把最大值也配上。
4)服务端配置详解:
(1)dataDir
用于存放内存数据库快照的文件夹,同时用于集群的myid文件也存在这个文件夹里。
(2)dataLogDir
用于单独设置transaction log的目录,transaction log分离可以避免和普通log还有快照的竞争。
(3)tickTime
心跳时间,为了确保client-server连接存在的,以毫秒为单位,最小超时时间为两个心跳时间。
(4)clientPort
客户端监听端口。
(5)globalOutstandingLimit
client请求队列的最大长度,防止内存溢出,默认值为1000。
(6)preAllocSize
预分配的Transaction log空间block为proAllocSize KB,默认block为64M,一般不需要更改,除非snapshot过于频繁。
(7)snapCount
在snapCount个snapshot后写一次transaction log,默认值是100,000。
(8)traceFile
用于记录请求的log,打开会影响性能,用于debug,最好不要定义。
(9)maxClientCnxns
最大并发客户端数,用于防止DOS的,默认值是10,设置为0是不加限制。
(11)clientPortBindAddress
可以设置指定的client ip以及端口,不设置的话等于ANY:clientPort
(12)minSessionTimeout
最小的客户端session超时时间,默认值为2个tickTime,单位是毫秒
(13)maxSessionTimeout
最大的客户端session超时时间,默认值为20个tickTime,单位是毫秒
(14)electionAlg
用于选举的实现的参数:
①0:为以原始的基于UDP的方式协作
②1:为不进行用户验证的基于UDP的快速选举
③2:为进行用户验证的基于UDP的快速选举
④3:为基于TCP的快速选举,默认值为3
(15)initLimit
多少个tickTime内,允许其他server连接并初始化数据,如果zooKeeper管理的数据较大,则应相应增大这个值。
(16)syncLimit
多少个tickTime内,允许follower同步,如果follower落后太多,则会被丢弃。
(17)leaderServes
leader是否接受客户端连接。默认值为yes。leader负责协调更新。当更新吞吐量远高于读取吞吐量时,可以设置为不接受客户端连接,以便leader可以专注于同步协调工作。
(18)server.x=[hostname]:nnnnn[:nnnnn]
配置集群里面的主机信息,其中: ①server.x:server.x的x要写在myid文件中,决定当前机器的id