Kafka / 消息队列 / 系统运维

Kafka集群安装部署和实践

温馨提示:本文最后更新于2022-02-27 18:24:56,某些文章具有时效性,若有错误或已失效,请在下方留言或问题自助查询
当前位置: 首页 » 大数据 » Kafka » Kafka集群安装部署和实践
浅时光博客 · 2月2日 · 2020年 本文11539个字,预计阅读29分钟 159760次已读

一、kafka介绍


1、什么是消息队列

消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上, 队列存储消息直到它们被应用程序读走。通过消息队列,应用程序可独立地执行–它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。常用的消息队列技术是 Message Queue。

2、消息队列的作用

  • 应用程序解耦并行处理
  • 顺序保证
  • 高吞吐率
  • 高容错、高可用
  • 可扩展
  • 峰值处理

Message Queue 的通讯模式

  1. 点对点通讯:点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。
  2. 多点广播:MQ 适用于不同类型的应用。其中重要的,也是正在发展中的是”多点广播”应用,即能够将消息发送到多个目标站点 (Destination List)。可以使用一条 MQ 指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ 不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ 将消息的一个复制版本和该系统上接收者的名单发送到目标 MQ 系统。目标 MQ 系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。
  3. 发布/订阅 (Publish/Subscribe) 模式:发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。
  4. 群集 (Cluster):为了简化点对点通讯模式中的系统配置,MQ 提供 Cluster(群集) 的解决方案。群集类似于一个域 (Domain),群集内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用群集 (Cluster) 通道与其它成员通讯,从而大大简化了系统配置。此外,群集中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。

3、Kafka介绍

  • Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
  • 支持通过Kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

4、Kafka原理

  • Kaf文章来源(Source):https://www.dqzboy.comka 是一个消息系统,原本开发自 LinkedIn,用作 LinkedIn 的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家公司作为多种类型的数据管道和消息系统使用。活动流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO 使用率、请求时间、服务日志等等数据),总的来说,运营数据的统计方法种类繁多。

Kafka 专用术语

  • Broker:Kafka 集群包含一个或多个服务器,这种服务器被称为 broker。
  • Topic:每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(物理上不同 Topic 的消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。
  • Partition:Partition 是物理上的概念,每个 Topic 包含一个或多个 Partition。
  • Producer:负责发布消息到 Kafka broker。
  • Consumer:消息消费者,向 Kafka broker 读取消息的客户端。
  • Consumer Group:每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name,若不指定 group name 则属于默认的 group)。

Kafka 交互流程

  • Kafka 是一个基于分布式的消息发布-订阅系统,它被设计成快速、可扩展的、持久的。与其他消息发布-订阅系统类似,Kafka 在主题当中保存消息的信息。生产者向主题写入数据,消费者从主题读取数据。由于 Kafka 文章来源(Source):浅时光博客的特性是支持分布式,同时也是基于分布式的,所以主题也是可以在多个节点上被分区和覆盖的。
  • 信息是一个字节数组,程序员可以在这些字节数组中存储任何对象,支持的数据格式包括 String、JSON、Avro。Kafka 通过给每一个消息绑定一个键值的方式来保证生产者可以把所有的消息发送到指定位置。属于某一个消费者群组的消费者订阅了一个主题,通过该订阅消费者可以跨节点地接收所有与该主题相关的消息,每一个消息只会发送给群组中的一个消费者,所有拥有相同键值的消息都会被确保发给这一个消费者。
  • Kafka 设计中将每一个主题分区当作一个具有顺序排列的日志。同处于一个分区中的消息都被设置了一个唯一的偏移量。Kafka 只会保持跟踪未读消息,一旦消息被置为已读状态,Kafka 就不会再去管理它了。Kafka 的生产者负责在消息队列中对生产出来的消息保证一定时间的占有,消费者负责追踪每一个主题 (可以理解为一个日志通道) 的消息并及时获取它们。基于这样的设计,Kafka 可以在消息队列中保存大量的开销很小的数据,并且支持大量的消费者订阅。

5、Kafka 特性

  • Kafka 的几个特性非常满足我们的需求:可扩展性、数据分区、低延迟、处理大量不同消费者的能力。这个案例我们可以配置在 Kafka 中为登陆和交易配置同一个主题。由于 Kafka 支持在单一主题内的排序,而不是跨主题的排序,所以我们为了保证用户在交易前使用实际的 IP 地址登陆系统,我们采用了同一个主题来存储登陆信息和交易信息。
  • 当用户登陆或者发起交易动作后,负责接收的服务器立即发事件给 Kafka。这里我们采用用户 id 作为消息的主键,具体事件作为值。这保证了同一个用户的所有的交易信息和登陆信息被发送到 Kafka 分区。每一个事件处理服务被当作一个 Kafka 消费者来运行,所有的消费者被配置到了同一个消费者群组,这样每一台服务器从一些 Kafka 分区读取数据,一个分区的所有数据被送到同一个事件处理服务器 (可以与接收服务器不同)。当事件处理服务器从 Kafka 读取了用户交易信息,它可以把该信息加入到保存在本地内存中的历史信息列表里面,这样可以保证事件处理服务器在本地内存中调用用户的历史信息并做出预警,而不需要额外的网络或磁盘开销。

6、与 Flume 的区别

Kafka 与 Flume 很多功能确实是重复的。以下是评估两个系统的一些建议:

  1. Kafka 是一个通用型系统。你可以有许多的生产者和消费者分享多个主题。相反地,Flume 被设计成特定用途的工作,特定地向 HDFS 和 HBase 发送出去。Flume 为了更好地为 HDFS 服务而做了特定的优化,并且与 Hadoop 的安全体系整合在了一起。基于这样的结论,Hadoop 开发商 Cloudera 推荐如果数据需要被多个应用程序消费的话,推荐使用 Kafka,如果数据只是面向 Hadoop 的,可以使用 Flume。
  2. Flume 拥有许多配置的来源 (sources) 和存储池 (sinks)。然后,Kafka 拥有的是非常小的生产者和消费者环境体系,Kafka 社区并不是非常支持这样。如果你的数据来源已经确定,不需要额外的编码,那你可以使用 Flume 提供的 sources 和 sinks,反之,如果你需要准备自己的生产者和消费者,那你需要使用 Kafka。
  3. Flume 可以在拦截器里面实时处理数据。这个特性对于过滤数据非常有用。Kafka 需要一个外部系统帮助处理数据。
  4. 无论是 Kafka 或是 Flume,两个系统都可以保证不丢失数据。然后,Flume 不会复制事件。相应地,即使我们正在使用一个可以信赖的文件通道,如果 Flume agent 所在的这个节点宕机了,你会失去所有的事件访问能力直到你修复这个受损的节点。使用 Kafka 的管道特性不会有这样的问题。
  5. Flume 和 Kafka 可以一起工作的。如果你需要把流式数据从 Kafka 转移到 Hadoop,可以使用 Flume 代理 (agent),将 kafka 当作一个来源 (source),这样可以从 Kafka 读取数据到 Hadoop。你不需要去开发自己的消费者,你可以使用 Flume 与 Hadoop、HBase 相结合的特性,使用 Cloudera Manager 平台监控消费者,并且通过增加过滤器的方式处理数据。

二、Zookeeper集群部署


1、环境介绍

  • 由于Kafka依赖zookeeper管理自身集群,所以需要先安装zookeeper
名称 服务器IP 版本
Kafka-node1 192.168.66.10 kafka_2.13-2.4.0.tgz apache-zookeeper-3.5.6-bin.tar.gz
Kafka-node2 192.168.66.11 kafka_2.13-2.4.0.tgz apache-zookeeper-3.5.6-bin.tar.gz
Kafka-node3 192.168.66.12 kafka_2.13-2.4.0.tgz apache-zookeeper-3.5.6-bin.tar.gz

官网下载地址如下

  • 注意一定要下载代bin的源码包,从目前的最新版本3.5.5开始,带有bin名称的包才是我们想要的下载可以直接使用的里面有编译后的二进制的包,而之前的普通的tar.gz的包里面是只是源码的包无法直接使用。
Kafka集群安装部署和实践-浅时光博客
Kafka集群安装部署和实践-浅时光博客

2、安装JDK

  • 3个节点都需要执行安装JDK,在node1上安装好Kafka和zookeeper后直接拷贝给另外2个节点即可,以下只记录node1节点,另外2个节点类似
[root@kafka-node1 ~]# mkdir -p /opt/soft
[root@kafka-node1 ~]# cd /opt/soft/
[root@kafka-node1 soft]# rpm -ivh jdk-8u231-linux-x64.rpm

3、解压安装

[root@kafka-node1 soft]# tar -xf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local/
[root@kafka-node1 soft]# cd /usr/local/
[root@kafka-node1 local]# mv apache-zookeeper-3.5.6-bin/ zookeeper

4、修改配置

#三个节点的配置都需要进行创建对应的目录,复制一份新的进行修改;
#配置文件存储路径:zookeeper安装路径/zookeeper/conf

[root@kafka-node1 ~]# cd /usr/local/zookeeper/conf/
[root@kafka-node1 conf]# cp zoo_sample.cfg zoo.cfg

#启动之前先创建数据和日志的存储目录
[root@kafka-node1 conf]# mkdir -p /data/{zkdata,zklog}
[root@kafka-node1 conf]# vim zoo.cfg
Kafka集群安装部署和实践-浅时光博客
2:tickTime=2000
5:initLimit=10
8:syncLimit=5
12:dataDir=/data/zkdata	#数据存储目录
13:datalogDir=/data/zklog	#日志存储目录,默认配置文件中没有
15:clientPort=2181
16:server.1=192.168.66.10:3001:3002	#集群中个节点IP,默认配置文件中没有,需要添加
17:server.2=192.168.66.11:3001:3002
18:server.3=192.168.66.12:3001:3002
  • server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
  • 192.168.66.10为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
  • dataDirdataLogDir 需要在启动前创建完成
  • clientPort 为 zookeeper的服务端口
  • server.1server.2server.3 为 zk 集群中三个 node 的信息,定义格式为 hostname:port1:port2,其中 port1 原文链接:https://www.dqzboy.com是 node 间通信使用的端口,port2 是node 选举使用的端口,需确保三台主机的这两个端口都是互通的。

5、更改日志配置

  • Zookeeper 默认会将控制台信息输出到启动路径下的 zookeeper.out 中,通过如下方法,可以让 Zookeeper 输出按尺寸切分的日志文件:

5.1:修改conf/log4j.properties文件

将zookeeper.root.logger=INFO, CONSOLE
    改为
zookeeper.root.logger=INFO, ROLLINGFILE

[root@kafka-node1 conf]# vim log4j.properties
Kafka集群安装部署和实践-浅时光博客

5.2:修改bin/zkEnv.sh文件

将ZOO_LOG4J_PROP="INFO,CONSOLE"
    改为
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

[root@kafka-node1 conf]# cd ../bin/
[root@kafka-node1 bin]# vim zkEnv.sh
Kafka集群安装部署和实践-浅时光博客

6、将程序拷贝给另外的节点

  • 由于配置文件都一样,所有我们在node1上安装配置程序后,通过scp命令将程序拷贝到其他2个节点下即可
[root@kafka-node1 ~]# scp -r /usr/local/zookeeper/ root@192.168.66.11:/usr/local/
[root@kafka-node1 ~]# scp -r /usr/local/zookeeper/ root@192.168.66.12:/usr/local/

7、创建myid文件

  • 在3台节点服务器的dataDir的路径下分别创建一个文件名为myid的文件,文件内容为该 zk 节点的编号
[root@kafka-node1 ~]# cd /data/zkdata/
[root@kafka-node1 zkdata]# ls
[root@kafka-node1 zkdata]# vim myid
[root@kafka-node1 zkdata]# cat myid 
1

[root@kafka-node1 ~]# cd /data/zkdata/
[root@kafka-node2 zkdata]# vim myid
[root@kafka-node2 zkdata]# cat myid 
2

[root@kafka-node3 ~]# cd /data/zkdata/
[root@kafka-node3 zkdata]# vim myid
[root@kafka-node3 zkdata]# cat myid 
3

#启动前把3节点的3001和3002端口放通
firewall-cmd --permanent --zone=public --add-port=3001-3002/tcp
firewall-cmd --reload

8、启动程序

  • 依次启动三台主机上的 zookeeper 服务
[root@kafka-node1 ~]# cd /usr/local/zookeeper/bin/
[root@kafka-node1 bin]# ./zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

9、查看集群状态

  • 3个节点启动完成后,可依次执行如下命令查看集群状态
[root@kafka-node1 ~]# cd /usr/local/zookeeper/bin/
[root@kafka-node1 bin]# ./zkServer.sh status
Kafka集群安装部署和实践-浅时光博客
Kafka集群安装部署和实践-浅时光博客
Kafka集群安装部署和实践-浅时光博客
  • 可以看到192.168.66.12这台机器被zk选择为了leader,10和11为follower,假如11机器down掉后,zk会再次选择集群中的一台主机作为新的的leader,如果集群down掉一半那么zk就无法选择leader了,同时无法正常工作了。

三、kafka集群部署


1、下载源码

  • 虽然kafka本身有自带的zookeeper,但是建议用外部的zk
  • 官网下载地址:
  • 在官网选择下载时请选择二进制下载
Kafka集群安装部署和实践-浅时光博客
Kafka集群安装部署和实践-浅时光博客

2、解压安装

[root@kafka-node1 soft]# tar -xf kafka_2.13-2.4.0.tgz -C /usr/local/

[root@kafka-node1 soft]# cd /usr/local/
[root@kafka-node1 local]# mv kafka_2.13-2.4.0 kafka

3、修改配置

  • 三个节点的配置都需要进行修改,node1上安装好程序后拷贝至另外2个节文章来源(Source):浅时光博客点,然后进行修改配置文件
  • 注意 每台主机的broker.idhost.name不一样
  • 配置文件路径:kafka安装目录/config/server.properties
#备份默认配置,然后将下面的配置写入到新的配置文件中

[root@kafka-node1 ~]# mkdir /data/kafka/	#创建消息目录,3节点都需创建
[root@kafka-node1 ~]# cd /usr/local/kafka/config/
[root@kafka-node1 config]# mv server.properties{,_bak}

[root@kafka-node1 config]# vim server.properties
broker.id=10 
port=9092  
host.name=192.168.66.10
advertised.port=9092
advertised.host.name=192.168.66.10
num.network.threads=3
num.io.threads=8  
log.dirs=/data/kafka
delete.topic.enable=true
socket.send.buffer.bytes=102400  
socket.receive.buffer.bytes=102400  
socket.request.max.bytes=104857600  
num.partitions=1  
log.retention.hours=168  
message.max.byte=5242880   
default.replication.factor=2   
replica.fetch.max.bytes=5242880  
log.segment.bytes=1073741824  
log.retention.check.interval.ms=300000  
log.cleaner.enable=false  
zookeeper.connect=192.168.66.10:2181, 192.168.66.11:2181, 192.168.66.12:2181
zookeeper.connection.timeout.ms=60000

4、配置文件详解

  • broker.id 当前机器在集群中的唯一标识,和zookeeper的myid性质一样  建议用自己主机的后三位  每台(主机)broker不一致
  • port 当前kafka对外提供服务的端口默认是9092  生产者(producer)要以这个端口为准
  • host.name 这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。  (填写本机地址即可)
  • num.network.threads=3 这个是borker进行网络处理的线程数
  • num.io.threads=8         #这个是borker进行I/O处理的线程数
  • log.dirs= 消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
  • socket.send.buffer.bytes=102400 发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
  • socket.receive.buffer.bytes=102400 kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
  • socket.request.max.bytes=104857600   这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
  • num.partitions=1 默认的分区数,一个topic默认1个分区数
  • log.retention.hours=168 默认消息的最大持久化时间,168小时,7天
  • message.max.byte=5242880 消息保存的最大值5M
  • default.replication.factor=2 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
  • replica.fetch.max.bytes=5242880 取消息的最大直接数
  • log.segment.bytes=1073741824    这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
  • log.retention.check.interval.ms=300000 每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
  • log.cleaner.enable=false   是否启用log压缩,一般不用启用,启用的话可以提高性能
  • zookeeper.connect=192.168.66.10:2181, 192.168.原文链接:https://www.dqzboy.com66.11:2181, 192.168.66.12:2181    设置zookeeper的连接端口 消费的时候要以这个端口消费
  • zookeeper.connection.timeout.ms=60000    修改kafka 配置 连接超时间,这个一定配置上不然在默认的时间会出现连接超时

5、拷贝至另外2节点

[root@kafka-node1 ~]# scp -r /usr/local/kafka/ root@192.168.66.12:/usr/local/

[root@kafka-node1 ~]# scp -r /usr/local/kafka/ root@192.168.66.12:/usr/local/
  • 注意:修改另外2节点kafka的配置文件,修改broker.idhost.nameadvertised.host.name为各节点的IP地址最后三位数字和对应主机IP地址
[root@kafka-node2 ~]# vim /usr/local/kafka/config/server.properties
broker.id=11
host.name=192.168.66.11
advertised.host.name=192.168.66.11

[root@kafka-node2 ~]# vim /usr/local/kafka/config/server.properties
broker.id=12
host.name=192.168.66.12
advertised.host.name=192.168.66.12

# 放通2181、9092端口
[root@kafka-node1 ~]# firewall-cmd --permanent --zone=public --add-port=9092/tcp
[root@kafka-node1 ~]# firewall-cmd --permanent --zone=public --add-port=2181/tcp
[root@kafka-node1 ~]# firewall-cmd --reload

6、三节点依次启动kafka服务

[root@kafka-node1 ~]# cd /usr/local/kafka/bin
[root@kafka-node1 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

[root@kafka-node2 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

[root@kafka-node3 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

7、创建Topic

  • 创建一个topic名为test,拥有2个分区,2个副本
[root@kafka-node1 bin]# ./kafka-topics.sh --create --zookeeper 192.168.66.10:2181,192.168.66.11:2181,192.168.66.12:2181 --replication-factor 1 --partitions 1 --topic test

Created topic test.

8、查看zk集群上的 topic列表

[root@kafka-node1 bin]# ./kafka-topics.sh --list --zookeeper 192.168.66.10:2181,192.168.66.11:2181,192.168.66.12:2181

test

9、查看topic描述

[root@kafka-node1 bin]# ./kafka-topics.sh --describe --list --zookeeper 192.168.66.10:2181,192.168.66.11:2181,192.168.66.12:2181 test

Topic: test	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: test	Partition: 0	Leader: 12	Replicas: 12	Isr: 12

10、创建生产者producer

  • 在192.168.66.10主机上执行
[root@kafka-node1 bin]# kafka-console-producer.sh --broker-list 192.168.66.10:9092,192.168.66.11:9092,192.168.66.12:9092 --topic test

11、创建消费者consumer

  • 在192.168.66.12主机上执行
[root@kafka-node3 bin]# ./kafka-console-consumer.sh –zookeeper  192.168.66.10:2181,192.168.66.11:2181,192.168.66.12:2181  --from-beginning  --topic  test
  • 如果看到我们在192.168.66.10主机上 输入的内容在 192.168.66.12主机上会被同步打印出来,这样我们的kafka集群就搭建成功了




关注本站官方微信公众号『精彩程序人生』

扫描左侧二维码关注我们的微信公众帐号,在微信公众帐号中回复【资料】获取IT技术文档。
关注博主不迷路~


本文作者:浅时光博客
原文链接:https://www.dqzboy.com/80.html
版权声明:知识共享署名-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)协议进行许可,转载时请以超链接形式标明文章原始出处和作者信息
免责声明:本站发布的内容(图片、视频和文字)以及一切破解补丁、注册激活和软件解密分析文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。访问和下载本站内容,说明您已同意上述条款!


0 条回应

必须 注册 为本站用户, 登录 后才可以发表评论!

    本站已稳定运行: | 耗时 0.276 秒 | 查询 88 次 | 内存 13.10 MB