More  

收藏本站

電腦請使用 Ctrl + D 加入最愛
手機請使用 收藏
關閉

小編的世界 優質文選 資料

Canal:同步mysql增量數據工具,一篇詳解核心知識點


字體大小:
2021年1月29日 -
:       
 

老劉情感影視

根據這張圖,老劉把 mysql 的主備複制原理分解為如下流程:

主服務器首先必須啟動二進制日志 binlog,用來記錄任何修改了數據庫數據的事件。

主服務器將數據的改變記錄到二進制 binlog 日志。

從服務器會將主服務器的二進制日志複制到其本地的中繼日志(Relaylog)中。這一步細化地說就是首先從服務器會啟動一個工作線程 I/O 線程,I/O 線程會跟主庫建立一個普通的客戶單連接,然後在主服務器上啟動一個特殊的二進制轉儲(binlog dump)線程,這個 binlog dump 線程會讀取主服務器上二進制日志中的事件,然後向 I/O 線程發送二進制事件,並保存到從服務器上的中繼日志中。

從服務器啟動 SQL 線程,從中繼日志中讀取二進制日志,並且在從服務器本地會再執行一次數據修改操作,從而實現從服務器數據的更新。

那麼 mysql 主備複制實現原理就講完了,大家看完這個流程,能不能猜到 Canal 的工作原理?Canal 核心知識點Canal 的工作原理

Canal 的工作原理就是它模擬 MySQL slave 的交互協議,把自己偽裝為 MySQL slave,向 MySQL master 發動 dump 協議。MySQL master 收到 dump 請求後,就會開始推送 binlog 給 Canal。最後 Canal 就會解析 binlog 對象。Canal 概念

Canal,美,是這樣讀的,意思是水道/管道/渠道,主要用途就是用來同步 MySQL 中的增量數據(可以理解為實時數據),是阿裏巴巴旗下的一款純 Java 開發的開源項目。Canal 架構

server 代表一個 canal 運行實例,對應於一個 JVM。
instance 對應於一個數據隊列,1 個 canal server 對應 1..n 個 instance instance 下的子模塊:

EventParser:數據源接入,模擬 salve 協議和 master 進行交互,協議解析

EventSink:Parser 和 Store 連接器,進行數據過濾,加工,分發的工作

EventStore:數據存儲

MetaManager: 增量訂閱&消費信息管理器

到現在 Canal 的基本概念就講完了,那接下來就要講 Canal 如何同步 mysql 的增量數據。Canal 同步 MySQL 增量數據開啟 mysql binlog

我們用 Canal 同步 mysql 增量數據的前提是 mysql 的 binlog 是開啟的,阿裏雲的 mysql 數據庫是默認開啟 binlog 的,但是如果我們是自己安裝的 mysql 需要手動開啟 binlog 日志功能。

先找到 mysql 的配置文件:etc/my.cnfserver-id=1log-bin=mysql-binbinlog-format=ROW

這裏有一個知識點是關於 binlog 的格式,老劉給大家講講。

binlog 的格式有三種:STATEMENT、ROW、MIXED

ROW 模式(一般就用它)

日志會記錄每一行數據被修改的形式,不會記錄執行 SQL 語句的上下文相關信息,只記錄要修改的數據,哪條數據被修改了,修改成了什麼樣子,只有 value,不會有 SQL 多表關聯的情況。
優點:它僅僅只需要記錄哪條數據被修改了,修改成什麼樣子了,所以它的日志內容會非常清楚地記錄下每一行數據修改的細節,非常容易理解。
缺點:ROW 模式下,特別是數據添加的情況下,所有執行的語句都會記錄到日志中,都將以每行記錄的修改來記錄,這樣會產生大量的日志內容

STATEMENT 模式

每條都會修改數據的 SQL 語句都會被記錄下來。
缺點:由於它是記錄的執行語句,所以,為了讓這些語句在 slave 端也能正確執行,那他還必須記錄每條語句在執行過程中的一些相關信息,也就是上下文信息,以保證所有語句在 slave 端被執行的時候能夠得到和在 master 端執行時候相同的結果。
但目前例如 step()函數在有些版本中就不能被正確複制,在存儲過程中使用了 last-insert-id()函數,可能會使 slave 和 master 上得到不一致的 id,就是會出現數據不一致的情況,ROW 模式下就沒有。

MIXED 模式

以上兩種模式都使用。Canal 實時同步

第一:首先我們要配置環境,在 conf/example/instance.properties 下編輯如下代碼## mysql serverIdcanal.instance.mysql.slaveId = 1234#position info,需要修改成自己的數據庫信息canal.instance.master.address = 127.0.0.1:3306canal.instance.master.journal.name =canal.instance.master.position =canal.instance.master.timestamp =#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#username/password,需要修改成自己的數據庫信息canal.instance.dbUsername = canalcanal.instance.dbPassword = canalcanal.instance.defaultDatabaseName =canal.instance.connectionCharset = UTF-8#table regexcanal.instance.filter.regex = .*\\..* 其中,canal.instance.connectionCharset 代表數據庫的編碼方式對應到 java 中的編碼類型,比如 UTF-8,GBK,ISO-8859-1。

第二:配置完後,就要啟動了。sh bin/startup.sh關閉使用 bin/stop.sh

第三:觀察日志 一般使用 cat 查看 canal/canal.log、example/example.log

第四:啟動客戶端 在 IDEA 中業務代碼,mysql 中如果有增量數據就拉取過來,在 IDEA 控制台打印出來 在 pom.xml 文件中添加:<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.12</version></dependency>

添加客戶端代碼:public class Demo { public static void main(String<> args) { //創建連接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111), "example", "", ""); connector.connect(); //訂閱 connector.subscribe(); connector.rollback(); int batchSize = 1000; int emptyCount = 0; int totalEmptyCount = 100; while (totalEmptyCount > emptyCount) { Message msg = connector.getWithoutAck(batchSize); long id = msg.getId(); List<CanalEntry.Entry> entries = msg.getEntries(); if(id == -1 || entries.size() == 0){ emptyCount++; System.out.println("emptyCount : " + emptyCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }else{ emptyCount = 0; printEntry(entries); } connector.ack(id); } } // batch -> entries -> rowchange - rowdata -> cols private static void printEntry(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries){ if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){ continue; } CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } CanalEntry.EventType eventType = rowChange.getEventType(); System.out.println(entry.getHeader().getLogfileName()+" __ " + entry.getHeader().getSchemaName() + " __ " + eventType); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for(CanalEntry.RowData rowData : rowDatasList){ for(CanalEntry.Column column: rowData.getAfterColumnsList()){ System.out.println(column.getName() + " - " + column.getValue() + " - " + column.getUpdated()); } } } }}

第五:在mysql中寫數據,客戶端就會把增量數據打印到控制台。Canal 的 HA 機制設計

在大數據領域很多框架都會有 HA 機制,Canal 的 HA 分為兩部分,Canal server 和 Canal client 分別有對應的 HA 實現:

canal server:為了減少對 mysql dump 的請求,不同 server 上的 instance 要求同一時間只能有一個處於 running,其他的處於 standby 狀態。

canal client:為了保證有序性,一份 instance 同一時間只能有一個 canal client 進行 get/ack/rollback 操作,否則客戶端接收無法保證有序。

整個 HA 機制的控制主要是依賴了 ZooKeeper 的幾個特性,ZooKeeper 這裏就不講了。

Canal Server:

canal server 要啟動某個 canal instance 時都先向 ZooKeeper 進行一次嘗試啟動判斷(創建 EPHEMERAL 節點,誰創建成功就允許誰啟動)。

創建 ZooKeeper 節點成功後,對應的 canal server 就啟動對應的 canal instance,沒有創建成功的 canal instance 就會處於 standby 狀態。

一旦 ZooKeeper 發現 canal server 創建的節點消失後,立即通知其他的 canal server 再次進行步驟 1 的操作,重新選出一個 canal server 啟動 instance。

canal client 每次進行 connect 時,會首先向 ZooKeeper 詢問當前是誰啟動了 canal instance,然後和其建立連接,一旦連接不可用,會重新嘗試 connect。

canal client 的方式和 canal server 方式類似,也是利用 ZooKeeper 的搶占 EPHEMERAL 節點的方式進行控制。

Canal HA 的配置,並把數據實時同步到 kafka 中。

第一:修改 conf/canal.properties 文件canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181canal.serverMode = kafkacanal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092

第二:配置 conf/example/example.instancecanal.instance.mysql.slaveId = 790 /兩台canal server的slaveID唯一canal.mq.topic = canal_log //指定將數據發送到kafka的topic數據同步方案總結

講完了 Canal 工具,現在給大家簡單總結下目前常見的數據采集工具,不會涉及架構知識,只是簡單總結,讓大家有個印象。

常見的數據采集工具有:DataX、Flume、Canal、Sqoop、LogStash 等。DataX (處理離線數據)

DataX 是阿裏巴巴開源的一個異構數據源離線同步工具,異構數據源離線同步指的是將源端數據同步到目的端,但是端與端的數據源類型種類繁多,在沒有 DataX 之前,端與端的鏈路將組成一個複雜的網狀結構,非常零散無法把同步核心邏輯抽象出來。

為了解決異構數據源同步問題,DataX 將複雜的網狀的同步鏈路變成了星型數據鏈路,DataX 作為中間傳輸載體負責連接各種數據源。

所以,當需要接入一個新的數據源的時候,只需要將此數據源對接到 DataX,就可以跟已有的數據源做到無縫數據同步。

DataX本身作為離線數據同步框架,采用Framework+plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。

Reader: 它為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。

Writer: 它為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。

Framework:它用於連接Reader和Writer,作為兩者的數據傳輸通道,並處理緩沖、並發、數據轉換等問題。

DataX的核心架構如下圖:

核心模塊介紹:

DataX完成單個數據同步的作業,我們把它稱之為Job,DataX接收到一個Job之後,將啟動一個進程來完成整個作業同步過程。

DataX Job啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。

切分多個Task之後,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。

每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader->Channel->Writer的線程來完成任務同步工作。

DataX作業運行完成之後,Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出。

Flume(處理實時數據)

Flume主要應用的場景是同步日志數據,主要包含三個組件:Source、Channel、Sink。

Flume最大的優點就是官網提供了豐富的Source、Channel、Sink,根據不同的業務需求,我們可以在官網查找相關配置。另外,Flume還提供了自定義這些組件的接口。Logstash(處理離線數據)

Logstash就是一根具備實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以讓你根據自己的需求在中間加上過濾網,Logstash提供了很多功能強大的過濾網來滿足各種應用場景。

Logstash是由JRuby編寫,使用基於消息的簡單架構,在JVM上運行。在管道內的數據流稱之為event,它分為inputs階段、filters階段、outputs階段。Sqoop(處理離線數據)

Sqoop是Hadoop和關系型數據庫之間傳送數據的一種工具,它是用來從關系型數據庫如MySQL到Hadoop的HDFS從Hadoop文件系統導出數據到關系型數據庫。Sqoop底層用的還是MapReducer,用的時候一定要注意數據傾斜。總結

老劉本篇文章主要講述了Canal工具的核心知識點及其數據采集工具的對比,其中數據采集工具只是大致講了講概念和應用,目的也是讓大家有個印象。老劉敢做保證看完這篇文章基本等於入門,剩下的就是練習了。

好啦,同步mysql增量數據的工具Canal的內容就講完了,盡管當前水平可能不及各位大佬,但老劉會努力變得更加優秀,讓各位小夥伴自學從此不求人!

文章都看到這了,點贊關注支持一波!。