但行好事
莫论前程❤

kafka分布式安装手册

参考资料:

​ 1 https://www.cnblogs.com/wangb0402/p/6187503.html

​ 2 相关概念说明:

​ 3 包含zookeeper安装:

​ 4 分布式安装与测试详解

安装kafka

  • kafka的安装需要jdk和zookeeper

  • 准备好kafka安装包,官网下载地址:

    <http://kafka.apache.org/downloads.html>

  • 上传kafka包到家目录~

    rz

  • 解压kafka包到/data文件夹下

    tar -zxvf kafka_2.10-0.10.1.0.tgz -C /data
    
  • 目前搭建了三个节点的kafka集群,分别在192.168.156.102,192.168.156.103和192.168.156.104服务器上;

  • 查看配置文件
    进入kafka的config的目录:

    这里写图片描述

    修改配置文件

    • 主要修改server.properties

    参数详解


    broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样

    port=9092 #当前kafka对外提供服务的端口默认是9092

    host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问

    ​ 题。

    num.network.threads=3 #这个是borker进行网络处理的线程数

    num.io.threads=8 #这个是borker进行I/O处理的线程数

    log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的

    ​ num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,

    ​ 新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,

    ​ 那个分区数最少就放那一个

    log.dirs必须保证目录存在,不会根据配置文件自动生成

    socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲

    ​ 区了到达一定的大小后在发送,能提高性能

    socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

    socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的

    ​ 最大数,这个值不能超过java的堆栈大小

    num.partitions=1 #默认的分区数,一个topic默认1个分区数

    log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天

    message.max.byte=5242880 #消息保存的最大值5M

    default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提

    ​ 供服务

    replica.fetch.max.bytes=5242880 #取消息的最大直接数

    log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过

    ​ 这个值的时候,kafka会新起一个文件

    log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间

    ​ (log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除

    log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能

    zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口


    需要修改的参数


    参数如下:

    # 每台服务器的broker.id都不能相同
    broker.id=0
    
    # 端口
    port=9092
    ==================================
    # 主机名
    host.name=10.0.0.52    
    =========或者===============
    #本机对应的ip地址
    listeners=PLAINTEXT://192.168.32.128:9092
    ===================================
    # 是否可以删除topic
    delete.topic.enable=true
    
    #具体的一些参数
    log.retention.hours=168 
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    
    # 设置zookeeper集群地址与端口
    zookeeper.connect=192.168.156.102:2181,192.168.156.103:2181,192.168.156.104:2181
    

    启动kafka

    • 启动kafka之前要启动对应的zk集群:

    • 启动服务

      bin/kafka-server-start.sh -daemon config/server.properties
      
    • 检查kafka是否启动

      img

      $jps
      4351 kafka             //kafka进程
      4031 QuorumPeerMain      //zookeeper进程
      
    • 创建topic来验证是否创建成功
      #创建Topic
      
      bin/kafka-topics.sh --create --zookeeper 192.168.7.100:2181 --replication-factor 2 
      --partitions 1 --topic kafkaTest
      
      --replication-factor 2   #复制两份
      
      --partitions 1             #创建1个分区
      
      --topic kafkaTest      #主题为kafkaTest
      
      
    • 以下部门我们在代码部分实现功能:
      '''在一台服务器上创建一个发布者'''    #创建一个broker,发布者
      
      bin/kafka-console-producer.sh --broker-list 192.168.156.101:9092 --topic kafkaTest
      
      '''在一台服务器上创建一个订阅者'''
      
      bin/kafka-console-consumer.sh --zookeeper op-apm-02:2181 --topic kafkaTest --from-beginning
      

      上述代码中的[^192.168.156.101:9092]

      [^192.168.156.101:9092]: kafka所在主机地址

    查看其他命令

    • 查看已经存在的topic

      bin/kafka-topics.sh --list --zookeeper op-apm-02:2181

      就会显示我们创建的所有topic

      zookeeper 后面的参数写一个和写三个效果一样,连接上任意一台zookeeper都是集群

    • 查看topic状态

      bin/kafka-topics.sh --describe --zookeeper op-apm-02:2181 --topic kafkaTest

      下面是显示信息

      Topic: ssports    PartitionCount:1    ReplicationFactor:2    Configs:
      Topic: kafkaTest    Partition: 0    Leader: 1    Replicas: 0,1    Isr: 1
      

      #分区为为1 复制因子为2 他的 shuaige的分区为0

      #Replicas: 0,1 复制的为0,1

          ##         日志说明
      

    kafka的日志是保存在/data/kafka_2.10-0.10.1.0/logs目录下的

    server.log #kafka的运行日志

    state-change.log #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就

    ​ 保存在这里

    controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在

    ​ 游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所

    ​ 有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会被

    ​ 切换为新的controller.

kafka删除相关

​ 从kafka 0.8.2.1开始可以直接删除topic

步骤如下:

  1. 在kafka配置文件中添加删除参数

    delete.topic.enable=true

  2. 利用命令删除需要删除的topic

bin/kafka-topics.sh --delete --zookeeper op-apm-02:2181:2181 --topic kafkaTest

            #         kafka-manager安装指南

Kafka-manager的基本配置和运行

想要查看和管理Kafka,完全使用命令并不方便,我们可以使用雅虎开源的Kafka-manager,GitHub地址如下:

https://github.com/yahoo/kafka-manager

我们可以使用Git或者直接从Releases中下载,此处从下面的地址下载 1.3.3.7 版本:

https://github.com/yahoo/kafka-manager/releases

下载完成后解压。

注意:上面下载的是源码,下载后需要按照后面步骤进行编译。如果觉得麻烦,可以直接从下面地址下载编译好的 kafka-manager-1.3.3.7.zip。
链接: http://pan.baidu.com/s/1qY8sGoO 密码: ye7b

准备

请先准备好Java8环境!

在开始安装官方文档进行配置前,因为Kafka-manager使用的Play框架,为了编译的速度更快,先配置sbt的maven仓库,由于默认仓库速度较慢,因此使用aliyun提供的maven仓库。

通过 cd ~进入当前用户目录,然后通过命令mkdir .sbt创建.sbt目录,进入创建的该目录,使用vi创建repositories文件,编辑内容如下:

[repositories]
local
aliyun: http://maven.aliyun.com/nexus/content/groups/public
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly1234

配置参考:http://www.scala-sbt.org/0.13.2/docs/Detailed-Topics/Library-Management.html#override-all-resolvers-for-all-builds

然后进入解压后的 Kafka-manager 目录,执行下面的命令:

> ./sbt clean dist1

如果之前从来没有用过Play框架,这个步骤会需要一定时间,主要是下载依赖的jar包,使用aliyun的私服可以让这个过程缩短很多。

命令执行完成后,在 target/universal 目录中会生产一个zip压缩包kafka-manager-1.3.3.7.zip。将压缩包拷贝到要部署的目录下解压。

在解压后的conf目录中打开 application.conf文件,修改其中的配置信息,最主要的内容为:

kafka-manager.zkhosts="localhost:2181"

配置为Kafka的 zookeeper 服务器。你还可以通过环境变量ZK_HOSTS配置这个参数值。

其他详细的配置信息参考官方文档:https://github.com/yahoo/kafka-manager

启动

在解压的目录中,可以使用命令启动Kafka-manager。

默认情况下端口为9000,你还可以通过下面的命令指定配置文件和端口:

启动时指定配置文件位置和启动端口号,默认为9000

nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &

注: nohup 不挂断的运行 &: 在后台运行

如果没有配置Java的环境变量,还可以通过下面的方式指定Java8的目录:

> bin/kafka-manager -java-home /usr/local/oracle-java-81

启动后,从浏览器打开,显示如下:  

​ 第一次进入web UI要进行kafka cluster的相关配置,根据自己的信息进行配置。

这里写图片描述

配置集群

点击【Cluster】>【Add Cluster】打开如下添加集群的配置界面:

这里写图片描述

输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181)。

选择最接近的Kafka版本(如0.10.1.0)。

注意:如果没有在 Kafka 中配置过 JMX_PORT,千万不要选择第一个复选框。

Enable JMX Polling

如果选择了该复选框,Kafka-manager 可能会无法启动。

其他broker的配置可以根据自己需要进行配置,默认情况下,点击【保存】时,会提示几个默认值为1的配置错误,需要配置为>=2的值。保存成功后,提示如下。

这里写图片描述

点击【Go to cluster view.】打开当前的集群界面。

这里写图片描述

在集群界面显示了主题和Broker的个数,点击数字可以查看具体的信息。

同时在顶部多了好几个菜单,可以查看集群、Broker、主题等信息。

新建主题

点击【Topic】>【Create】可以方便的创建并配置主题。如下显示。

这里写图片描述

这里只在Topic中输入t1(或其他名字),分区和复制因子这里简单说明一下,详细的可以查看官方介绍。

这里写图片描述

在上图一个Kafka集群中,有两个服务器,每个服务器上都有2个分区。P0,P3可能属于同一个主题,也可能是两个不同的主题。

如果设置的Partitons和Replication Factor都是2,这种情况下该主题的分步就和上图中Kafka集群显示的相同,此时P0,P3是同一个主题的两个分区。P1,P2也是同一个主题的两个分区,Server1和Server2其中一个会作为Leader进行读写操作,另一个通过复制进行同步。

如果设置的Partitons和Replication Factor都是1,这时只会根据算法在某个Server上创建一个分区,可以是P0~4中的某一个(分区都是新建的,不是先存在4个然后从中取1个)。

这里我们都设置为1,点击【Create】然后进入创建的这个主题,显示如下。

这里写图片描述

这里显示了主题的基本信息,在右侧中间的Broker这里显示了当前的Broker Id为0,分区数为1,如果集群中存在多个服务,在创建该主题时,不一定会分配到哪个Broker。

这个页面的右上部分显示了针对主题的一些操作。这些操作相对命令的方式简单和直观了很多。

其他操作

关于kafka-manager的其他功能和操作可以参考官网:https://github.com/yahoo/kafka-manager

笔记参考文献:

​ https://www.cnblogs.com/yinchengzhe/p/5126360.html

nohup command & 命令使用说明

​ https://www.cnblogs.com/baby123/p/6477429.html

赞(1) 打赏
未经允许不得转载:刘鹏博客 » kafka分布式安装手册
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!

 

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏