Canal 使用总结 Zookeeper mysql
准备工作:
Canal下载地址: https://github.com/alibaba/canal/releases
Zookeeper 下载: https://zookeeper.apache.org/documentation.html
1. 首先 创建 mysql 授权用户
CREATE USER canal IDENTIFIED BY '123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
错误: 启动canal 时报错:
/canal/deployer/CanalLauncher : Unsupported major.minor version 52.0
解决方法:
javac -version
java -version
显示均为1.7.
下载安装1.8版本java jdk:
yum -y install java-1.8.0-openjdk-devel.x86_64
安装完成,验证: java -version 是否为 1.8
如果错误, 可生成新的软链接:
rm /etc/alternatives/javac
rm /etc/alternatives/java
ln -s /usr/lib/jvm/java-1.8.0/bin/java /etc/alternatives/java
ln -s /usr/lib/jvm/java-1.8.0/bin/javac /etc/alternatives/javac
2. 使用Admin版本时, 配置文件将不再生效,
错误:
2020-07-03 22:33:19.338 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /172.20.35.139:3306 has an error, retrying. caused by
3. 当出现 driver.mysql.MysqlConnector.connect 错误时, 请检查 指定的 binlog 与 position 是否有错误.
错误:
2020-07-03 22:56:15.281 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O
error while reading from client socket
2020-07-03 20:03:06.095 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /172.20.35.139:3306 failure
Caused by: java.io.IOException: connect /172.20.35.139:3306 failure
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.
解决方法:
指定binlog 与 position:
canal.instance.master.journal.name 一般在/var/lib/mysql路径下,可以自己决定从哪个binlog开始。
canal.instance.master.position 可以在mysql下通过show master status in 'xxx'(binlog名) limit 0, 10指令来获取你需要的position;
如:
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=4
4. subscribe 使用:
# table regex 设置白名单,如果在instance.properties配置文件中进行该项配置,则在代码中不应该再配置
# connector.subscribe(".*\\..*");,如果还在代码中配置,则配置文件将会失效!!!
canal.instance.filter.regex = .*\\..*
# table black regex 设置黑名单
canal.instance.filter.black.regex =
所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".*\\..*"),不然等于没设置canal.instance.filter.regex。
如果一定要调用CanalConnector.subscribe(".*\\..*"),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。
========================================================
mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
5. 多个相同客户端连接同一个 instance 时:
则通过串连 轮询方式提供数据, client1, client2, client3
6. 使用canal 帐号密码时报错:
./yii canal/example/watcher
unpack(): Type N: not enough input, need 4, have 0
Exception 'Socket\Raw\Exception' with message 'Socket operation failed: Broken pipe (SOCKET_EPIPE)'
暂时无解决方法, 只要开启了 就会报错误,可能是php处理的问题, 此处的 passwd 在mysql 中执行: select password('123456');
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
7. canal.properties 配置文件不支持 热加载, instance 支持热加载处理, 保存即生效
8. 错误:
2020-07-09T08:09:23.498472Z 21 [Note] Aborted connection 21 to db: 'unconnected' user: 'canal' host: '172.20.35.92' (Got an error reading communication packets)
原因, 可能是binlog和position找不到导致的错误,
解决方法: 修改配置文件中的meta.dat, 如: canalServer/conf/example/meta.dat 中的 journalName 和 position
{"clientDatas":[{"clientIdentity":{"clientId":1009,"destination":"example","filter":".*\\..*"},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"172.20.35.139","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000008","position":154,"serverId":1,"timestamp":1594281603000}}}],"destination":"example"}
9. 支持 HA Zookeeper 需开启配置 canal.instance.global.spring.xml = classpath:spring/default-instance.xml, 不然的话只会写入 meta.dat 中
Zookeeper 下载: https://zookeeper.apache.org/documentation.html
错误:
2020-07-17 12:27:56,151 [myid:] - ERROR [main:QuorumPeerMain@98] - Invalid config, exiting abnormally
org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error processing /opt/zookeeper/bin/../conf/zoo.cfg
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:197)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:124)
at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)
Caused by: java.lang.IllegalArgumentException: myid file is missing
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.checkValidity(QuorumPeerConfig.java:810)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.setupQuorumPeerConfig(QuorumPeerConfig.java:681)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties(QuorumPeerConfig.java:506)
at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:193)
... 2 more
Invalid config, exiting abnormally
解决方法:
echo 1 > /tmp/zookeeper/data/myid
此处的1为你的配置中的ID值 可以是server.x 中的x值
按照部署和配置,在单台机器上各自完成配置,演示时instance name为example
① 修改主机:192.168.43.110 的canal.properties,加上zookeeper配置,spring配置选择default-instance.xml
canal.zkServers =192.168.43.110:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
② example目录,并修改instance.properties
canal.instance.mysql.slaveId=1234 ##另外一台机器改成1235,保证slaveId不重复即可
canal.instance.master.address=192.168.43.110:3306
注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置
10. Zookeeper 中的节点信息查看:
集群列表:
[zk: 172.20.35.104:2181(CONNECTED) 35] ls /otter/canal/destinations/JgCanalOrder/cluster
[172.20.35.103:11111, 172.20.35.104:11111, 172.20.35.92:11111]
meta 数据库位置信息:
[zk: 172.20.35.104:2181(CONNECTED) 32] get -s /otter/canal/destinations/JgCanalOrder/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"172.20.35.139","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000010","position":256911,"serverId":1,"timestamp":1594816430000}}
客户端只能使用 running 里的IP和端口访问:
[zk: 172.20.35.104:2181(CONNECTED) 37] get /otter/canal/destinations/JgCanalOrder/running
{"active":true,"address":"172.20.35.104:11111"}
11. PHP Zookeeper 扩展安装,
pecl install zookeeper-0.7.2
如报错 libzookeeper, 请先安装 Zookeeper 后再执行 pecl, MacOS 下先执行 brew install zookeeper 后再执行 pecl install zookeeper-0.7.2
12. 当mysql master down机, 如何切换到 slave 上?
请自行修改对应的 meta.dat 或 对应的 zookeeper 中 cursor 接点信息
OR
采用 mysql 的 GTID , 这里 没做测试 暂不知效果
13. 参考mysql表结构
CREATE TABLE `sy_canal_item_order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`event_type` tinyint(1) DEFAULT '0' COMMENT 'DML事件类型 1:insert 2:update 3:delete',
`event_name` varchar(10) DEFAULT '' COMMENT 'DML事件名称',
`logfile_name` varchar(20) DEFAULT '' COMMENT 'binlog名称',
`position` varchar(20) DEFAULT '0' COMMENT 'binlog中的位置',
`server_id` int(5) DEFAULT '0' COMMENT 'Mysql 的Master ID',
`db_name` varchar(20) DEFAULT '' COMMENT 'Mysql 数据库名',
`table_name` varchar(20) DEFAULT '' COMMENT 'Mysql 数据库表名',
`unique_id` varchar(30) DEFAULT '0' COMMENT '表主键值',
`origin` longtext COMMENT '原始值',
`current` longtext COMMENT '当前值',
`updated` longtext COMMENT '有变更的字段和值',
`event_state` tinyint(1) DEFAULT '0' COMMENT '处理结果: 0:未处理, 1:已处理',
`event_state_time` int(10) DEFAULT '0' COMMENT '事件状态变更时间',
`create_time` int(10) DEFAULT '0',
PRIMARY KEY (`id`),
KEY `ind_event_type` (`event_type`),
KEY `ind_event_name` (`event_name`),
KEY `ind_position` (`position`),
KEY `ind_logfile_name` (`logfile_name`),
KEY `ind_db_name` (`db_name`),
KEY `ind_table_name` (`table_name`),
KEY `ind_create_time` (`create_time`),
KEY `ind_event_state` (`event_state`),
KEY `ind_unique_id` (`unique_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
14. 参考PHP
# CanalBaseController
<?php
namespace console\controllers\canal;
use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use common\components\eai\Eai;
use common\models\BaseModel;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use Yii;
use yii\console\Controller;
class CanalBaseController extends Controller
{
public function init()
{
parent::init();
}
public function watcher($instance)
{
try {
if (empty(Yii::$app->params['canalClient']) || empty(Yii::$app->params['canalClient'][$instance])) {
exit("Canal 未配置实例 {$instance}, 请在 params 中进行配置!");
}
$config = Yii::$app->params['canalClient'][$instance];
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect($config['server'], $config['port']);
$client->checkValid();
$client->subscribe($config['clientid'], $config['instance'], $config['filter']);
# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤
while (true) {
$message = $client->getWithoutAck($config['size']); // 无需自动响应成功结果
$message_id = $message->getId();
if (!empty($message_id)) {
if ($entries = $message->getEntries()) {
foreach ($entries as $entry) {
$event_list = self::_getData($entry);
(!empty($event_list)) && $this->_insertItem($event_list, $config);
}
}
v($message_id);
$client->ack($message_id); // 事务处理完毕后 响应成功结果, 方便下次从错误位置调用
}
sleep(1);
}
$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}
}
/**
* 写入binglog 事件数据
*/
private function _insertItem($event_list, $config)
{
$columns = array_keys($event_list[0]);
foreach ($event_list as &$v) {
$v['origin'] = json_encode($v['origin'], JSON_UNESCAPED_UNICODE);
$v['current'] = json_encode($v['current'], JSON_UNESCAPED_UNICODE);
$v['updated'] = json_encode($v['updated'], JSON_UNESCAPED_UNICODE);
}
BaseModel::batchInsertItem($event_list, $columns, $config['item_table'], Eai::writer());
}
/**
* @param Entry $entry
* @throws \Exception
*/
private static function _getData($entry)
{
switch ($entry->getEntryType()) {
case EntryType::TRANSACTIONBEGIN:
case EntryType::TRANSACTIONEND:
return false;
break;
}
$row_change = new RowChange();
$row_change->mergeFromString($entry->getStoreValue());
$eventType = $row_change->getEventType();
if ($eventType !== EventType::DELETE
&& $eventType !== EventType::INSERT
&& $eventType !== EventType::UPDATE) {
return false;
}
$header = $entry->getHeader();
$event_type = $header->getEventType();
$common_data = [
'event_type' => $event_type,
'event_name' => EventType::name($event_type),
'logfile_name' => $header->getLogfileName(),
'position' => $header->getLogfileoffset(),
'server_id' => $header->getServerId(),
'db_name' => $header->getSchemaName(),
'table_name' => $header->getTableName(),
'origin' => [], # 原始值
'current' => [], # 当前值
'updated' => [], # 有变更的字段和值
'create_time' => time()
];
$new_data = [];
foreach ($row_change->getRowDatas() as $rowData) {
switch ($eventType) {
case EventType::DELETE:
$new_data[] = self::_formatInfo($common_data, $rowData->getBeforeColumns());
break;
case EventType::INSERT:
$new_data[] = self::_formatInfo($common_data, $rowData->getAfterColumns());
break;
default:
$new_data[] = self::_formatInfo($common_data, $rowData->getAfterColumns(), $rowData->getBeforeColumns());
break;
}
}
// throw new \Exception('12');
p($new_data);
return $new_data;
}
/**
* @desc 兼容批量处理的情况 delete from, insert values
* @param $common_data
* @param $columns
* @param null $before_columns
* @return array
*/
private static function _formatInfo($common_data, $columns, $before_columns = null)
{
$has_change = !empty($before_columns) ? true : false;
foreach ($columns as $key => $column) {
$field = $column->getName();
if ($common_data['event_type'] == EventType::DELETE) {
$common_data['origin'][$field] = $column->getValue();
} else {
$common_data['current'][$field] = $column->getValue();
}
if ($has_change && $column->getUpdated()) {
$common_data['origin'][$field] = $before_columns[$key]->getValue();
$common_data['updated'][$field] = $column->getValue();
}
}
if ($common_data['event_type'] == EventType::UPDATE) {
$common_data['origin'] = array_merge($common_data['current'], $common_data['origin']);
}
return $common_data;
}
}
# CanalClientController
<?php
namespace console\controllers\canal;
class CanalClientController extends CanalBaseController
{
public function init()
{
parent::init();
}
/**
* Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/order
*/
public function actionOrder()
{
$this->watcher('order');
}
/**
* Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/log
*/
public function actionLog()
{
$this->watcher('log');
}
/**
* Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/example
*/
public function actionExample()
{
$this->watcher('example');
}
}
# 配置参考 return [ 'canalClient' => [ 'order' => [ 'server' => '172.20.35.104', 'port' => 11111, 'instance' => 'JgCanalOrder', 'clientid' => 1001, 'filter' => 'b2b.bd_order', 'size' => 100, 'item_table' => 'sy_canal_item_order',# 记录数据流的表 ], 'example' => [ 'server' => '127.0.0.1', 'port' => 11111, 'instance' => 'example', 'clientid' => 1009, 'filter' => '.*\\..*', 'size' => 100, 'item_table' => 'sy_canal_item_log', # 记录数据流的表 ], ] ];
快速安装:
https://github.com/alibaba/canal/wiki/QuickStart
参考文档:
https://www.jianshu.com/p/6299048fad66
https://blog.csdn.net/weixin_42462138/article/details/86742352
https://blog.csdn.net/varyall/article/details/79208574
https://www.cnblogs.com/f-zhao/p/9111690.html
http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.html
https://www.bookstack.cn/read/canal-v1.1.4/aa1b771944934f89.md
https://pdf.us/2018/08/30/1819.html
相关资料:
1)一般我们会设置连接超时时间,在客户端设置,其API为:
ZOOAPI zhandle_t *zookeeper_init(const char *host,
watcher_fn fn,
int recv_timeout,
const clientid_t * clientid,
void *context, int flags);
功能: 创建一个句柄(handle)和一个响应(response)这个句柄的会话(session)。
参数:
host:zookeeper主机列表,用逗号间隔。
fn:用于监视的回调函数。
clientid:客户端尝试重连的先前会话的ID,如果不需要重连先前的会话,则设置为 0。客户端可以通过调用 zoo_client_id来访问一个已经连接上的并且有效的会话ID,如果clientid对应的会话超时,或者由于某种原因 clientid变为无效了,那么zookeeper_init 将返回一个非法的 zhandle_t,通过 zhandle_t 的状态可以获知 zookeeper_init 调用失败的原因。 (通常为 ZOO_EXPIRED_SESSION_STATE).
context:暂时用不到,忽略。(TODO)
flags:设置为0,zookeeper开发团队保留以后使用。
大量,包括代码里面的注释上都没有说recv_timeout的意思,按字面意思,肯定不是session_timeout,而是多长时间zk创建连接不成功的时间?
2)在服务器端zoo.conf中有相关设置:minSessionTimeout,最小的客户端超时时间,默认值为2个ticktime,单位是毫秒:
minSessionTimeout
最小的客户端session超时时间,默认值为2个tickTime,单位是毫秒
maxSessionTimeout
最大的客户端session超时时间,默认值为20个tickTime,单位是毫秒
3)于是我们最终修改的zoo.conf文件为:
tickTime=1000
dataDir=/opt/zookeeper/zkdata
dataLogDir=/opt/zookeeper/zklogs
clientPort=2181
initLimit=5
syncLimit=2
minSessionTimeout=16000
maxSessionTimeout=30000
server.1=xxxx:2888:3888
server.2=xxxx:2888:3888
server.3=xxxx:2888:3888
注意,在仅配置了minSessionTimeout参数时,zk会启动失败,提示该参数超过了maxSessionTimeout值,这个时候需要在配置文件把最大值也配上。
4)服务端配置详解:
(1)dataDir
用于存放内存数据库快照的文件夹,同时用于集群的myid文件也存在这个文件夹里。
(2)dataLogDir
用于单独设置transaction log的目录,transaction log分离可以避免和普通log还有快照的竞争。
(3)tickTime
心跳时间,为了确保client-server连接存在的,以毫秒为单位,最小超时时间为两个心跳时间。
(4)clientPort
客户端监听端口。
(5)globalOutstandingLimit
client请求队列的最大长度,防止内存溢出,默认值为1000。
(6)preAllocSize
预分配的Transaction log空间block为proAllocSize KB,默认block为64M,一般不需要更改,除非snapshot过于频繁。
(7)snapCount
在snapCount个snapshot后写一次transaction log,默认值是100,000。
(8)traceFile
用于记录请求的log,打开会影响性能,用于debug,最好不要定义。
(9)maxClientCnxns
最大并发客户端数,用于防止DOS的,默认值是10,设置为0是不加限制。
(11)clientPortBindAddress
可以设置指定的client ip以及端口,不设置的话等于ANY:clientPort
(12)minSessionTimeout
最小的客户端session超时时间,默认值为2个tickTime,单位是毫秒
(13)maxSessionTimeout
最大的客户端session超时时间,默认值为20个tickTime,单位是毫秒
(14)electionAlg
用于选举的实现的参数:
①0:为以原始的基于UDP的方式协作
②1:为不进行用户验证的基于UDP的快速选举
③2:为进行用户验证的基于UDP的快速选举
④3:为基于TCP的快速选举,默认值为3
(15)initLimit
多少个tickTime内,允许其他server连接并初始化数据,如果zooKeeper管理的数据较大,则应相应增大这个值。
(16)syncLimit
多少个tickTime内,允许follower同步,如果follower落后太多,则会被丢弃。
(17)leaderServes
leader是否接受客户端连接。默认值为yes。leader负责协调更新。当更新吞吐量远高于读取吞吐量时,可以设置为不接受客户端连接,以便leader可以专注于同步协调工作。
(18)server.x=[hostname]:nnnnn[:nnnnn]
配置集群里面的主机信息,其中: ①server.x:server.x的x要写在myid文件中,决定当前机器的id