国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁(yè) > news >正文

020網(wǎng)站建設(shè)專(zhuān)業(yè)網(wǎng)站建設(shè)公司

020網(wǎng)站建設(shè),專(zhuān)業(yè)網(wǎng)站建設(shè)公司,義烏小商品批發(fā)市場(chǎng)網(wǎng)上進(jìn)貨app,注冊(cè)公司最好用老年人文章目錄背景環(huán)境工具選型實(shí)操M(fèi)M1MM2以MM2集群運(yùn)行以Standalone模式運(yùn)行驗(yàn)證附錄MM2配置表其他背景 一個(gè)測(cè)試環(huán)境的kafka集群,Topic有360,Partition有2000,部署在虛擬機(jī)上,由于多方面原因,要求遷移至k8s容器內(nèi)&#x…

文章目錄

  • 背景
  • 環(huán)境
  • 工具選型
  • 實(shí)操
    • MM1
    • MM2
      • 以MM2集群運(yùn)行
      • 以Standalone模式運(yùn)行
  • 驗(yàn)證
  • 附錄
    • MM2配置表
    • 其他

背景

一個(gè)測(cè)試環(huán)境的kafka集群,Topic有360+,Partition有2000+,部署在虛擬機(jī)上,由于多方面原因,要求遷移至k8s容器內(nèi)(全量遷移),正好可以拿來(lái)練一下手。本文主要記錄對(duì)MM1和MM2的實(shí)際操作過(guò)程,以及使用過(guò)程中遇到的問(wèn)題及解決方案。

環(huán)境

source集群:kafka-2.6.0、2個(gè)broker、虛擬機(jī)

target集群:kafka-2.6.0、3個(gè)broker、k8s

工具:MM1(kafka-mirror-maker.sh)、MM2(connect-mirror-maker.sh)

需求:Topic名稱(chēng)不能改變、數(shù)據(jù)完整

條件:target集群需要開(kāi)啟自動(dòng)創(chuàng)建Topic:auto.create.topics.enable=true

工具選型

本質(zhì)上MM1是Kafka的消費(fèi)者和生產(chǎn)者結(jié)合體,可以有效地將數(shù)據(jù)從源群集移動(dòng)到目標(biāo)群集,但沒(méi)有提供太多其他功能。

并且在MM1多年的使用過(guò)程中發(fā)現(xiàn)了以下局限性:

  1. 靜態(tài)的黑名單和白名單
  2. Topic信息不能同步,所有Topic同步到目標(biāo)端都只有一個(gè)Partition
  3. 必須通過(guò)手動(dòng)配置來(lái)解決active-active場(chǎng)景下的循環(huán)同步問(wèn)題(MM2為解決這個(gè)問(wèn)題,也做了體驗(yàn)很不好的改動(dòng))
  4. rebalance導(dǎo)致的性能問(wèn)題
  5. 缺乏監(jiān)控手段
  6. 無(wú)法保證Exactly Once
  7. 無(wú)法提供容災(zāi)恢復(fù)
  8. 無(wú)法同步Topic列表,只能同步有數(shù)據(jù)的Topic

MM2是基于kafka connect框架開(kāi)發(fā)的。與其它的kafka connecet一樣MM2有source connector和sink connetor組成,可以支持同步以下數(shù)據(jù):

  1. 完整的Topic列表
  2. Topic配置
  3. ACL信息(如果有)
  4. consumer group和offset(kafka2.7.0之后版本才行)
  5. 其他功能:
    • 支持循環(huán)同步檢測(cè)
    • 多集群自定義同步(同一個(gè)任務(wù)中,可以多集群同步:A->B、B->C、B->D)
    • 提供可監(jiān)控Metrics
    • 可通過(guò)配置保證Exactly Once

實(shí)操

秉著實(shí)操前先演練的原則,我自己搭建了一個(gè)和目標(biāo)集群相同配置的集群,用于驗(yàn)證不同工具的操作結(jié)果。有足夠把握之后,再對(duì)目標(biāo)集群實(shí)際操作。

MM1

執(zhí)行 --help 查看參數(shù)選項(xiàng):

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./kafka-mirror-maker.sh --help
This tool helps to continuously copy data between two Kafka clusters.
Option                                   Description
------                                   -----------
--abort.on.send.failure <String: Stop    Configure the mirror maker to exit onthe entire mirror maker when a send      a failed send. (default: true)failure occurs>
--consumer.config <String: config file>  Embedded consumer config for consumingfrom the source cluster.
--consumer.rebalance.listener <String:   The consumer rebalance listener to useA custom rebalance listener of type      for mirror maker consumer.ConsumerRebalanceListener>
--help                                   Print usage information.
--message.handler <String: A custom      Message handler which will processmessage handler of type                  every record in-between consumer andMirrorMakerMessageHandler>               producer.
--message.handler.args <String:          Arguments used by custom messageArguments passed to message handler      handler for mirror maker.constructor.>
--new.consumer                           DEPRECATED Use new consumer in mirrormaker (this is the default so thisoption will be removed in a futureversion).
--num.streams <Integer: Number of        Number of consumption streams.threads>                                 (default: 1)
--offset.commit.interval.ms <Integer:    Offset commit interval in ms.offset commit interval in                (default: 60000)millisecond>
--producer.config <String: config file>  Embedded producer config.
--rebalance.listener.args <String:       Arguments used by custom rebalanceArguments passed to custom rebalance     listener for mirror maker consumer.listener constructor as a string.>
--version                                Display Kafka version.
--whitelist <String: Java regex          Whitelist of topics to mirror.(String)>
[root@XXGL-T-TJSYZ-REDIS-03 bin]#         

核心參數(shù)就兩個(gè):消費(fèi)者和生產(chǎn)者的配置文件:

consumer.properties:(消費(fèi)source集群)

bootstrap.servers=source:9092
auto.offset.reset=earliest
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=mm1-consumer
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";

producer.properties:(發(fā)送消息至目標(biāo)集群)

bootstrap.servers= target:29092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";
acks=-1
linger.ms=10
batch.size=10000
retries=3

執(zhí)行腳本:

./kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --offset.commit.interval.ms 5000 --num.streams 2 --whitelist "projects.*"

MM1比較簡(jiǎn)單,只要兩個(gè)配置文件沒(méi)問(wèn)題,sasl配置正確,基本就OK了,適合簡(jiǎn)單的數(shù)據(jù)同步,比如指定topic進(jìn)行同步。

MM2

有四種運(yùn)行MM2的方法:

  • As a dedicated MirrorMaker cluster.(作為專(zhuān)用的MirrorMaker群集)
  • As a Connector in a distributed Connect cluster.(作為分布式Connect群集中的連接器)
  • As a standalone Connect worker.(作為獨(dú)立的Connect工作者)
  • In legacy mode using existing MirrorMaker scripts.(在舊模式下,使用現(xiàn)有的MirrorMaker腳本。)

本文介紹第一種和第三種:作為專(zhuān)用的MirrorMaker群集、作為獨(dú)立的Connect工作者,第二種需要搭建connect集群,操作比較復(fù)雜。

以MM2集群運(yùn)行

這種模式是最簡(jiǎn)單的,只需要提供一個(gè)配置文件即可,配置文件定制化程度比較高,根據(jù)業(yè)務(wù)需求配置即可

老樣子,執(zhí)行 --help 看看使用說(shuō)明:

[root@XXGL-T-TJSYZ-REDIS-03 bin]# ./connect-mirror-maker.sh --help
usage: connect-mirror-maker [-h] [--clusters CLUSTER [CLUSTER ...]] mm2.propertiesMirrorMaker 2.0 driverpositional arguments:mm2.properties         MM2 configuration file.optional arguments:-h, --help             show this help message and exit--clusters CLUSTER [CLUSTER ...]Target cluster to use for this node.
[root@XXGL-T-TJSYZ-REDIS-03 bin]#  

可以看到,參數(shù)簡(jiǎn)單了許多,核心參數(shù)就一個(gè)配置文件。

mm2.properties:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2# 定義集群別名
clusters = event-center, event-center-new# 設(shè)置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 設(shè)置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 開(kāi)啟event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允許同步topic的正則
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*# MM2內(nèi)部同步機(jī)制使用的topic,replication數(shù)量設(shè)置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1# 自定義參數(shù)
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 連接器是否發(fā)送心跳
emit.heartbeats.enabled=true
# 心跳間隔
emit.heartbeats.interval.seconds=5
# 是否發(fā)送檢查點(diǎn)
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新間隔
refresh.topics.interval.seconds=60
# 是否刷新消費(fèi)者組id
refresh.groups.enabled=true
# 刷新間隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 遠(yuǎn)端創(chuàng)建新topic的replication數(shù)量設(shè)置
replication.factor=3

需要注意的是:replication.policy.class 默認(rèn)為:DefaultReplicationPolicy,這個(gè)策略會(huì)把同步至目標(biāo)集群的topic都加上一個(gè)源集群別名的前綴,比如源集群別名為A,topic為:bi-log,該topic同步到目標(biāo)集群后會(huì)變成:A.bi-log,為啥這么做呢,就是為了避免雙向同步的場(chǎng)景出現(xiàn)死循環(huán)。

官方也給出了解釋:

這是 MirrorMaker 2.0 中的默認(rèn)行為,以避免在復(fù)雜的鏡像拓?fù)渲兄貙?xiě)數(shù)據(jù)。 需要在復(fù)制流設(shè)計(jì)和主題管理方面小心自定義此項(xiàng),以避免數(shù)據(jù)丟失。 可以通過(guò)對(duì)“replication.policy.class”使用自定義復(fù)制策略類(lèi)來(lái)完成此操作。

針對(duì)如何自定義策略及使用方法,見(jiàn)我的另一篇文章:

為了保證腳本后臺(tái)運(yùn)行,寫(xiě)一個(gè)腳本包裝一下:

run-mm2.sh:

#!/bin/bashexec ./connect-mirror-maker.sh MM2.properties >log/mm2.log 2>&1 &

之后執(zhí)行腳本即可。

以Standalone模式運(yùn)行

這種模式會(huì)麻煩點(diǎn),需要提供一個(gè)kafka,作為worker節(jié)點(diǎn)來(lái)同步數(shù)據(jù),使用的腳本為:connect-standalone.sh

–help看看如何使用:

./connect-standalone.sh --help
[2023-03-09 20:36:33,479] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:63)
[root@XXGL-T-TJSYZ-REDIS-03 bin]# 

需要兩個(gè)配置文件,一個(gè)是作為worker的kafka集群信息(worker.properties),另一個(gè)是同步數(shù)據(jù)的配置(connector.properties)

worker.properties:

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAINkey.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverteroffset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties:

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1# source
# 這個(gè)配置會(huì)使同步之后的Topic都加上一個(gè)前綴,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 連接器是否發(fā)送心跳
emit.heartbeats.enabled=true
# 心跳間隔
emit.heartbeats.interval.seconds=5
# 是否發(fā)送檢查點(diǎn)
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新間隔
refresh.topics.interval.seconds=30
# 是否刷新消費(fèi)者組id
refresh.groups.enabled=true
# 刷新間隔
refresh.groups.interval.seconds=30
# 連接器消費(fèi)者預(yù)讀隊(duì)列大小
# readahead.queue.capacity=500
# 使用自定義策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

執(zhí)行:

./connect-standalone.sh worker.properties connector.properties

這種方式做一個(gè)簡(jiǎn)單的介紹,我最后采用的是上一種方式,比較簡(jiǎn)單直接

驗(yàn)證

驗(yàn)證:

  • 消息數(shù)量 OK

    使用kafka-tool工具連接上兩個(gè)集群進(jìn)行比對(duì)

  • Topic數(shù)量 OK

    • source:
    ./kafka-topics.sh --bootstrap-server source:9193 --command-config command.properties --list > topics-source.txt 
    
    • sink
    ./kafka-topics.sh --bootstrap-server sink:29092 --command-config command.properties --list > topics-sink.txt 
    
    • command.properties示例:
    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
    
  • 新消息是否同步 OK

  • 新Topic是否同步 OK

  • Consumer是否同步 NO

./kafka-consumer-groups.sh --bootstrap-server source:9193 --command-config command.properties --list > consumer-source.txt 

? 如果需要同步consumer,需要使用官方提供的工具:RemoteClusterUtils

  • consumer offset是否同步 NO

  • ACL是否同步 OK
    通過(guò)kafka-acls.sh或者客戶(hù)端工具kafka-tool可以查看

附錄

MM2配置表

propertydefault valuedescription
namerequiredname of the connector, e.g. “us-west->us-east”
topicsempty stringregex of topics to replicate, e.g. “topic1|topic2|topic3”. Comma-separated lists are also supported.
topics.blacklist“..internal, ..replica, __consumer_offsets” or similartopics to exclude from replication
groupsempty stringregex of groups to replicate, e.g. “.*”
groups.blacklistempty stringgroups to exclude from replication
source.cluster.aliasrequiredname of the cluster being replicated
target.cluster.aliasrequiredname of the downstream Kafka cluster
source.cluster.bootstrap.serversrequiredupstream cluster to replicate
target.cluster.bootstrap.serversrequireddownstream cluster
sync.topic.configs.enabledtruewhether or not to monitor source cluster for configuration changes
sync.topic.acls.enabledtruewhether to monitor source cluster ACLs for changes
emit.heartbeats.enabledtrueconnector should periodically emit heartbeats
emit.heartbeats.interval.seconds5 (seconds)frequency of heartbeats
emit.checkpoints.enabledtrueconnector should periodically emit consumer offset information
emit.checkpoints.interval.seconds5 (seconds)frequency of checkpoints
refresh.topics.enabledtrueconnector should periodically check for new topics
refresh.topics.interval.seconds5 (seconds)frequency to check source cluster for new topics
refresh.groups.enabledtrueconnector should periodically check for new consumer groups
refresh.groups.interval.seconds5 (seconds)frequency to check source cluster for new consumer groups
readahead.queue.capacity500 (records)number of records to let consumer get ahead of producer
replication.policy.classorg.apache.kafka.connect.mirror.DefaultReplicationPolicyuse LegacyReplicationPolicy to mimic legacy MirrorMaker
heartbeats.topic.retention.ms1 dayused when creating heartbeat topics for the first time
checkpoints.topic.retention.ms1 dayused when creating checkpoint topics for the first time
offset.syncs.topic.retention.msmax longused when creating offset sync topic for the first time
replication.factor2used when creating remote topics

其他

參考:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%253A+MirrorMaker+2.0

https://www.reddit.com/r/apachekafka/comments/q5s3al/mirrormaker2_is_not_able_to_replicate_groups_in/?sort=new

https://dev.to/keigodasu/transferring-commit-offset-with-mirrormaker-2-3kbf

https://learn.microsoft.com/zh-cn/azure/hdinsight/kafka/kafka-mirrormaker-2-0-guide

http://aloenet.com.cn/news/39588.html

相關(guān)文章:

  • 重慶網(wǎng)站建設(shè)排名武漢seo首頁(yè)
  • 網(wǎng)站負(fù)責(zé)人辦理幕布或站點(diǎn)拍照重要新聞今天8條新聞
  • 用html制作網(wǎng)站代碼百家號(hào)關(guān)鍵詞排名優(yōu)化
  • android安裝教程seo診斷書(shū)
  • 499全包網(wǎng)站建設(shè)東莞做網(wǎng)頁(yè)建站公司
  • 企業(yè)免費(fèi)網(wǎng)站優(yōu)化方案百度瀏覽器手機(jī)版
  • 做倫理電影網(wǎng)站百度推廣關(guān)鍵詞質(zhì)量度
  • 杭州網(wǎng)站建設(shè)哪家好seo深圳培訓(xùn)班
  • 北京道路建設(shè)在什么網(wǎng)站查詢(xún)網(wǎng)站推廣的軟件
  • 機(jī)械網(wǎng)站建設(shè)哪家好怎么樣在百度上推廣自己的產(chǎn)品
  • 做網(wǎng)站怎么收集資料太原免費(fèi)網(wǎng)站建站模板
  • 網(wǎng)站正常打開(kāi)速度慢semi
  • 單頁(yè)網(wǎng)站對(duì)攻擊的好處如何做好互聯(lián)網(wǎng)營(yíng)銷(xiāo)
  • 警惕成人網(wǎng)站免費(fèi)看手機(jī)引流推廣接單
  • 做網(wǎng)站所用的技術(shù)推廣普通話(huà)的宣傳語(yǔ)
  • 國(guó)內(nèi)網(wǎng)站開(kāi)發(fā)短視頻精準(zhǔn)獲客系統(tǒng)
  • 品牌專(zhuān)業(yè)建設(shè)網(wǎng)站常見(jiàn)的搜索引擎
  • 在哪能學(xué)到網(wǎng)站建設(shè)專(zhuān)業(yè)seo推廣是做什么
  • 做植物提取物好的推廣網(wǎng)站seo自動(dòng)優(yōu)化軟件下載
  • 校園網(wǎng)站規(guī)劃與建設(shè)工具大全
  • 網(wǎng)站建設(shè)app開(kāi)發(fā)合同范本百度普通下載
  • 新疆建設(shè)廳官方網(wǎng)站文件鏈接推廣
  • 無(wú)錫 網(wǎng)站建設(shè)黃頁(yè)88網(wǎng)官網(wǎng)
  • 同個(gè)主體新增網(wǎng)站備案外鏈吧怎么使用
  • 做網(wǎng)站看網(wǎng)頁(yè)效果手機(jī)網(wǎng)站排名優(yōu)化
  • 跨境電商千萬(wàn)別做亞馬遜seo排名優(yōu)化工具推薦
  • 地產(chǎn)建站規(guī)劃可以投放廣告的網(wǎng)站
  • 網(wǎng)站建設(shè)服務(wù)價(jià)格表seo顧問(wèn)公司
  • 網(wǎng)站icp備案新規(guī)推廣哪個(gè)平臺(tái)好
  • c 做網(wǎng)站教程百度seo教程視頻