午夜视频免费看_日韩三级电影网站_国产精品久久一级_亚洲一级在线播放_人妻体内射精一区二区三区_91夜夜揉人人捏人人添红杏_91福利在线导航_国产又粗又猛又黄又爽无遮挡_欧美日韩一区在线播放_中文字幕一区二区三区四区不卡 _日日夜夜精品视频免费观看_欧美韩日一区二区三区

主頁(yè) > 知識(shí)庫(kù) > 基于Docker結(jié)合Canal實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能

基于Docker結(jié)合Canal實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能

熱門標(biāo)簽:蕪湖呼叫中心外呼系統(tǒng)哪家強(qiáng) 咸陽(yáng)穩(wěn)定外呼系統(tǒng)公司 電銷機(jī)器人怎么錄音 天津外呼系統(tǒng)運(yùn)營(yíng)商 貴陽(yáng)語(yǔ)音電銷機(jī)器人 征服者快捷酒店地圖標(biāo)注 ec外呼系統(tǒng)怎么樣 漯河電銷外呼系統(tǒng)價(jià)格 貴港市機(jī)器人外呼系統(tǒng)團(tuán)隊(duì)

Canal的介紹

Canal的歷史由來(lái)

在早期的時(shí)候,阿里巴巴公司因?yàn)楹贾莺兔绹?guó)兩個(gè)地方的機(jī)房都部署了數(shù)據(jù)庫(kù)實(shí)例,但因?yàn)榭鐧C(jī)房同步數(shù)據(jù)的業(yè)務(wù)需求 ,便孕育而生出了Canal,主要是基于trigger(觸發(fā)器)的方式獲取增量變更。從2010年開始,阿里巴巴公司開始逐步嘗試數(shù)據(jù)庫(kù)日志解析,獲取增量變更的數(shù)據(jù)進(jìn)行同步,由此衍生出了增量訂閱和消費(fèi)業(yè)務(wù)。

當(dāng)前的Canal支持的數(shù)據(jù)源端MySQL版本包括:5.1.x 、5.5.x 、5.6.x、5.7.x、8.0.x。

Canal的應(yīng)用場(chǎng)景

目前普遍基于日志增量訂閱和消費(fèi)的業(yè)務(wù),主要包括:

  1. 基于數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)
  2. 數(shù)據(jù)庫(kù)鏡像 數(shù)據(jù)庫(kù)實(shí)時(shí)備份
  3. 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)
  4. 業(yè)務(wù)Cache刷新
  5. 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理
  6. Canal的工作原理

在介紹Canal的原理之前,我們先來(lái)了解下MySQL主從復(fù)制的原理。

MySQL主從復(fù)制原理

  • MySQL Master將數(shù)據(jù)變更的操作寫入二進(jìn)制日志binary log中, 其中記錄的內(nèi)容叫做二進(jìn)制日志事件binary log events,可以通過(guò)show binlog events命令進(jìn)行查看
  • MySQL Slave會(huì)將Master的binary log中的binary log events拷貝到它的中繼日志relay log
  • MySQL Slave重讀并執(zhí)行relay log中的事件,將數(shù)據(jù)變更映射到它自己的數(shù)據(jù)庫(kù)表中

了解了MySQL的工作原理,我們可以大致猜想到Canal應(yīng)該也是采用類似的邏輯去實(shí)現(xiàn)增量數(shù)據(jù)訂閱的功能,那么接下來(lái)我們看看實(shí)際上Canal的工作原理是怎樣的?

Canal工作原理

  • Canal模擬MySQL Slave的交互協(xié)議,偽裝自己為MySQL Slave,向MySQL Master發(fā)送dump協(xié)議
  • MySQL Master收到dump請(qǐng)求,開始推送binary log給Slave(也就是Canal)
  • Canal解析binary log對(duì)象(數(shù)據(jù)為byte流)

基于這樣的原理與方式,便可以完成數(shù)據(jù)庫(kù)增量日志的獲取解析,提供增量數(shù)據(jù)訂閱和消費(fèi),實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸?shù)墓δ堋?/p>

既然Canal是這樣的一個(gè)框架,又是純Java語(yǔ)言編寫而成,那么我們接下來(lái)就開始學(xué)習(xí)怎么使用它并把它用到我們的實(shí)際工作中。

Canal的Docker環(huán)境準(zhǔn)備

因?yàn)槟壳叭萜骰夹g(shù)的火熱,本文通過(guò)使用Docker來(lái)快速搭建開發(fā)環(huán)境,而傳統(tǒng)方式的環(huán)境搭建,在我們學(xué)會(huì)了Docker容器環(huán)境搭建后,也能自行依葫蘆畫瓢搭建成功。由于本篇主要講解Canal,所以關(guān)于Docker的內(nèi)容不會(huì)涉及太多,主要會(huì)介紹Docker的基本概念和命令使用。 如果你想和更多容器技術(shù)專家交流,可以加我微信liyingjiese,備注『加群』。群里每周都有全球各大公司的最佳實(shí)踐以及行業(yè)最新動(dòng)態(tài) 。

什么是Docker

相信絕大多數(shù)人都使用過(guò)虛擬機(jī)VMware,在使用VMware進(jìn)行環(huán)境搭建的時(shí)候,只需提供了一個(gè)普通的系統(tǒng)鏡像并成功安裝,剩下的軟件環(huán)境與應(yīng)用配置還是如我們?cè)诒緳C(jī)操作一樣在虛擬機(jī)里也操作一遍,而且VMware占用宿主機(jī)的資源較多,容易造成宿主機(jī)卡頓,而且系統(tǒng)鏡像本身也占用過(guò)多空間。

為了便于大家快速理解Docker,便與VMware做對(duì)比來(lái)做介紹,Docker提供了一個(gè)開始,打包,運(yùn)行APP的平臺(tái),把APP(應(yīng)用)和底層infrastructure(基礎(chǔ)設(shè)施)隔離開來(lái)。Docker中最主要的兩個(gè)概念就是鏡像(類似VMware的系統(tǒng)鏡像)與容器(類似VMware里安裝的系統(tǒng))。

什么是Image(鏡像)

  • 文件和meta data的集合(root filesystem)
  • 分層的,并且每一層都可以添加改變刪除文件,成為一個(gè)新的image
  • 不同的image可以共享相同的layer
  • Image本身是read-only的

什么是Container(容器)

  • 通過(guò)Image創(chuàng)建(copy)
  • 在Image layer之上建立一個(gè)container layer(可讀寫)
  • 類比面向?qū)ο螅侯惡蛯?shí)例
  • Image負(fù)責(zé)APP的存儲(chǔ)和分發(fā),Container負(fù)責(zé)運(yùn)行APP

Docker的網(wǎng)絡(luò)介紹

Docker的網(wǎng)絡(luò)類型有三種:

  • Bridge:橋接網(wǎng)絡(luò)。默認(rèn)情況下啟動(dòng)的Docker容器,都是使用Bridge,Docker安裝時(shí)創(chuàng)建的橋接網(wǎng)絡(luò),每次Docker容器重啟時(shí),會(huì)按照順序獲取對(duì)應(yīng)的IP地址,這個(gè)就導(dǎo)致重啟下,Docker的IP地址就變了。
  • None:無(wú)指定網(wǎng)絡(luò)。使用 --network=none,Docker容器就不會(huì)分配局域網(wǎng)的IP。
  • Host:主機(jī)網(wǎng)絡(luò)。使用--network=host,此時(shí),Docker容器的網(wǎng)絡(luò)會(huì)附屬在主機(jī)上,兩者是互通的。例如,在容器中運(yùn)行一個(gè)Web服務(wù),監(jiān)聽(tīng)8080端口,則主機(jī)的8080端口就會(huì)自動(dòng)映射到容器中。

創(chuàng)建自定義網(wǎng)絡(luò):(設(shè)置固定IP)

docker network create --subnet=172.18.0.0/16 mynetwork

查看存在的網(wǎng)絡(luò)類型docker network ls:

搭建Canal環(huán)境

附上Docker的下載安裝地址==> Docker Download 。

下載Canal鏡像docker pull canal/canal-server

下載MySQL鏡像docker pull mysql,下載過(guò)的則如下圖:

查看已經(jīng)下載好的鏡像docker images:

接下來(lái)通過(guò)鏡像生成MySQL容器與canal-server容器:

##生成mysql容器
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
##生成canal-server容器
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## 命令介紹
--net mynetwork #使用自定義網(wǎng)絡(luò)
--ip #指定分配ip

查看Docker中運(yùn)行的容器docker ps:

MySQL的配置修改

以上只是初步準(zhǔn)備好了基礎(chǔ)的環(huán)境,但是怎么讓Canal偽裝成Salve并正確獲取MySQL中的binary log呢?

對(duì)于自建MySQL,需要先開啟Binlog寫入功能,配置binlog-format為ROW模式,通過(guò)修改MySQL配置文件來(lái)開啟bin_log,使用find / -name my.cnf查找my.cnf,修改文件內(nèi)容如下:

[mysqld]
log-bin=mysql-bin # 開啟binlog
binlog-format=ROW # 選擇ROW模式
server_id=1 # 配置MySQL replaction需要定義,不要和Canal的slaveId重復(fù)

進(jìn)入MySQL容器docker exec -it mysql bash。

創(chuàng)建鏈接MySQL的賬號(hào)Canal并授予作為MySQL slave的權(quán)限,如果已有賬戶可直接GRANT:

mysql -uroot -proot
# 創(chuàng)建賬號(hào)
CREATE USER canal IDENTIFIED BY 'canal'; 
# 授予權(quán)限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# 刷新并應(yīng)用
FLUSH PRIVILEGES;

數(shù)據(jù)庫(kù)重啟后,簡(jiǎn)單測(cè)試 my.cnf 配置是否生效:

show variables like 'log_bin';
show variables like 'log_bin';
show master status;

canal-server的配置修改

進(jìn)入canal-server容器docker exec -it canal-server bash

編輯canal-server的配置vi canal-server/conf/example/instance.properties

更多配置請(qǐng)參考==>Canal配置說(shuō)明 。

重啟canal-server容器docker restart canal-server 進(jìn)入容器查看啟動(dòng)日志:

docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log

至此,我們的環(huán)境工作準(zhǔn)備完成!

拉取數(shù)據(jù)并同步保存到ElasticSearch

本文的ElasticSearch也是基于Docker環(huán)境搭建,所以讀者可執(zhí)行如下命令:

# 下載對(duì)鏡像
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# 創(chuàng)建容器并運(yùn)行
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine

環(huán)境已經(jīng)準(zhǔn)備好了,現(xiàn)在就要開始我們的編碼實(shí)戰(zhàn)部分了,怎么通過(guò)應(yīng)用程序去獲取Canal解析后的binlog數(shù)據(jù)。首先我們基于Spring Boot搭建一個(gè)canal demo應(yīng)用。結(jié)構(gòu)如下圖所示:

Student.java

package com.example.canal.study.pojo;
import lombok.Data;
import java.io.Serializable;
// @Data 用戶生產(chǎn)getter、setter方法
@Data
public class Student implements Serializable {
private String id;
private String name;
private int age;
private String sex;
private String city;
} 

CanalConfig.java

package com.example.canal.study.common;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetSocketAddress;
/**
* @author haha
*/
@Configuration
public class CanalConfig {
// @Value 獲取 application.properties配置中端內(nèi)容
@Value("${canal.server.ip}")
private String canalIp;
@Value("${canal.server.port}")
private Integer canalPort;
@Value("${canal.destination}")
private String destination;
@Value("${elasticSearch.server.ip}")
private String elasticSearchIp;
@Value("${elasticSearch.server.port}")
private Integer elasticSearchPort;
@Value("${zookeeper.server.ip}")
private String zkServerIp;
// 獲取簡(jiǎn)單canal-server連接
@Bean
public CanalConnector canalSimpleConnector() {
 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp, canalPort), destination, "", "");
 return canalConnector;
}
// 通過(guò)連接zookeeper獲取canal-server連接
@Bean
public CanalConnector canalHaConnector() {
 CanalConnector canalConnector = CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
 return canalConnector;
}
// elasticsearch 7.x客戶端
@Bean
public RestHighLevelClient restHighLevelClient() {
 RestHighLevelClient client = new RestHighLevelClient(
   RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort))
 );
 return client;
}
} 

CanalDataParser.java

由于這個(gè)類的代碼較多,文中則摘出其中比較重要的部分,其它部分代碼可從GitHub上獲取:

public static class TwoTuple<A, B> {
 public final A eventType;
 public final B columnMap;
 public TwoTuple(A a, B b) {
  eventType = a;
  columnMap = b;
 }
}
public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entrys) {
 List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
 for (Entry entry : entrys) {
  // binlog event的事件事件
  long executeTime = entry.getHeader().getExecuteTime();
  // 當(dāng)前應(yīng)用獲取到該binlog鎖延遲的時(shí)間
  long delayTime = System.currentTimeMillis() - executeTime;
  Date date = new Date(entry.getHeader().getExecuteTime());
  SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  // 當(dāng)前的entry(binary log event)的條目類型屬于事務(wù)
  if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
   if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
    TransactionBegin begin = null;
    try {
     begin = TransactionBegin.parseFrom(entry.getStoreValue());
    } catch (InvalidProtocolBufferException e) {
     throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    // 打印事務(wù)頭信息,執(zhí)行的線程id,事務(wù)耗時(shí)
    logger.info(transaction_format,
      new Object[]{entry.getHeader().getLogfileName(),
        String.valueOf(entry.getHeader().getLogfileOffset()),
        String.valueOf(entry.getHeader().getExecuteTime()),
        simpleDateFormat.format(date),
        entry.getHeader().getGtid(),
        String.valueOf(delayTime)});
    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
    printXAInfo(begin.getPropsList());
   } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
    TransactionEnd end = null;
    try {
     end = TransactionEnd.parseFrom(entry.getStoreValue());
    } catch (InvalidProtocolBufferException e) {
     throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    // 打印事務(wù)提交信息,事務(wù)id
    logger.info("----------------\n");
    logger.info(" END ----> transaction id: {}", end.getTransactionId());
    printXAInfo(end.getPropsList());
    logger.info(transaction_format,
      new Object[]{entry.getHeader().getLogfileName(),
        String.valueOf(entry.getHeader().getLogfileOffset()),
        String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
        entry.getHeader().getGtid(), String.valueOf(delayTime)});
   }
   continue;
  }
  // 當(dāng)前entry(binary log event)的條目類型屬于原始數(shù)據(jù)
  if (entry.getEntryType() == EntryType.ROWDATA) {
   RowChange rowChage = null;
   try {
    // 獲取儲(chǔ)存的內(nèi)容
    rowChage = RowChange.parseFrom(entry.getStoreValue());
   } catch (Exception e) {
    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
   }
   // 獲取當(dāng)前內(nèi)容的事件類型
   EventType eventType = rowChage.getEventType();
   logger.info(row_format,
     new Object[]{entry.getHeader().getLogfileName(),
       String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
       entry.getHeader().getTableName(), eventType,
       String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
       entry.getHeader().getGtid(), String.valueOf(delayTime)});
   // 事件類型是query或數(shù)據(jù)定義語(yǔ)言DDL直接打印sql語(yǔ)句,跳出繼續(xù)下一次循環(huán)
   if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
    logger.info(" sql ----> " + rowChage.getSql() + SEP);
    continue;
   }
   printXAInfo(rowChage.getPropsList());
   // 循環(huán)當(dāng)前內(nèi)容條目的具體數(shù)據(jù)
   for (RowData rowData : rowChage.getRowDatasList()) {
    List<CanalEntry.Column> columns;
    // 事件類型是delete返回刪除前的列內(nèi)容,否則返回改變后列的內(nèi)容
    if (eventType == CanalEntry.EventType.DELETE) {
     columns = rowData.getBeforeColumnsList();
    } else {
     columns = rowData.getAfterColumnsList();
    }
    HashMap<String, Object> map = new HashMap<>(16);
    // 循環(huán)把列的name與value放入map中
    for (Column column: columns){
     map.put(column.getName(), column.getValue());
    }
    rows.add(new TwoTuple<>(eventType, map));
   }
  }
 }
 return rows;
} 

ElasticUtils.java

package com.example.canal.study.common;
import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
/**
* @author haha
*/
@Slf4j
@Component
public class ElasticUtils {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
 * 新增
 * @param student 
 * @param index 索引
 */
public void saveEs(Student student, String index) {
 IndexRequest indexRequest = new IndexRequest(index)
   .id(student.getId())
   .source(JSON.toJSONString(student), XContentType.JSON)
   .opType(DocWriteRequest.OpType.CREATE);
 try {
  IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
  log.info("保存數(shù)據(jù)至ElasticSearch成功:{}", response.getId());
 } catch (IOException e) {
  log.error("保存數(shù)據(jù)至elasticSearch失敗: {}", e);
 }
}
/**
 * 查看
 * @param index 索引
 * @param id _id
 * @throws IOException
 */
public void getEs(String index, String id) throws IOException {
 GetRequest getRequest = new GetRequest(index, id);
 GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
 Map<String, Object> fields = response.getSource();
 for (Map.Entry<String, Object> entry : fields.entrySet()) {
  System.out.println(entry.getKey() + ":" + entry.getValue());
 }
}
/**
 * 更新
 * @param student
 * @param index 索引
 * @throws IOException
 */
public void updateEs(Student student, String index) throws IOException {
 UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
 updateRequest.upsert(JSON.toJSONString(student), XContentType.JSON);
 UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
 log.info("更新數(shù)據(jù)至ElasticSearch成功:{}", response.getId());
}
/**
 * 根據(jù)id刪除數(shù)據(jù)
 * @param index 索引
 * @param id _id
 * @throws IOException
 */
public void DeleteEs(String index, String id) throws IOException {
 DeleteRequest deleteRequest = new DeleteRequest(index, id);
 DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
 log.info("刪除數(shù)據(jù)至ElasticSearch成功:{}", response.getId());
}
} 

BinLogElasticSearch.java

package com.example.canal.study.action;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* @author haha
*/
@Slf4j
@Component
public class BinLogElasticSearch {
@Autowired
private CanalConnector canalSimpleConnector;
@Autowired
private ElasticUtils elasticUtils;
//@Qualifier("canalHaConnector")使用名為canalHaConnector的bean
@Autowired
@Qualifier("canalHaConnector")
private CanalConnector canalHaConnector;
public void binLogToElasticSearch() throws IOException {
 openCanalConnector(canalHaConnector);
 // 輪詢拉取數(shù)據(jù)
 Integer batchSize = 5 * 1024;
 while (true) {
  Message message = canalHaConnector.getWithoutAck(batchSize);
//   Message message = canalSimpleConnector.getWithoutAck(batchSize);
  long id = message.getId();
  int size = message.getEntries().size();
  log.info("當(dāng)前監(jiān)控到binLog消息數(shù)量{}", size);
  if (id == -1 || size == 0) {
   try {
    // 等待2秒
    Thread.sleep(2000);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  } else {
   //1. 解析message對(duì)象
   List<CanalEntry.Entry> entries = message.getEntries();
   List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
   for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
    if(tuple.eventType == CanalEntry.EventType.INSERT) {
     Student student = createStudent(tuple);
     // 2。將解析出的對(duì)象同步到elasticSearch中
     elasticUtils.saveEs(student, "student_index");
     // 3.消息確認(rèn)已處理
//     canalSimpleConnector.ack(id);
     canalHaConnector.ack(id);
    }
    if(tuple.eventType == CanalEntry.EventType.UPDATE){
     Student student = createStudent(tuple);
     elasticUtils.updateEs(student, "student_index");
     // 3.消息確認(rèn)已處理
//     canalSimpleConnector.ack(id);
     canalHaConnector.ack(id);
    }
    if(tuple.eventType == CanalEntry.EventType.DELETE){
     elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
     canalHaConnector.ack(id);
    }
   }
  }
 }
}
/**
 * 封裝數(shù)據(jù)至Student
 * @param tuple
 * @return
 */
private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple){
 Student student = new Student();
 student.setId(tuple.columnMap.get("id").toString());
 student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
 student.setName(tuple.columnMap.get("name").toString());
 student.setSex(tuple.columnMap.get("sex").toString());
 student.setCity(tuple.columnMap.get("city").toString());
 return student;
}
/**
 * 打開canal連接
 *
 * @param canalConnector
 */
private void openCanalConnector(CanalConnector canalConnector) {
 //連接CanalServer
 canalConnector.connect();
 // 訂閱destination
 canalConnector.subscribe();
}
/**
 * 關(guān)閉canal連接
 *
 * @param canalConnector
 */
private void closeCanalConnector(CanalConnector canalConnector) {
 //關(guān)閉連接CanalServer
 canalConnector.disconnect();
 // 注銷訂閱destination
 canalConnector.unsubscribe();
}
} 

CanalDemoApplication.java(Spring Boot啟動(dòng)類)

package com.example.canal.study;
import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author haha
*/
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
@Autowired
private BinLogElasticSearch binLogElasticSearch;
public static void main(String[] args) {
 SpringApplication.run(CanalDemoApplication.class, args);
}
// 程序啟動(dòng)則執(zhí)行run方法
@Override
public void run(ApplicationArguments args) throws Exception {
 binLogElasticSearch.binLogToElasticSearch();
}
} 

application.properties

server.port=8081
spring.application.name = canal-demo
canal.server.ip = 192.168.124.5
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = 192.168.124.5:2181
zookeeper.sasl.client = false
elasticSearch.server.ip = 192.168.124.5
elasticSearch.server.port = 9200

Canal集群高可用的搭建

通過(guò)上面的學(xué)習(xí),我們知道了單機(jī)直連方式的Canala應(yīng)用。在當(dāng)今互聯(lián)網(wǎng)時(shí)代,單實(shí)例模式逐漸被集群高可用模式取代,那么Canala的多實(shí)例集群方式如何搭建呢!

基于ZooKeeper獲取Canal實(shí)例

準(zhǔn)備ZooKeeper的Docker鏡像與容器:

docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server

1、機(jī)器準(zhǔn)備:

  • 運(yùn)行Canal的容器IP: 172.18.0.4 , 172.18.0.8
  • ZooKeeper容器IP:172.18.0.3:2181
  • MySQL容器IP:172.18.0.6:3306

2、按照部署和配置,在單臺(tái)機(jī)器上各自完成配置,演示時(shí)instance name為example。

3、修改canal.properties,加上ZooKeeper配置并修改Canal端口:

canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

4、創(chuàng)建example目錄,并修改instance.properties:

canal.instance.mysql.slaveId = 1235 
#之前的canal slaveId是1234,保證slaveId不重復(fù)即可
canal.instance.master.address = 172.18.0.6:3306

注意: 兩臺(tái)機(jī)器上的instance目錄的名字需要保證完全一致,HA模式是依賴于instance name進(jìn)行管理,同時(shí)必須都選擇default-instance.xml配置。

啟動(dòng)兩個(gè)不同容器的Canal,啟動(dòng)后,可以通過(guò)tail -100f logs/example/example.log查看啟動(dòng)日志,只會(huì)看到一臺(tái)機(jī)器上出現(xiàn)了啟動(dòng)成功的日志。

比如我這里啟動(dòng)成功的是 172.18.0.4:

查看一下ZooKeeper中的節(jié)點(diǎn)信息,也可以知道當(dāng)前工作的節(jié)點(diǎn)為172.18.0.4:11111:

[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running 
{"active":true,"address":"172.18.0.4:11111","cid":1} 

客戶端鏈接, 消費(fèi)數(shù)據(jù)

可以通過(guò)指定ZooKeeper地址和Canal的instance name,canal client會(huì)自動(dòng)從ZooKeeper中的running節(jié)點(diǎn)獲取當(dāng)前服務(wù)的工作節(jié)點(diǎn),然后與其建立鏈接:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1} 

對(duì)應(yīng)的客戶端編碼可以使用如下形式,上文中的CanalConfig.java中的canalHaConnector就是一個(gè)HA連接:

CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");

鏈接成功后,canal server會(huì)記錄當(dāng)前正在工作的canal client信息,比如客戶端IP,鏈接的端口信息等(聰明的你,應(yīng)該也可以發(fā)現(xiàn),canal client也可以支持HA功能):

[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001} 

數(shù)據(jù)消費(fèi)成功后,canal server會(huì)在ZooKeeper中記錄下當(dāng)前最后一次消費(fèi)成功的binlog位點(diǎn)(下次你重啟client時(shí),會(huì)從這最后一個(gè)位點(diǎn)繼續(xù)進(jìn)行消費(fèi)):

[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor

{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}} 

停止正在工作的172.18.0.4的canal server:

docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh

這時(shí)172.18.0.8會(huì)立馬啟動(dòng)example instance,提供新的數(shù)據(jù)服務(wù):

[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1} 

與此同時(shí),客戶端也會(huì)隨著canal server的切換,通過(guò)獲取ZooKeeper中的最新地址,與新的canal server建立鏈接,繼續(xù)消費(fèi)數(shù)據(jù),整個(gè)過(guò)程自動(dòng)完成。

異常與總結(jié)

elasticsearch-head無(wú)法訪問(wèn)Elasticsearch

es與es-head是兩個(gè)獨(dú)立的進(jìn)程,當(dāng)es-head訪問(wèn)es服務(wù)時(shí),會(huì)存在一個(gè)跨域問(wèn)題。所以我們需要修改es的配置文件,增加一些配置項(xiàng)來(lái)解決這個(gè)問(wèn)題,如下:

[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml 
# 文件末尾加上如下配置
http.cors.enabled: true
http.cors.allow-origin: "*"

修改完配置文件后需重啟es服務(wù)。

elasticsearch-head查詢報(bào)406 Not Acceptable

解決方法:

1、進(jìn)入head安裝目錄;

2、cd _site/

3、編輯vendor.js 共有兩處

#6886行 contentType: "application/x-www-form-urlencoded
改成 contentType: "application/json;charset=UTF-8"
 #7574行 var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
改成 var inspectData = s.contentType === "application/json;charset=UTF-8" &&

使用elasticsearch-rest-high-level-client報(bào)org.elasticsearch.action.index.IndexRequest.ifSeqNo

#pom中除了加入依賴
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.1.1</version>
</dependency>
#還需加入
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.1.1</version>
</dependency>

相關(guān)參考: git hub issues 。

為什么ElasticSearch要在7.X版本不能使用type?

參考: 為什么ElasticSearch要在7.X版本去掉type?

使用spring-data-elasticsearch.jar報(bào)org.elasticsearch.client.transport.NoNodeAvailableException

由于本文使用的是elasticsearch7.x以上的版本,目前spring-data-elasticsearch底層采用es官方TransportClient,而es官方計(jì)劃放棄TransportClient,工具以es官方推薦的RestHighLevelClient進(jìn)行調(diào)用請(qǐng)求。 可參考 RestHighLevelClient API 。

設(shè)置Docker容器開啟啟動(dòng)

如果創(chuàng)建時(shí)未指定 --restart=always ,可通過(guò)update 命令
docker update --restart=always [containerID]

Docker for Mac network host模式不生效

Host模式是為了性能,但是這卻對(duì)Docker的隔離性造成了破壞,導(dǎo)致安全性降低。 在性能場(chǎng)景下,可以用--netwokr host開啟Host模式,但需要注意的是,如果你用Windows或Mac本地啟動(dòng)容器的話,會(huì)遇到Host模式失效的問(wèn)題。原因是Host模式只支持Linux宿主機(jī)。

參見(jiàn)官方文檔:  https://docs.docker.com/network/host/  。

客戶端連接ZooKeeper報(bào)authenticate using SASL(unknow error)

  • zookeeper.jar與Dokcer中的ZooKeeper版本不一致
  • zookeeper.jar使用了3.4.6之前的版本

出現(xiàn)這個(gè)錯(cuò)的意思是ZooKeeper作為外部應(yīng)用需要向系統(tǒng)申請(qǐng)資源,申請(qǐng)資源的時(shí)候需要通過(guò)認(rèn)證,而sasl是一種認(rèn)證方式,我們想辦法來(lái)繞過(guò)sasl認(rèn)證。避免等待,來(lái)提高效率。

在項(xiàng)目代碼中加入System.setProperty("zookeeper.sasl.client", "false");,如果是Spring Boot項(xiàng)目可以在application.properties中加入zookeeper.sasl.client=false

參考: Increased CPU usage by unnecessary SASL checks 。

如果更換canal.client.jar中依賴的zookeeper.jar的版本

把Canal的官方源碼下載到本機(jī)git clone  https://github.com/alibaba/canal.git  ,然后修改client模塊下pom.xml文件中關(guān)于ZooKeeper的內(nèi)容,然后重新mvn install:

把自己項(xiàng)目依賴的包替換為剛剛mvn install生產(chǎn)的包:

關(guān)于選型的取舍

總結(jié)

以上所述是小編給大家介紹的基于Docker結(jié)合Canal實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能,希望對(duì)大家有所幫助,如果大家有任何疑問(wèn)請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
如果你覺(jué)得本文對(duì)你有幫助,歡迎轉(zhuǎn)載,煩請(qǐng)注明出處,謝謝!

標(biāo)簽:西藏 濰坊 怒江 淮北 東營(yíng) 西寧 攀枝花 香港

巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《基于Docker結(jié)合Canal實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能》,本文關(guān)鍵詞  基于,Docker,結(jié)合,Canal,實(shí)現(xiàn),;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問(wèn)題,煩請(qǐng)?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無(wú)關(guān)。
  • 相關(guān)文章
  • 下面列出與本文章《基于Docker結(jié)合Canal實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能》相關(guān)的同類信息!
  • 本頁(yè)收集關(guān)于基于Docker結(jié)合Canal實(shí)現(xiàn)MySQL實(shí)時(shí)增量數(shù)據(jù)傳輸功能的相關(guān)信息資訊供網(wǎng)民參考!
  • 推薦文章
    国内精品久久久| 538任你躁在线精品免费| 清纯唯美亚洲综合| 亚洲第一色在线| 一区二区高清视频在线观看| 国产在线精品一区二区夜色| 欧美男人亚洲天堂| 中文字幕成人动漫| 成人性生生活性生交12| 无遮挡亚洲一区| 国产精品久久久久久一区二区| 亚洲女同性videos| 色天天综合色天天久久| 久久精品人人做人人综合| 婷婷在线免费观看| 久草视频一区二区| www.4hu95.com四虎| 红桃视频 国产| 中文字幕色呦呦| 国产66精品久久久久999小说| 欧美国产日产韩国视频| 亚洲激情在线观看视频免费| 色婷婷综合久久久久中文一区二区 | 欧美激情亚洲综合一区| 日韩精品视频三区| 精品污污网站免费看| 亚洲视频一区在线| 99久久综合99久久综合网站| 亚洲欧美日本在线观看| 中文字幕久久网| 日韩激情综合网| 99re这里只有| 亚洲va在线va天堂va偷拍| 激情五月婷婷六月| 日韩精品久久久| 亚洲自拍偷拍在线| 欧美一级电影免费在线观看| 中文字幕在线亚洲| 亚洲第一中文字幕| 精品视频在线免费| 午夜私人影院久久久久| 国产精品电影一区二区| 99久久伊人精品| 韩国欧美国产1区| 亚洲av成人精品一区二区三区在线播放| 午夜精品一区二| 免费网站观看www在线观| 精品人妻一区二区三区视频| 久久久久久国产精品日本| 日韩 欧美 高清| av在线com| 婷婷精品国产一区二区三区日韩 | 国产欧美亚洲日本| 国产精品综合不卡av| 2023亚洲男人天堂| 欧美激情小视频| 久久精品亚洲一区| 中文字幕av一区二区三区谷原希美| 亚洲成人黄色网址| 欧美一二三区精品| 欧美性猛片aaaaaaa做受| 天天综合网天天综合色| 亚洲日本在线观看| 国产精品乱子久久久久| 国产日本亚洲高清| 久久众筹精品私拍模特| 成人精品一区二区三区四区 | 干b视频在线观看| 99riav国产精品视频| 午夜大片在线观看| 久久黄色片网站| 三级在线免费看| av免费网站观看| 久久婷婷五月综合色国产香蕉| 国产夫妻自拍一区| 日本高清视频免费在线观看| 三上悠亚免费在线观看| 日本精品免费视频| 黑人巨大国产9丨视频| 一本久道久久综合| 自拍偷拍视频在线| 偷拍盗摄高潮叫床对白清晰| 一区二区三区av在线| 亚洲一区美女| 欧美与动交zoz0z| 欧美日韩视频免费| 成年人午夜免费视频| 成人免费毛片在线观看| 国产婷婷一区二区三区| 69堂免费视频| 日本中文字幕高清| 中文字幕日韩综合| 欧洲成人午夜精品无码区久久| 国产成人av片| 国产精品无码永久免费不卡| 成都免费高清电影| 一本一本久久a久久| 人妻人人澡人人添人人爽| 免费日韩在线视频| 久久久久久少妇| 成人黄色激情视频| 国产一区二区女内射| 午夜久久久久久久久久| 亚洲三区在线观看无套内射| 美女视频黄免费的久久| 国产精品白丝jk黑袜喷水| caoporm超碰国产精品| 国产亚洲欧美在线| 亚洲蜜臀av乱码久久精品蜜桃| 亚洲永久免费av| 色婷婷久久99综合精品jk白丝| 欧美视频第二页| 日韩久久精品一区| 国产亚洲精品激情久久| 美日韩在线视频| 欧美在线视频观看| 亚洲a中文字幕| 蜜桃91精品入口| 亚洲国产精品影视| 女人扒开屁股爽桶30分钟| 亚洲最大成人在线观看| 久久人妻少妇嫩草av无码专区| www.av天天| 国产一国产二国产三| 中文字幕在线观看精品| 深爱五月激情五月| 国产成人av一区二区| 国产精品视频线看| 欧美午夜激情小视频| 日韩你懂的在线观看| 深夜福利一区二区| 久久久久亚洲精品国产| 成人看片人aa| 蜜桃999成人看片在线观看| 18视频在线观看娇喘| 免费黄色一级网站| 成人无码www在线看免费| 国产精品白嫩白嫩大学美女| 做爰视频毛片视频| 日韩和欧美一区二区| 99久久免费视频.com| 亚洲品质自拍视频| 欧美久久婷婷综合色| 一个人看的www久久| 欧美一级电影免费在线观看| 国产精品国产亚洲精品看不卡15| 一本色道久久综合亚洲精品婷婷| 欧美成人免费高清视频| 中文字幕一区二区久久人妻网站| 国产精品夜夜夜爽阿娇| 中文字幕资源网| 青娱乐精品在线视频| 久久九九久精品国产免费直播| 亚洲成人av一区| 精品欧美久久久| 欧美国产第一页| 国产精品成人一区二区三区| 一区二区三区四区在线视频 | 91精品国产高清久久久久久| 99国产超薄丝袜足j在线观看| 精品久久免费观看| 亚洲第一中文av| 日韩一区二区三区四区视频| 中文字幕理论片| 国内欧美视频一区二区| 一区二区三区在线免费播放| 日韩美女一区二区三区| 久久免费视频网站| 久久亚洲午夜电影| 亚洲一二三区av| 国产又色又爽又高潮免费| 亚洲影院一区二区三区| 国产成人在线影院| 婷婷开心久久网| 亚洲欧美日韩一区二区在线| 国产精品久久久久久久久久ktv | 亚洲最新视频在线| 国产精品三级久久久久久电影| 亚洲人成77777| 欧美视频亚洲图片| 激情五月色婷婷| 日韩电影免费一区| 亚洲欧美日韩久久| 亚洲精品久久久久久久久久久久 | 亚洲成人在线网| 秋霞午夜一区二区| 一本色道婷婷久久欧美| 黄色激情在线观看| 国产精品传媒在线观看| 懂色av一区二区三区蜜臀| 无码av免费一区二区三区试看 | 欧美日韩在线看| 中文字幕日韩av电影| 成人片在线免费看| 男人搞女人网站| 免费在线黄色网| 欧美 日韩 国产 成人 在线 91| 国产午夜精品久久| 日韩欧美成人午夜| 国产va免费精品高清在线| 亚洲小说欧美另类激情| 无遮挡aaaaa大片免费看| 国产精品视频久久久久久| www精品美女久久久tv| 4438亚洲最大| 日本一区二区在线播放| 国产尤物av一区二区三区| 谁有免费的黄色网址| 一级片在线免费观看视频| 91浏览器在线视频| 日韩欧美色综合网站| 国产精品视频精品| 欧美大片在线播放| 神马午夜精品91| 蜜臀av一区二区在线免费观看 | 久久精品成人欧美大片古装| 久久国产精品一区二区三区| 国产又粗又猛大又黄又爽| 国产精品尤物视频| 久久久久国产精品免费免费搜索 | 亚洲高潮无码久久| 午夜精产品一区二区在线观看的| 韩国av免费在线| 亚洲美女免费视频| 色婷婷久久av| 亚洲国产精品一区在线观看不卡| 国产中文字幕一区二区| 欧美一区二区三区成人片在线| 亚洲激情在线激情| 久久天天躁狠狠躁老女人| 亚洲欧美日韩另类精品一区二区三区 | 午夜精品久久久久久久99热黄桃| 18欧美乱大交hd1984| 亚洲桃花岛网站| 欧美一区二区三区成人久久片| 国产调教打屁股xxxx网站| a网站在线观看| 一区二区三区产品免费精品久久75| 色噜噜狠狠狠综合曰曰曰| 五月天亚洲综合小说网| 日本成人午夜影院| 久久精品国产久精国产| 欧美亚洲免费在线一区| 国产精品久久久久久搜索| 丰满少妇在线观看| 老熟妇一区二区三区啪啪| 国产精品国产三级国产aⅴ入口| 在线精品视频视频中文字幕| 手机成人在线| 日本在线一级片| 成人国产精品免费观看视频| 亚洲国产第一页| 久久精品国产理论片免费| 国产精品亚洲无码| 蜜桃av噜噜一区| 欧美一区二区三区性视频| av日韩中文字幕| 欧美xxxxx精品| 日韩高清不卡一区二区| 欧美日韩国产免费| 99re在线观看视频| 国产精品无码毛片| 麻豆中文一区二区| 日韩一区二区三区四区五区六区| wwwxx欧美| 久久无码人妻精品一区二区三区 | 亚洲精品国产精品国自产网站按摩| 国产精品高潮呻吟| 九九精品在线播放| aa在线观看视频| 亚洲视屏在线观看| 亚洲国产精品天堂| 国产福利视频一区| 久久出品必属精品| 午夜在线视频观看| 欧美一区二区视频在线观看2020| 国产一区二区免费在线观看| 天堂久久精品忘忧草| 国产不卡在线一区| 亚洲欧美日韩国产成人| 欧美日韩亚洲在线| 激情小说综合网| 国产精品日日摸夜夜爽| 日本在线不卡视频| 欧美成人性福生活免费看| 欧美精品在线一区| 三级av在线免费观看| 26uuu另类欧美亚洲曰本| 久久精品成人欧美大片| 男女午夜激情视频| 精品人妻一区二区三区换脸明星| 在线观看成人免费视频| 国产成人精品一区二区三区福利| 欧美亚洲色综久久精品国产| 成人国产免费视频| 久久综合伊人77777尤物| 亚洲熟妇av一区二区三区| 国产激情视频在线播放| 欧洲激情一区二区| 久久草视频在线看| 91porn在线视频| 最新日韩在线视频| 国产精品久久久久久久7电影| 亚洲av成人精品一区二区三区| 国产美女在线观看一区| 在线播放精品一区二区三区| 美女av免费观看| 国产精品一二三四五区| 欧美精品一二三| 亚洲免费不卡| 日本欧美www| 在线观看不卡一区| 欧美日韩免费观看一区| 看片网址国产福利av中文字幕| 午夜视频一区在线观看| 国产99在线免费| 欧美黄色免费在线观看| 亚洲黄一区二区三区| 91视频免费在线| 美女三级黄色片| 1024精品合集| 成人在线一区二区| 亚洲欧美综合7777色婷婷 | 国产精品欧美日韩一区二区| 午夜视频在线观看国产| 91丨porny丨蝌蚪视频| 91成人天堂久久成人| 香港三日本8a三级少妇三级99| 不卡一区中文字幕| 91国内揄拍国内精品对白| 女同性恋一区二区三区| 91丝袜高跟美女视频| 欧美一级淫片播放口| 亚欧洲乱码视频| 国产精品麻豆网站| 91丝袜美腿美女视频网站| 日韩黄色免费观看| 亚洲国产一区二区视频| 91精品国产91久久久久青草| 国产亚洲精久久久久久无码77777| 亚洲高清一区二区三区| 激情一区二区三区| 91丨九色丨海角社区| 91精品欧美福利在线观看| 成人国产一区二区三区| 秋霞视频一区二区| 在线播放国产精品| 人妻精油按摩bd高清中文字幕| 99久久精品免费看国产免费软件| 日本三级韩国三级久久| 91麻豆精品久久毛片一级| 亚洲一级电影视频| 欧美日韩电影一区二区三区| 亚洲影视一区二区| 亚洲激情自拍图| 亚洲美女性囗交| 91香蕉视频mp4| 成人av.网址在线网站| 欧美一级高潮片| 欧美视频精品在线观看| 国产一区二区三区乱码| 青青草原综合久久大伊人精品优势| 久久影视电视剧免费网站清宫辞电视 | 欧美一区二区三区视频在线| 激情深爱综合网| 久久精品国产一区二区三区免费看 | 99re热视频这里只精品| 国产精品女人久久久久久| 国产一级片免费看| 欧美美女喷水视频| 国产乱子伦农村叉叉叉| 国产剧情一区在线| 国产成人高清激情视频在线观看| 2025国产精品自拍| 欧美日韩一区二区在线视频| 欧美 日韩 亚洲 一区| 久久99精品久久只有精品| 欧美亚洲日本黄色| 欧美激情视频二区| 黄色91在线观看| 日本女人高潮视频| 日本欧美一区二区三区| 午夜精品一区二区三区在线播放 | 欧美最猛性xxxx| 久艹视频在线观看| 91麻豆精品国产91久久久更新时间| 虎白女粉嫩尤物福利视频| 99国产一区二区三精品乱码| 91亚洲精品在线| 亚洲手机在线观看| 国产一区二区美女视频| 国产三级视频网站| 欧美日韩精品国产| 久久国产精品免费观看| 韩国一区二区三区| 成人免费视频a| 中文字幕一二区| 在线播放国产一区中文字幕剧情欧美| 完美搭档在线观看| 亚洲国产综合91精品麻豆| 伊人网在线免费| 国产精品一区二区无线| 国产乱肥老妇国产一区二 | 亚洲av无码乱码在线观看性色| 少妇激情综合网| 国产中文字幕久久| 3d成人动漫网站| 日韩欧美中文视频| 一区二区三区成人在线视频| 国产精品日韩三级|