Kafka生產環境應用方案:高可用叢集部署與運維實戰

Kafka生產環境應用方案:高可用叢集部署與運維實戰

架構圖

┌─────────────────────────────────────────────────────────────────────────────────┐│                           Kafka生產環境架構                                        │├─────────────────────────────────────────────────────────────────────────────────┤│                                                                                 ││  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                         ││  │  Producer1  │    │  Producer2  │    │  Producer3  │                         ││  └─────────────┘    └─────────────┘    └─────────────┘                         ││           │                 │                 │                                ││           └─────────────────┼─────────────────┘                                ││                             │                                                  ││  ┌─────────────────────────────────────────────────────────────────────────┐   ││  │                      Kafka Cluster                                     │   ││  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐   │   ││  │  │   Broker1   │  │   Broker2   │  │   Broker3   │  │   Broker4   │   │   ││  │  │192.168.1.11 │  │192.168.1.12 │  │192.168.1.13 │  │192.168.1.14 │   │   ││  │  │   Port:9092 │  │   Port:9092 │  │   Port:9092 │  │   Port:9092 │   │   ││  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘   │   ││  └─────────────────────────────────────────────────────────────────────────┐   ││                             │                                             │   ││  ┌─────────────────────────────────────────────────────────────────────────┐   ││  │                      ZooKeeper Cluster                                 │   ││  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                    │   ││  │  │     ZK1     │  │     ZK2     │  │     ZK3     │                    │   ││  │  │192.168.1.21 │  │192.168.1.22 │  │192.168.1.23 │                    │   ││  │  │  Port:2181  │  │  Port:2181  │  │  Port:2181  │                    │   ││  │  └─────────────┘  └─────────────┘  └─────────────┘                    │   ││  └─────────────────────────────────────────────────────────────────────────┘   ││                             │                                                  ││  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                         ││  │  Consumer1  │    │  Consumer2  │    │  Consumer3  │                         ││  │ (Group A)   │    │ (Group B)   │    │ (Group C)   │                         ││  └─────────────┘    └─────────────┘    └─────────────┘                         ││                                                                                 ││  ┌─────────────────────────────────────────────────────────────────────────┐   ││  │                      監控系統                                            │   ││  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                    │   ││  │  │ Prometheus  │  │   Grafana   │  │   Kafka     │                    │   ││  │  │   Metrics   │  │  Dashboard  │  │   Manager   │                    │   ││  │  └─────────────┘  └─────────────┘  └─────────────┘                    │   ││  └─────────────────────────────────────────────────────────────────────────┘   │└─────────────────────────────────────────────────────────────────────────────────┘

引言

Apache Kafka作為分散式流處理平臺,在現代大資料架構中扮演著訊息中介軟體的核心角色。本文將從運維工程師的角度,詳細介紹Kafka在生產環境中的部署方案、配置最佳化、監控運維等關鍵技術。透過實戰案例和程式碼示例,幫助運維團隊構建穩定、高效的Kafka叢集。

1. Kafka叢集自動化部署

1.1 ZooKeeper叢集部署指令碼

#!/bin/bash# ZooKeeper叢集自動化部署指令碼set -eZK_VERSION="3.8.1"ZK_NODES=("192.168.1.21""192.168.1.22""192.168.1.23")ZK_DATA_DIR="/data/zookeeper"ZK_LOG_DIR="/logs/zookeeper"# 建立ZooKeeper使用者useradd -r -s /bin/false zookeeper# 下載安裝ZooKeeperinstall_zookeeper() {cd /tmp    wget https://archive.apache.org/dist/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz    tar -xzf apache-zookeeper-${ZK_VERSION}-bin.tar.gzmv apache-zookeeper-${ZK_VERSION}-bin /opt/zookeeperchown -R zookeeper:zookeeper /opt/zookeeper}# 配置ZooKeeperconfigure_zookeeper() {local node_id=$1local node_ip=$2# 建立資料目錄mkdir -p ${ZK_DATA_DIR}${ZK_LOG_DIR}chown -R zookeeper:zookeeper ${ZK_DATA_DIR}${ZK_LOG_DIR}# 設定節點IDecho${node_id} > ${ZK_DATA_DIR}/myid# 生成配置檔案cat > /opt/zookeeper/conf/zoo.cfg << EOFtickTime=2000initLimit=10syncLimit=5dataDir=${ZK_DATA_DIR}dataLogDir=${ZK_LOG_DIR}clientPort=2181maxClientCnxns=60# 叢集配置server.1=192.168.1.21:2888:3888server.2=192.168.1.22:2888:3888server.3=192.168.1.23:2888:3888# 效能最佳化autopurge.snapRetainCount=10autopurge.purgeInterval=1EOF}# 啟動ZooKeeper服務start_zookeeper() {# 建立systemd服務檔案cat > /etc/systemd/system/zookeeper.service << EOF[Unit]Description=Apache ZooKeeper serverDocumentation=http://zookeeper.apache.orgRequires=network.target remote-fs.targetAfter=network.target remote-fs.target[Service]Type=forkingUser=zookeeperGroup=zookeeperEnvironment=JAVA_HOME=/usr/lib/jvm/java-11-openjdkExecStart=/opt/zookeeper/bin/zkServer.sh startExecStop=/opt/zookeeper/bin/zkServer.sh stopExecReload=/opt/zookeeper/bin/zkServer.sh restartTimeoutSec=30Restart=on-failure[Install]WantedBy=multi-user.targetEOF    systemctl daemon-reload    systemctl enable zookeeper    systemctl start zookeeper}# 執行部署install_zookeeperconfigure_zookeeper $1$2start_zookeeper
ZooKeeper作為Kafka的協調服務,需要奇數個節點組成叢集以保證高可用性。透過自動化指令碼可以快速部署標準化的ZooKeeper環境。

1.2 Kafka叢集部署配置

#!/bin/bash# Kafka叢集部署指令碼KAFKA_VERSION="2.8.2"KAFKA_NODES=("192.168.1.11""192.168.1.12""192.168.1.13""192.168.1.14")KAFKA_DATA_DIR="/data/kafka"KAFKA_LOG_DIR="/logs/kafka"# 安裝Kafkainstall_kafka() {cd /tmp    wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-${KAFKA_VERSION}.tgz    tar -xzf kafka_2.13-${KAFKA_VERSION}.tgzmv kafka_2.13-${KAFKA_VERSION} /opt/kafka# 建立kafka使用者    useradd -r -s /bin/false kafkachown -R kafka:kafka /opt/kafka# 建立資料目錄mkdir -p ${KAFKA_DATA_DIR}${KAFKA_LOG_DIR}chown -R kafka:kafka ${KAFKA_DATA_DIR}${KAFKA_LOG_DIR}}# 生成Kafka伺服器配置generate_kafka_config() {local broker_id=$1local node_ip=$2cat > /opt/kafka/config/server.properties << EOF# 伺服器基礎配置broker.id=${broker_id}listeners=PLAINTEXT://${node_ip}:9092advertised.listeners=PLAINTEXT://${node_ip}:9092num.network.threads=8num.io.threads=16socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600# 日誌配置log.dirs=${KAFKA_DATA_DIR}num.partitions=3num.recovery.threads.per.data.dir=2offsets.topic.replication.factor=3transaction.state.log.replication.factor=3transaction.state.log.min.isr=2log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000# ZooKeeper配置zookeeper.connect=192.168.1.21:2181,192.168.1.22:2181,192.168.1.23:2181/kafkazookeeper.connection.timeout.ms=18000# 效能最佳化配置replica.fetch.max.bytes=1048576message.max.bytes=1000000replica.socket.timeout.ms=30000replica.socket.receive.buffer.bytes=65536replica.fetch.wait.max.ms=500replica.high.watermark.checkpoint.interval.ms=5000fetch.purgatory.purge.interval.requests=1000producer.purgatory.purge.interval.requests=1000delete.topic.enable=true# JVM配置export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"EOF}# 建立Kafka系統服務create_kafka_service() {cat > /etc/systemd/system/kafka.service << EOF[Unit]Description=Apache Kafka server (broker)Documentation=http://kafka.apache.org/documentation.htmlRequires=network.target remote-fs.targetAfter=network.target remote-fs.target zookeeper.service[Service]Type=simpleUser=kafkaGroup=kafkaEnvironment=JAVA_HOME=/usr/lib/jvm/java-11-openjdkExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.propertiesExecStop=/opt/kafka/bin/kafka-server-stop.shTimeoutSec=30Restart=on-failure[Install]WantedBy=multi-user.targetEOF    systemctl daemon-reload    systemctl enable kafka    systemctl start kafka}# 執行部署install_kafkagenerate_kafka_config $1$2create_kafka_service

2. 生產環境效能最佳化

2.1 生產者效能調優

#!/usr/bin/env python3# Kafka生產者效能最佳化配置from kafka import KafkaProducerimport jsonimport timeimport threadingfrom concurrent.futures import ThreadPoolExecutorclassOptimizedKafkaProducer:def__init__(self, bootstrap_servers, topic):self.topic = topicself.producer = KafkaProducer(            bootstrap_servers=bootstrap_servers,# 效能最佳化配置            batch_size=16384,              # 批處理大小            linger_ms=10,                  # 延遲傳送時間            buffer_memory=33554432,        # 緩衝區大小32MB            compression_type='snappy',     # 壓縮演算法            max_in_flight_requests_per_connection=5,            retries=3,                     # 重試次數            retry_backoff_ms=100,            request_timeout_ms=30000,# 序列化配置            value_serializer=lambda v: json.dumps(v).encode('utf-8'),            key_serializer=lambda k: str(k).encode('utf-8')        )defsend_message_sync(self, key, value):"""同步傳送訊息"""try:            future = self.producer.send(self.topic, key=key, value=value)            record_metadata = future.get(timeout=10)return {'topic': record_metadata.topic,'partition': record_metadata.partition,'offset': record_metadata.offset            }except Exception as e:print(f"傳送訊息失敗: {e}")returnNonedefsend_message_async(self, key, value, callback=None):"""非同步傳送訊息"""try:            future = self.producer.send(self.topic, key=key, value=value)if callback:                future.add_callback(callback)return futureexcept Exception as e:print(f"傳送訊息失敗: {e}")returnNonedefbatch_send_performance_test(self, message_count=100000):"""批次傳送效能測試"""        start_time = time.time()# 使用執行緒池併發傳送with ThreadPoolExecutor(max_workers=10as executor:            futures = []for i inrange(message_count):                message = {'id': i,'timestamp': time.time(),'data'f'test_message_{i}','source''performance_test'                }                future = executor.submit(self.send_message_async, str(i), message)                futures.append(future)# 等待所有訊息傳送完成for future in futures:try:                    future.result(timeout=30)except Exception as e:print(f"訊息傳送異常: {e}")# 確保所有訊息都發送出去self.producer.flush()        end_time = time.time()        duration = end_time - start_time        throughput = message_count / durationprint(f"傳送 {message_count} 條訊息")print(f"總耗時: {duration:.2f} 秒")print(f"吞吐量: {throughput:.2f} 訊息/秒")defclose(self):self.producer.close()# 使用示例if __name__ == "__main__":    producer = OptimizedKafkaProducer(        bootstrap_servers=['192.168.1.11:9092''192.168.1.12:9092'],        topic='performance_test'    )# 執行效能測試    producer.batch_send_performance_test(50000)    producer.close()

2.2 消費者效能最佳化

#!/usr/bin/env python3# Kafka消費者效能最佳化配置from kafka import KafkaConsumerimport jsonimport timeimport threadingfrom concurrent.futures import ThreadPoolExecutorclassOptimizedKafkaConsumer:def__init__(self, topics, group_id, bootstrap_servers):self.topics = topicsself.group_id = group_idself.consumer = KafkaConsumer(            *topics,            bootstrap_servers=bootstrap_servers,            group_id=group_id,# 效能最佳化配置            fetch_min_bytes=1024,          # 最小拉取位元組數            fetch_max_wait_ms=500,         # 最大等待時間            max_poll_records=500,          # 單次拉取最大記錄數            max_poll_interval_ms=300000,   # 最大輪詢間隔            session_timeout_ms=30000,      # 會話超時時間            heartbeat_interval_ms=10000,   # 心跳間隔# 消費策略            auto_offset_reset='earliest',            enable_auto_commit=False,      # 手動提交偏移量# 反序列化配置            value_deserializer=lambda m: json.loads(m.decode('utf-8')),            key_deserializer=lambda m: m.decode('utf-8'if m elseNone        )defconsume_messages_batch(self, batch_size=100, timeout=5000):"""批次消費訊息"""        message_batch = []try:# 批次拉取訊息            message_pack = self.consumer.poll(timeout_ms=timeout)for topic_partition, messages in message_pack.items():for message in messages:                    message_batch.append({'topic': message.topic,'partition': message.partition,'offset': message.offset,'key': message.key,'value': message.value,'timestamp': message.timestamp                    })iflen(message_batch) >= batch_size:# 處理批次訊息self.process_message_batch(message_batch)                        message_batch = []# 處理剩餘訊息if message_batch:self.process_message_batch(message_batch)# 手動提交偏移量self.consumer.commit()except Exception as e:print(f"消費訊息異常: {e}")defprocess_message_batch(self, messages):"""批次處理訊息"""with ThreadPoolExecutor(max_workers=5as executor:            futures = []for message in messages:                future = executor.submit(self.process_single_message, message)                futures.append(future)# 等待所有訊息處理完成for future in futures:try:                    future.result(timeout=30)except Exception as e:print(f"處理訊息異常: {e}")defprocess_single_message(self, message):"""處理單條訊息"""try:# 模擬業務處理            time.sleep(0.001)# 記錄處理日誌print(f"處理訊息: Topic={message['topic']}, "f"Partition={message['partition']}, "f"Offset={message['offset']}")except Exception as e:print(f"處理單條訊息異常: {e}")defstart_consuming(self):"""開始消費訊息"""print(f"開始消費主題: {self.topics}")try:whileTrue:self.consume_messages_batch()except KeyboardInterrupt:print("停止消費")finally:self.consumer.close()# 使用示例if __name__ == "__main__":    consumer = OptimizedKafkaConsumer(        topics=['performance_test'],        group_id='performance_consumer_group',        bootstrap_servers=['192.168.1.11:9092''192.168.1.12:9092']    )    consumer.start_consuming()

3. 監控與運維自動化

3.1 Kafka叢集監控指令碼

#!/bin/bash# Kafka叢集監控指令碼KAFKA_HOME="/opt/kafka"KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"ALERT_EMAIL="[email protected]"LOG_FILE="/var/log/kafka_monitor.log"# 檢查Kafka叢集狀態check_kafka_cluster() {echo"$(date): 檢查Kafka叢集狀態" >> $LOG_FILE# 檢查broker列表    broker_list=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_BROKERS} 2>/dev/null | grep -c "id:")if [ "$broker_list" -lt 3 ]; thenecho"ALERT: Kafka叢集可用broker不足: $broker_list" | mail -s "Kafka Cluster Alert"$ALERT_EMAILecho"$(date): ALERT - 可用broker不足: $broker_list" >> $LOG_FILEfi}# 檢查主題狀態check_topic_health() {echo"$(date): 檢查主題健康狀態" >> $LOG_FILE# 獲取主題列表    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --list)for topic in$topicsdo# 檢查主題描述        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic $topic)# 檢查是否有離線分割槽        offline_partitions=$(echo"$topic_desc" | grep -c "Leader: -1")if [ "$offline_partitions" -gt 0 ]; thenecho"ALERT: 主題 $topic 有 $offline_partitions 個離線分割槽" | mail -s "Kafka Topic Alert"$ALERT_EMAILecho"$(date): ALERT - 主題 $topic 離線分割槽: $offline_partitions" >> $LOG_FILEfidone}# 檢查消費者組延遲check_consumer_lag() {echo"$(date): 檢查消費者組延遲" >> $LOG_FILE# 獲取消費者組列表    consumer_groups=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --list)for group in$consumer_groupsdo# 獲取消費者組詳情        group_desc=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group $group)# 檢查延遲        max_lag=$(echo"$group_desc" | awk 'NR>1 {print $5}' | grep -v "-" | sort -n | tail -1)if [ -n "$max_lag" ] && [ "$max_lag" -gt 10000 ]; thenecho"ALERT: 消費者組 $group 最大延遲: $max_lag" | mail -s "Kafka Consumer Lag Alert"$ALERT_EMAILecho"$(date): ALERT - 消費者組 $group 延遲過高: $max_lag" >> $LOG_FILEfidone}# 收集效能指標collect_metrics() {echo"$(date): 收集Kafka效能指標" >> $LOG_FILE# 收集JVM指標for broker in 192.168.1.11 192.168.1.12 192.168.1.13; do        kafka_pid=$(ssh $broker"pgrep -f kafka")if [ -n "$kafka_pid" ]; then# 記憶體使用率            memory_usage=$(ssh $broker"ps -p $kafka_pid -o %mem --no-headers")echo"$(date): Broker $broker 記憶體使用率: $memory_usage%" >> $LOG_FILE# CPU使用率            cpu_usage=$(ssh $broker"ps -p $kafka_pid -o %cpu --no-headers")echo"$(date): Broker $broker CPU使用率: $cpu_usage%" >> $LOG_FILEfidone}# 主監控迴圈whiletruedo    check_kafka_cluster    check_topic_health    check_consumer_lag    collect_metricssleep 300  # 5分鐘檢查一次done

3.2 自動化運維指令碼

#!/usr/bin/env python3# Kafka自動化運維指令碼import subprocessimport jsonimport smtplibfrom email.mime.text import MIMETextfrom datetime import datetimeimport loggingclassKafkaOperations:def__init__(self, kafka_home, brokers):self.kafka_home = kafka_homeself.brokers = brokersself.logger = self.setup_logger()defsetup_logger(self):"""設定日誌記錄"""        logging.basicConfig(            level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',            handlers=[                logging.FileHandler('/var/log/kafka_operations.log'),                logging.StreamHandler()            ]        )return logging.getLogger(__name__)defcreate_topic(self, topic_name, partitions=3, replication_factor=2):"""建立主題"""try:            cmd = [f"{self.kafka_home}/bin/kafka-topics.sh","--bootstrap-server"self.brokers,"--create","--topic", topic_name,"--partitions"str(partitions),"--replication-factor"str(replication_factor)            ]            result = subprocess.run(cmd, capture_output=True, text=True)if result.returncode == 0:self.logger.info(f"成功建立主題: {topic_name}")returnTrueelse:self.logger.error(f"建立主題失敗: {result.stderr}")returnFalseexcept Exception as e:self.logger.error(f"建立主題異常: {e}")returnFalsedefdelete_topic(self, topic_name):"""刪除主題"""try:            cmd = [f"{self.kafka_home}/bin/kafka-topics.sh","--bootstrap-server"self.brokers,"--delete","--topic", topic_name            ]            result = subprocess.run(cmd, capture_output=True, text=True)if result.returncode == 0:self.logger.info(f"成功刪除主題: {topic_name}")returnTrueelse:self.logger.error(f"刪除主題失敗: {result.stderr}")returnFalseexcept Exception as e:self.logger.error(f"刪除主題異常: {e}")returnFalsedefincrease_partitions(self, topic_name, new_partition_count):"""增加分割槽數"""try:            cmd = [f"{self.kafka_home}/bin/kafka-topics.sh","--bootstrap-server"self.brokers,"--alter","--topic", topic_name,"--partitions"str(new_partition_count)            ]            result = subprocess.run(cmd, capture_output=True, text=True)if result.returncode == 0:self.logger.info(f"成功增加主題 {topic_name} 分割槽數到 {new_partition_count}")returnTrueelse:self.logger.error(f"增加分割槽失敗: {result.stderr}")returnFalseexcept Exception as e:self.logger.error(f"增加分割槽異常: {e}")returnFalsedefrebalance_partitions(self, topic_name):"""重新平衡分割槽"""try:# 生成重平衡計劃            reassignment_file = f"/tmp/reassignment-{topic_name}.json"# 獲取當前分割槽分配            cmd_current = [f"{self.kafka_home}/bin/kafka-topics.sh","--bootstrap-server"self.brokers,"--describe","--topic", topic_name            ]            current_result = subprocess.run(cmd_current, capture_output=True, text=True)if current_result.returncode == 0:# 生成重平衡計劃                cmd_generate = [f"{self.kafka_home}/bin/kafka-reassign-partitions.sh","--bootstrap-server"self.brokers,"--topics-to-move-json-file""/tmp/topics.json","--broker-list""0,1,2,3","--generate"                ]# 執行重平衡                cmd_execute = [f"{self.kafka_home}/bin/kafka-reassign-partitions.sh","--bootstrap-server"self.brokers,"--reassignment-json-file", reassignment_file,"--execute"                ]self.logger.info(f"開始重平衡主題: {topic_name}")returnTrueelse:self.logger.error(f"獲取主題資訊失敗: {current_result.stderr}")returnFalseexcept Exception as e:self.logger.error(f"重平衡異常: {e}")returnFalsedefbackup_consumer_offsets(self, group_id):"""備份消費者偏移量"""try:            cmd = [f"{self.kafka_home}/bin/kafka-consumer-groups.sh","--bootstrap-server"self.brokers,"--describe","--group", group_id            ]            result = subprocess.run(cmd, capture_output=True, text=True)if result.returncode == 0:                backup_file = f"/backup/consumer_offsets_{group_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"withopen(backup_file, 'w'as f:                    f.write(result.stdout)self.logger.info(f"成功備份消費者組 {group_id} 偏移量到 {backup_file}")returnTrueelse:self.logger.error(f"備份偏移量失敗: {result.stderr}")returnFalseexcept Exception as e:self.logger.error(f"備份偏移量異常: {e}")returnFalse# 使用示例if __name__ == "__main__":    kafka_ops = KafkaOperations(        kafka_home="/opt/kafka",        brokers="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"    )# 建立主題    kafka_ops.create_topic("test_topic", partitions=6, replication_factor=3)# 增加分割槽    kafka_ops.increase_partitions("test_topic"12)# 備份消費者偏移量    kafka_ops.backup_consumer_offsets("test_consumer_group")

4. 高可用與故障恢復

4.1 叢集健康檢查

#!/bin/bash# Kafka叢集健康檢查與自動恢復KAFKA_HOME="/opt/kafka"KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"# 檢查並修復不同步副本check_and_fix_isr() {echo"檢查不同步副本..."# 獲取所有主題    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --list)for topic in$topicsdo# 檢查主題詳情        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic $topic)# 檢查ISR不足的分割槽        isr_issues=$(echo"$topic_desc" | grep -E "Isr:|Replicas:" | awk '{            if ($1 == "Replicas:") replicas = NF-1;            if ($1 == "Isr:") isr = NF-1;            if (isr < replicas) print "ISR不足"        }')if [ -n "$isr_issues" ]; thenecho"主題 $topic 存在ISR不足問題,嘗試修復..."# 觸發首選副本選舉${KAFKA_HOME}/bin/kafka-leader-election.sh --bootstrap-server ${KAFKA_BROKERS} --election-type preferred --topic $topicfidone}# 自動故障恢復auto_recovery() {echo"執行自動故障恢復..."# 重啟失敗的brokerfor broker in 192.168.1.11 192.168.1.12 192.168.1.13; doif ! ssh $broker"systemctl is-active kafka" > /dev/null 2>&1; thenecho"重啟broker: $broker"            ssh $broker"systemctl restart kafka"sleep 30fidone# 檢查並修復ISR    check_and_fix_isr# 驗證叢集狀態    validate_cluster_state}validate_cluster_state() {echo"驗證叢集狀態..."# 檢查所有broker是否線上    online_brokers=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_BROKERS} 2>/dev/null | grep -c "id:")if [ "$online_brokers" -eq 3 ]; thenecho"叢集恢復正常,所有broker線上"elseecho"叢集恢復失敗,線上broker數量: $online_brokers"return 1fi}# 執行健康檢查和恢復auto_recovery

總結

Kafka生產環境部署涉及多個關鍵環節:叢集架構設計、效能引數調優、監控體系建設、自動化運維等。透過本文介紹的方案,運維工程師可以構建穩定、高效的Kafka叢集。關鍵要點包括:合理的叢集規模規劃、科學的配置引數調優、完善的監控告警機制、可靠的故障恢復策略。在實際生產環境中,還需要根據具體業務場景進行針對性最佳化,持續監控和改進系統性能,確保訊息佇列服務的穩定性和可靠性。
文末福利
就目前來說,傳統運維衝擊年薪30W+的轉型方向就是SRE&DevOps崗位。
為了幫助大家早日擺脫繁瑣的基層運維工作,給大家整理了一套高階運維工程師必備技能資料包,內容有多詳實豐富看下圖!
共有 20 個模組
1.38張最全工程師技能圖譜
2.面試大禮包
3.Linux書籍
4.go書籍
······
6.自動化運維工具
18.訊息佇列合集
 以上所有資料獲取請掃碼
備註:最新運維資料
100%免費領取
(後臺不再回復,掃碼一鍵領取)


相關文章