《Canal:同步mysql增量數據工具,一篇詳解核心知識點》 老劉情感影視根據這張圖,老劉把 mysql 的主備複制原理分解為如下流程:主服務器首先必須啟動二進制日志 binlog,用來記錄任何修改了數據庫數據的事件。主服務器將數據的改變記錄到二進制 binlog 日志。從服務器會將主服務器的二進制日志複制到其本地的中繼日志(Relaylog)中。這一步細化地說就是首先從服務器會啟動一個工作線程 I/O 線程,I/O 線程會跟主庫建立一個普通的客戶單連接,然後在主服務器上啟動一個特殊的二進制轉儲(binlog dump)線程,這個 binlog dump 線程會讀取主服務器上二進制日志中的事件,然後向 I/O 線程發送二進制事件,並保存到從服務器上的中繼日志中。從服務器啟動 SQL 線程,從中繼日志中讀取二進制日志,並且在從服務器本地會再執行一次數據修改操作,從而實現從服務器數據的更新。那麼 mysql 主備複制實現原理就講完了,大家看完這個流程,能不能猜到 Canal 的工作原理?Canal 核心知識點Canal 的工作原理Canal 的工作原理就是它模擬 MySQL slave 的交互協議,把自己偽裝為 MySQL slave,向 MySQL master 發動 dump 協議。MySQL master 收到 dump 請求後,就會開始推送 binlog 給 Canal。最後 Canal 就會解析 binlog 對象。Canal 概念Canal,美,是這樣讀的,意思是水道/管道/渠道,主要用途就是用來同步 MySQL 中的增量數據(可以理解為實時數據),是阿裏巴巴旗下的一款純 Java 開發的開源項目。Canal 架構 server 代表一個 canal 運行實例,對應於一個 JVM。 instance 對應於一個數據隊列,1 個 canal server 對應 1..n 個 instance instance 下的子模塊:EventParser:數據源接入,模擬 salve 協議和 master 進行交互,協議解析EventSink:Parser 和 Store 連接器,進行數據過濾,加工,分發的工作EventStore:數據存儲MetaManager: 增量訂閱&消費信息管理器到現在 Canal 的基本概念就講完了,那接下來就要講 Canal 如何同步 mysql 的增量數據。Canal 同步 MySQL 增量數據開啟 mysql binlog我們用 Canal 同步 mysql 增量數據的前提是 mysql 的 binlog 是開啟的,阿裏雲的 mysql 數據庫是默認開啟 binlog 的,但是如果我們是自己安裝的 mysql 需要手動開啟 binlog 日志功能。先找到 mysql 的配置文件:etc/my.cnfserver-id=1log-bin=mysql-binbinlog-format=ROW這裏有一個知識點是關於 binlog 的格式,老劉給大家講講。binlog 的格式有三種:STATEMENT、ROW、MIXEDROW 模式(一般就用它)日志會記錄每一行數據被修改的形式,不會記錄執行 SQL 語句的上下文相關信息,只記錄要修改的數據,哪條數據被修改了,修改成了什麼樣子,只有 value,不會有 SQL 多表關聯的情況。 優點:它僅僅只需要記錄哪條數據被修改了,修改成什麼樣子了,所以它的日志內容會非常清楚地記錄下每一行數據修改的細節,非常容易理解。 缺點:ROW 模式下,特別是數據添加的情況下,所有執行的語句都會記錄到日志中,都將以每行記錄的修改來記錄,這樣會產生大量的日志內容STATEMENT 模式每條都會修改數據的 SQL 語句都會被記錄下來。 缺點:由於它是記錄的執行語句,所以,為了讓這些語句在 slave 端也能正確執行,那他還必須記錄每條語句在執行過程中的一些相關信息,也就是上下文信息,以保證所有語句在 slave 端被執行的時候能夠得到和在 master 端執行時候相同的結果。 但目前例如 step()函數在有些版本中就不能被正確複制,在存儲過程中使用了 last-insert-id()函數,可能會使 slave 和 master 上得到不一致的 id,就是會出現數據不一致的情況,ROW 模式下就沒有。MIXED 模式以上兩種模式都使用。Canal 實時同步第一:首先我們要配置環境,在 conf/example/instance.properties 下編輯如下代碼## mysql serverIdcanal.instance.mysql.slaveId = 1234#position info,需要修改成自己的數據庫信息canal.instance.master.address = 127.0.0.1:3306canal.instance.master.journal.name =canal.instance.master.position =canal.instance.master.timestamp =#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#username/password,需要修改成自己的數據庫信息canal.instance.dbUsername = canalcanal.instance.dbPassword = canalcanal.instance.defaultDatabaseName =canal.instance.connectionCharset = UTF-8#table regexcanal.instance.filter.regex = .*\\..* 其中,canal.instance.connectionCharset 代表數據庫的編碼方式對應到 java 中的編碼類型,比如 UTF-8,GBK,ISO-8859-1。第二:配置完後,就要啟動了。sh bin/startup.sh關閉使用 bin/stop.sh第三:觀察日志 一般使用 cat 查看 canal/canal.log、example/example.log第四:啟動客戶端 在 IDEA 中業務代碼,mysql 中如果有增量數據就拉取過來,在 IDEA 控制台打印出來 在 pom.xml 文件中添加:<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.12</version></dependency> 添加客戶端代碼:public class Demo { public static void main(String args) { //創建連接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111), "example", "", ""); connector.connect(); //訂閱 connector.subscribe(); connector.rollback(); int batchSize = 1000; int emptyCount = 0; int totalEmptyCount = 100; while (totalEmptyCount > emptyCount) { Message msg = connector.getWithoutAck(batchSize); long id = msg.getId(); List<CanalEntry.Entry> entries = msg.getEntries(); if(id == -1 || entries.size() == 0){ emptyCount++; System.out.println("emptyCount : " + emptyCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }else{ emptyCount = 0; printEntry(entries); } connector.ack(id); } } // batch -> entries -> rowchange - rowdata -> cols private static void printEntry(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries){ if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){ continue; } CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } CanalEntry.EventType eventType = rowChange.getEventType(); System.out.println(entry.getHeader().getLogfileName()+" __ " + entry.getHeader().getSchemaName() + " __ " + eventType); List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); for(CanalEntry.RowData rowData : rowDatasList){ for(CanalEntry.Column column: rowData.getAfterColumnsList()){ System.out.println(column.getName() + " - " + column.getValue() + " - " + column.getUpdated()); } } } }}第五:在mysql中寫數據,客戶端就會把增量數據打印到控制台。Canal 的 HA 機制設計在大數據領域很多框架都會有 HA 機制,Canal 的 HA 分為兩部分,Canal server 和 Canal client 分別有對應的 HA 實現:canal server:為了減少對 mysql dump 的請求,不同 server 上的 instance 要求同一時間只能有一個處於 running,其他的處於 standby 狀態。canal client:為了保證有序性,一份 instance 同一時間只能有一個 canal client 進行 get/ack/rollback 操作,否則客戶端接收無法保證有序。整個 HA 機制的控制主要是依賴了 ZooKeeper 的幾個特性,ZooKeeper 這裏就不講了。Canal Server:canal server 要啟動某個 canal instance 時都先向 ZooKeeper 進行一次嘗試啟動判斷(創建 EPHEMERAL 節點,誰創建成功就允許誰啟動)。創建 ZooKeeper 節點成功後,對應的 canal server 就啟動對應的 canal instance,沒有創建成功的 canal instance 就會處於 standby 狀態。一旦 ZooKeeper 發現 canal server 創建的節點消失後,立即通知其他的 canal server 再次進行步驟 1 的操作,重新選出一個 canal server 啟動 instance。canal client 每次進行 connect 時,會首先向 ZooKeeper 詢問當前是誰啟動了 canal instance,然後和其建立連接,一旦連接不可用,會重新嘗試 connect。canal client 的方式和 canal server 方式類似,也是利用 ZooKeeper 的搶占 EPHEMERAL 節點的方式進行控制。Canal HA 的配置,並把數據實時同步到 kafka 中。第一:修改 conf/canal.properties 文件canal.zkServers = hadoop02:2181,hadoop03:2181,hadoop04:2181canal.serverMode = kafkacanal.mq.servers = hadoop02:9092,hadoop03:9092,hadoop04:9092第二:配置 conf/example/example.instancecanal.instance.mysql.slaveId = 790 /兩台canal server的slaveID唯一canal.mq.topic = canal_log //指定將數據發送到kafka的topic數據同步方案總結講完了 Canal 工具,現在給大家簡單總結下目前常見的數據采集工具,不會涉及架構知識,只是簡單總結,讓大家有個印象。常見的數據采集工具有:DataX、Flume、Canal、Sqoop、LogStash 等。DataX (處理離線數據)DataX 是阿裏巴巴開源的一個異構數據源離線同步工具,異構數據源離線同步指的是將源端數據同步到目的端,但是端與端的數據源類型種類繁多,在沒有 DataX 之前,端與端的鏈路將組成一個複雜的網狀結構,非常零散無法把同步核心邏輯抽象出來。 為了解決異構數據源同步問題,DataX 將複雜的網狀的同步鏈路變成了星型數據鏈路,DataX 作為中間傳輸載體負責連接各種數據源。所以,當需要接入一個新的數據源的時候,只需要將此數據源對接到 DataX,就可以跟已有的數據源做到無縫數據同步。 DataX本身作為離線數據同步框架,采用Framework+plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。Reader: 它為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。Writer: 它為數據寫入模塊,負責不斷向Framework取數據,並將數據寫入到目的端。Framework:它用於連接Reader和Writer,作為兩者的數據傳輸通道,並處理緩沖、並發、數據轉換等問題。DataX的核心架構如下圖: 核心模塊介紹:DataX完成單個數據同步的作業,我們把它稱之為Job,DataX接收到一個Job之後,將啟動一個進程來完成整個作業同步過程。DataX Job啟動後,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便於並發執行。切分多個Task之後,DataX Job會調用Scheduler模塊,根據配置的並發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的並發運行完畢分配好的所有Task,默認單個任務組的並發數量為5。每一個Task都由TaskGroup負責啟動,Task啟動後,會固定啟動Reader->Channel->Writer的線程來完成任務同步工作。DataX作業運行完成之後,Job監控並等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成後Job成功退出。否則,異常退出。Flume(處理實時數據) Flume主要應用的場景是同步日志數據,主要包含三個組件:Source、Channel、Sink。Flume最大的優點就是官網提供了豐富的Source、Channel、Sink,根據不同的業務需求,我們可以在官網查找相關配置。另外,Flume還提供了自定義這些組件的接口。Logstash(處理離線數據) Logstash就是一根具備實時數據傳輸能力的管道,負責將數據信息從管道的輸入端傳輸到管道的輸出端;與此同時這根管道還可以讓你根據自己的需求在中間加上過濾網,Logstash提供了很多功能強大的過濾網來滿足各種應用場景。Logstash是由JRuby編寫,使用基於消息的簡單架構,在JVM上運行。在管道內的數據流稱之為event,它分為inputs階段、filters階段、outputs階段。Sqoop(處理離線數據) Sqoop是Hadoop和關系型數據庫之間傳送數據的一種工具,它是用來從關系型數據庫如MySQL到Hadoop的HDFS從Hadoop文件系統導出數據到關系型數據庫。Sqoop底層用的還是MapReducer,用的時候一定要注意數據傾斜。總結老劉本篇文章主要講述了Canal工具的核心知識點及其數據采集工具的對比,其中數據采集工具只是大致講了講概念和應用,目的也是讓大家有個印象。老劉敢做保證看完這篇文章基本等於入門,剩下的就是練習了。好啦,同步mysql增量數據的工具Canal的內容就講完了,盡管當前水平可能不及各位大佬,但老劉會努力變得更加優秀,讓各位小夥伴自學從此不求人!文章都看到這了,點贊關注支持一波!。 《Canal:同步mysql增量數據工具,一篇詳解核心知識點》完,請繼續朗讀精采文章。 喜歡 小編的世界 e4to.com,請記得按讚、收藏及分享!
音調
速度
音量
語言
Canal:同步mysql增量數據工具,一篇詳解核心知識點
精確朗讀模式適合大多數瀏覽器,也相容於桌上型與行動裝置。
不過,使用Chorme瀏覽器仍存在一些問題,不建議使用Chorme瀏覽器進行精確朗讀。