Canal 使用总结 Zookeeper mysql

分类:Linux |

准备工作: 

Canal下载地址: https://github.com/alibaba/canal/releases

Zookeeper 下载: https://zookeeper.apache.org/documentation.html


1. 首先 创建 mysql 授权用户


CREATE USER canal IDENTIFIED BY '123456';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

FLUSH PRIVILEGES;


错误:  启动canal 时报错:

/canal/deployer/CanalLauncher : Unsupported major.minor version 52.0



解决方法:

javac -version

java -version

显示均为1.7.


下载安装1.8版本java jdk:

yum -y install java-1.8.0-openjdk-devel.x86_64



安装完成,验证: java -version 是否为 1.8

如果错误, 可生成新的软链接:

rm /etc/alternatives/javac

rm /etc/alternatives/java

ln -s /usr/lib/jvm/java-1.8.0/bin/java /etc/alternatives/java

ln -s /usr/lib/jvm/java-1.8.0/bin/javac /etc/alternatives/javac


2. 使用Admin版本时, 配置文件将不再生效,

错误:

2020-07-03 22:33:19.338 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /172.20.35.139:3306 has an error, retrying. caused by


3. 当出现 driver.mysql.MysqlConnector.connect 错误时, 请检查 指定的 binlog 与 position 是否有错误.

错误:

2020-07-03 22:56:15.281 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O

error while reading from client socket

2020-07-03 20:03:06.095 [destination = example , address = /172.20.35.139:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /172.20.35.139:3306 failure

Caused by: java.io.IOException: connect /172.20.35.139:3306 failure

at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)

at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)

at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)

at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.


解决方法:

指定binlog 与 position:

canal.instance.master.journal.name 一般在/var/lib/mysql路径下,可以自己决定从哪个binlog开始。

canal.instance.master.position 可以在mysql下通过show master status in 'xxx'(binlog名) limit 0, 10指令来获取你需要的position;


如:

canal.instance.master.journal.name=mysql-bin.000001

canal.instance.master.position=4



4. subscribe 使用:


# table regex 设置白名单,如果在instance.properties配置文件中进行该项配置,则在代码中不应该再配置

# connector.subscribe(".*\\..*");,如果还在代码中配置,则配置文件将会失效!!!

canal.instance.filter.regex = .*\\..*

# table black regex 设置黑名单

canal.instance.filter.black.regex =



所以当你只关心部分库表更新时,设置了canal.instance.filter.regex,一定不要在客户端调用CanalConnector.subscribe(".*\\..*"),不然等于没设置canal.instance.filter.regex。


如果一定要调用CanalConnector.subscribe(".*\\..*"),那么可以设置instance.properties的canal.instance.filter.black.regex参数添加黑名单,过滤非关注库表。


========================================================


mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)

常见例子:

1. 所有表:.* or .*\\..*

2. canal schema下所有表: canal\\..*

3. canal下的以canal打头的表:canal\\.canal.*

4. canal schema下的一张表:canal.test1

5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)



5. 多个相同客户端连接同一个 instance 时:

则通过串连 轮询方式提供数据, client1, client2, client3




6. 使用canal 帐号密码时报错:

./yii canal/example/watcher

unpack(): Type N: not enough input, need 4, have 0

Exception 'Socket\Raw\Exception' with message 'Socket operation failed: Broken pipe (SOCKET_EPIPE)'


暂时无解决方法, 只要开启了 就会报错误,可能是php处理的问题, 此处的 passwd 在mysql 中执行: select password('123456');

# canal instance user/passwd

# canal.user = canal

# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458




7. canal.properties 配置文件不支持 热加载, instance 支持热加载处理, 保存即生效



8. 错误:

2020-07-09T08:09:23.498472Z 21 [Note] Aborted connection 21 to db: 'unconnected' user: 'canal' host: '172.20.35.92' (Got an error reading communication packets)


原因, 可能是binlog和position找不到导致的错误,

解决方法: 修改配置文件中的meta.dat, 如: canalServer/conf/example/meta.dat 中的 journalName 和 position

{"clientDatas":[{"clientIdentity":{"clientId":1009,"destination":"example","filter":".*\\..*"},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"172.20.35.139","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000008","position":154,"serverId":1,"timestamp":1594281603000}}}],"destination":"example"}




9. 支持 HA Zookeeper 需开启配置 canal.instance.global.spring.xml = classpath:spring/default-instance.xml, 不然的话只会写入 meta.dat 中

Zookeeper 下载: https://zookeeper.apache.org/documentation.html

错误:

2020-07-17 12:27:56,151 [myid:] - ERROR [main:QuorumPeerMain@98] - Invalid config, exiting abnormally

org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error processing /opt/zookeeper/bin/../conf/zoo.cfg

        at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:197)

        at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:124)

        at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:90)

Caused by: java.lang.IllegalArgumentException: myid file is missing

        at org.apache.zookeeper.server.quorum.QuorumPeerConfig.checkValidity(QuorumPeerConfig.java:810)

        at org.apache.zookeeper.server.quorum.QuorumPeerConfig.setupQuorumPeerConfig(QuorumPeerConfig.java:681)

        at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties(QuorumPeerConfig.java:506)

        at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:193)

        ... 2 more

Invalid config, exiting abnormally


解决方法: 

echo 1 > /tmp/zookeeper/data/myid

此处的1为你的配置中的ID值 可以是server.x 中的x值



按照部署和配置,在单台机器上各自完成配置,演示时instance name为example

① 修改主机:192.168.43.110 的canal.properties,加上zookeeper配置,spring配置选择default-instance.xml


canal.zkServers =192.168.43.110:2181

canal.instance.global.spring.xml = classpath:spring/default-instance.xml


② example目录,并修改instance.properties


canal.instance.mysql.slaveId=1234  ##另外一台机器改成1235,保证slaveId不重复即可

canal.instance.master.address=192.168.43.110:3306


注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置


10. Zookeeper 中的节点信息查看:  

集群列表:

[zk: 172.20.35.104:2181(CONNECTED) 35] ls /otter/canal/destinations/JgCanalOrder/cluster

[172.20.35.103:11111, 172.20.35.104:11111, 172.20.35.92:11111]


meta 数据库位置信息:

[zk: 172.20.35.104:2181(CONNECTED) 32] get -s /otter/canal/destinations/JgCanalOrder/1001/cursor

{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"172.20.35.139","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000010","position":256911,"serverId":1,"timestamp":1594816430000}}


客户端只能使用 running 里的IP和端口访问:

[zk: 172.20.35.104:2181(CONNECTED) 37] get /otter/canal/destinations/JgCanalOrder/running

{"active":true,"address":"172.20.35.104:11111"}


11. PHP Zookeeper 扩展安装,

pecl install zookeeper-0.7.2

如报错 libzookeeper, 请先安装 Zookeeper 后再执行 pecl,  MacOS 下先执行 brew install zookeeper 后再执行 pecl install zookeeper-0.7.2


12. 当mysql master down机, 如何切换到 slave 上?

请自行修改对应的 meta.dat 或  对应的 zookeeper 中 cursor 接点信息

OR

采用 mysql 的 GTID , 这里 没做测试 暂不知效果


13. 参考mysql表结构


CREATE TABLE `sy_canal_item_order` (

`id` bigint(20) NOT NULL AUTO_INCREMENT,

`event_type` tinyint(1) DEFAULT '0' COMMENT 'DML事件类型 1:insert 2:update 3:delete',

`event_name` varchar(10) DEFAULT '' COMMENT 'DML事件名称',

`logfile_name` varchar(20) DEFAULT '' COMMENT 'binlog名称',

`position` varchar(20) DEFAULT '0' COMMENT 'binlog中的位置',

`server_id` int(5) DEFAULT '0' COMMENT 'Mysql 的Master ID',

`db_name` varchar(20) DEFAULT '' COMMENT 'Mysql 数据库名',

`table_name` varchar(20) DEFAULT '' COMMENT 'Mysql 数据库表名',

`unique_id` varchar(30) DEFAULT '0' COMMENT '表主键值',

`origin` longtext COMMENT '原始值',

`current` longtext COMMENT '当前值',

`updated` longtext COMMENT '有变更的字段和值',

`event_state` tinyint(1) DEFAULT '0' COMMENT '处理结果: 0:未处理, 1:已处理',

`event_state_time` int(10) DEFAULT '0' COMMENT '事件状态变更时间',

`create_time` int(10) DEFAULT '0',

PRIMARY KEY (`id`),

KEY `ind_event_type` (`event_type`),

KEY `ind_event_name` (`event_name`),

KEY `ind_position` (`position`),

KEY `ind_logfile_name` (`logfile_name`),

KEY `ind_db_name` (`db_name`),

KEY `ind_table_name` (`table_name`),

KEY `ind_create_time` (`create_time`),

KEY `ind_event_state` (`event_state`),

KEY `ind_unique_id` (`unique_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;


14. 参考PHP

# CanalBaseController

<?php

namespace console\controllers\canal;

use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use common\components\eai\Eai;
use common\models\BaseModel;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use Yii;
use yii\console\Controller;


class CanalBaseController extends Controller
{
    public function init()
    {
        parent::init();
    }

    public function watcher($instance)
    {
        try {
            if (empty(Yii::$app->params['canalClient']) || empty(Yii::$app->params['canalClient'][$instance])) {
                exit("Canal 未配置实例 {$instance}, 请在 params 中进行配置!");
            }
            $config = Yii::$app->params['canalClient'][$instance];
            $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
            # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

            $client->connect($config['server'], $config['port']);
            $client->checkValid();
            $client->subscribe($config['clientid'], $config['instance'], $config['filter']);
            # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤

            while (true) {
                $message = $client->getWithoutAck($config['size']); // 无需自动响应成功结果
                $message_id = $message->getId();
                if (!empty($message_id)) {
                    if ($entries = $message->getEntries()) {
                        foreach ($entries as $entry) {
                            $event_list = self::_getData($entry);
                            (!empty($event_list)) && $this->_insertItem($event_list, $config);
                        }
                    }
                    v($message_id);
                    $client->ack($message_id); // 事务处理完毕后 响应成功结果, 方便下次从错误位置调用
                }
                sleep(1);
            }

            $client->disConnect();
        } catch (\Exception $e) {
            echo $e->getMessage(), PHP_EOL;
        }
    }

    /**
     * 写入binglog 事件数据
     */
    private function _insertItem($event_list, $config)
    {
        $columns = array_keys($event_list[0]);
        foreach ($event_list as &$v) {
            $v['origin'] = json_encode($v['origin'], JSON_UNESCAPED_UNICODE);
            $v['current'] = json_encode($v['current'], JSON_UNESCAPED_UNICODE);
            $v['updated'] = json_encode($v['updated'], JSON_UNESCAPED_UNICODE);
        }
        BaseModel::batchInsertItem($event_list, $columns, $config['item_table'], Eai::writer());
    }

    /**
     * @param Entry $entry
     * @throws \Exception
     */
    private static function _getData($entry)
    {
        switch ($entry->getEntryType()) {
            case EntryType::TRANSACTIONBEGIN:
            case EntryType::TRANSACTIONEND:
                return false;
                break;
        }

        $row_change = new RowChange();
        $row_change->mergeFromString($entry->getStoreValue());
        $eventType = $row_change->getEventType();

        if ($eventType !== EventType::DELETE
            && $eventType !== EventType::INSERT
            && $eventType !== EventType::UPDATE) {
            return false;
        }

        $header = $entry->getHeader();
        $event_type = $header->getEventType();
        $common_data = [
            'event_type'   => $event_type,
            'event_name'   => EventType::name($event_type),
            'logfile_name' => $header->getLogfileName(),
            'position'     => $header->getLogfileoffset(),
            'server_id'    => $header->getServerId(),
            'db_name'      => $header->getSchemaName(),
            'table_name'   => $header->getTableName(),
            'origin'       => [], # 原始值
            'current'      => [], # 当前值
            'updated'      => [], # 有变更的字段和值
            'create_time'  => time()
        ];
        $new_data = [];
        foreach ($row_change->getRowDatas() as $rowData) {
            switch ($eventType) {
                case EventType::DELETE:
                    $new_data[] = self::_formatInfo($common_data, $rowData->getBeforeColumns());
                    break;
                case EventType::INSERT:
                    $new_data[] = self::_formatInfo($common_data, $rowData->getAfterColumns());
                    break;
                default:
                    $new_data[] = self::_formatInfo($common_data, $rowData->getAfterColumns(), $rowData->getBeforeColumns());
                    break;
            }
        }
//        throw new \Exception('12');
        p($new_data);
        return $new_data;
    }

    /**
     * @desc 兼容批量处理的情况 delete from,  insert values
     * @param $common_data
     * @param $columns
     * @param null $before_columns
     * @return array
     */
    private static function _formatInfo($common_data, $columns, $before_columns = null)
    {
        $has_change = !empty($before_columns) ? true : false;
        foreach ($columns as $key => $column) {
            $field = $column->getName();
            if ($common_data['event_type'] == EventType::DELETE) {
                $common_data['origin'][$field] = $column->getValue();
            } else {
                $common_data['current'][$field] = $column->getValue();
            }
            if ($has_change && $column->getUpdated()) {
                $common_data['origin'][$field] = $before_columns[$key]->getValue();
                $common_data['updated'][$field] = $column->getValue();
            }
        }
        if ($common_data['event_type'] == EventType::UPDATE) {
            $common_data['origin'] = array_merge($common_data['current'], $common_data['origin']);
        }

        return $common_data;
    }
}


# CanalClientController

<?php

namespace console\controllers\canal;

class CanalClientController extends CanalBaseController
{
    public function init()
    {
        parent::init();
    }

    /**
     * Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/order
     */
    public function actionOrder()
    {
        $this->watcher('order');
    }

    /**
     * Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/log
     */
    public function actionLog()
    {
        $this->watcher('log');
    }

    /**
     * Canal 客户端, 观察并记录 Canal Server 变更信息 ./yii canal/canal-client/example
     */
    public function actionExample()
    {
        $this->watcher('example');
    }
}
# 配置参考

return [
    'canalClient' => [
        'order'   => [
            'server'     => '172.20.35.104',
            'port'       => 11111,
            'instance'   => 'JgCanalOrder',
            'clientid'   => 1001,
            'filter'     => 'b2b.bd_order',
            'size'       => 100,
            'item_table' => 'sy_canal_item_order',# 记录数据流的表
        ],
        'example' => [
            'server'     => '127.0.0.1',
            'port'       => 11111,
            'instance'   => 'example',
            'clientid'   => 1009,
            'filter'     => '.*\\..*',
            'size'       => 100,
            'item_table' => 'sy_canal_item_log', # 记录数据流的表
        ],
    ]
];

快速安装:

https://github.com/alibaba/canal/wiki/QuickStart


参考文档:

https://www.jianshu.com/p/6299048fad66

https://blog.csdn.net/weixin_42462138/article/details/86742352

https://blog.csdn.net/varyall/article/details/79208574

https://www.cnblogs.com/f-zhao/p/9111690.html


http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/impl/ClusterCanalConnector.html


https://www.bookstack.cn/read/canal-v1.1.4/aa1b771944934f89.md



https://pdf.us/2018/08/30/1819.html



相关资料:

1)一般我们会设置连接超时时间,在客户端设置,其API为:


ZOOAPI zhandle_t *zookeeper_init(const char *host, 

                    watcher_fn fn,

                                 int recv_timeout,

                                 const clientid_t * clientid,

                                 void *context, int flags);

功能: 创建一个句柄(handle)和一个响应(response)这个句柄的会话(session)。 


参数:


host:zookeeper主机列表,用逗号间隔。 


fn:用于监视的回调函数。 


clientid:客户端尝试重连的先前会话的ID,如果不需要重连先前的会话,则设置为 0。客户端可以通过调用 zoo_client_id来访问一个已经连接上的并且有效的会话ID,如果clientid对应的会话超时,或者由于某种原因 clientid变为无效了,那么zookeeper_init 将返回一个非法的 zhandle_t,通过 zhandle_t 的状态可以获知 zookeeper_init 调用失败的原因。 (通常为 ZOO_EXPIRED_SESSION_STATE).


 context:暂时用不到,忽略。(TODO)


 flags:设置为0,zookeeper开发团队保留以后使用。


 


大量,包括代码里面的注释上都没有说recv_timeout的意思,按字面意思,肯定不是session_timeout,而是多长时间zk创建连接不成功的时间?


 



2)在服务器端zoo.conf中有相关设置:minSessionTimeout,最小的客户端超时时间,默认值为2个ticktime,单位是毫秒:


minSessionTimeout

  最小的客户端session超时时间,默认值为2个tickTime,单位是毫秒

maxSessionTimeout

  最大的客户端session超时时间,默认值为20个tickTime,单位是毫秒

 


3)于是我们最终修改的zoo.conf文件为:


tickTime=1000

dataDir=/opt/zookeeper/zkdata

dataLogDir=/opt/zookeeper/zklogs

clientPort=2181

initLimit=5

syncLimit=2

minSessionTimeout=16000

maxSessionTimeout=30000

server.1=xxxx:2888:3888

server.2=xxxx:2888:3888

server.3=xxxx:2888:3888

 


注意,在仅配置了minSessionTimeout参数时,zk会启动失败,提示该参数超过了maxSessionTimeout值,这个时候需要在配置文件把最大值也配上。


 


4)服务端配置详解:


(1)dataDir

  用于存放内存数据库快照的文件夹,同时用于集群的myid文件也存在这个文件夹里。

(2)dataLogDir

  用于单独设置transaction log的目录,transaction log分离可以避免和普通log还有快照的竞争。

(3)tickTime

  心跳时间,为了确保client-server连接存在的,以毫秒为单位,最小超时时间为两个心跳时间。

(4)clientPort

  客户端监听端口。

(5)globalOutstandingLimit

  client请求队列的最大长度,防止内存溢出,默认值为1000。

(6)preAllocSize

  预分配的Transaction log空间block为proAllocSize KB,默认block为64M,一般不需要更改,除非snapshot过于频繁。

(7)snapCount

  在snapCount个snapshot后写一次transaction log,默认值是100,000。

(8)traceFile

  用于记录请求的log,打开会影响性能,用于debug,最好不要定义。

(9)maxClientCnxns

  最大并发客户端数,用于防止DOS的,默认值是10,设置为0是不加限制。

(11)clientPortBindAddress

  可以设置指定的client ip以及端口,不设置的话等于ANY:clientPort

(12)minSessionTimeout

  最小的客户端session超时时间,默认值为2个tickTime,单位是毫秒

(13)maxSessionTimeout

  最大的客户端session超时时间,默认值为20个tickTime,单位是毫秒

(14)electionAlg

  用于选举的实现的参数:

  ①0:为以原始的基于UDP的方式协作

  ②1:为不进行用户验证的基于UDP的快速选举

  ③2:为进行用户验证的基于UDP的快速选举

  ④3:为基于TCP的快速选举,默认值为3

(15)initLimit

  多少个tickTime内,允许其他server连接并初始化数据,如果zooKeeper管理的数据较大,则应相应增大这个值。

(16)syncLimit

  多少个tickTime内,允许follower同步,如果follower落后太多,则会被丢弃。

(17)leaderServes

  leader是否接受客户端连接。默认值为yes。leader负责协调更新。当更新吞吐量远高于读取吞吐量时,可以设置为不接受客户端连接,以便leader可以专注于同步协调工作。

(18)server.x=[hostname]:nnnnn[:nnnnn]

  配置集群里面的主机信息,其中:
  ①server.x:server.x的x要写在myid文件中,决定当前机器的id