0%

安装配置Zookeeper和Kafka集群

此Kafka集群为了日志收集这一套,之前安装了Elasticsearch Cluster:参考:安装Elasticsearch 7.4集群(开启集群Auth + Transport SSL)以及 Kibana & Keystore

FELK:

  • F Filebeat | Fluentd | Fluent-bit
  • E Elasticsearch
  • L Logstash
  • K Kibana

FELK兼容性:

环境说明:

  • 一台EC2主机Amazon Linux 2 AMI,本身是基于CentOS 7
  • Zookeeper 3.5.6
  • Kafka 2.0.0

在安装Kafka之前需要安装Zookeeper,如果你只是想测试用Kafka,那么在kafka的包里边已经有了zookeeper了,直接启动即可,如下:

启动Zookeeper:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

然后再启动Kafka:

$ bin/kafka-server-start.sh config/server.properties

OK,一个测试用的Kafka和Zookeeper就安装完成了,但是我们要创建的是Kafka集群+Zookeeper集群,用于生产环境的,所以我们先部署Zookeeper集群:

Zookeeper:

官网:

https://zookeeper.apache.org/index.html

文档:

https://zookeeper.apache.org/doc/r3.5.6/

下载:

http://zookeeper.apache.org/releases.html

Zookeeper集群中ZK节点数一般为基数,当超过半数的ZK节点存活时,那么这个ZK集群就是可提供服务的。对于三台ZK节点组成的集群来说,如果有两台ZK存活,那么就可以提供服务,也就是说三台zk集群只允许宕机一台,五台zk组成的集群最多宕机两台。假定你想要容忍F台机器宕机,而集群仍需要提供服务的话,你需要部署2F+1台服务。

安装:

说明:我这里只有一台EC2主机,所以采用不同端口的方式安装Zookeeper。

IP Port Client Port Connect to Leader Port leader election Port AdminServer hosts 名
172.17.0.87 2181 2888 3888 8081 zk01
172.17.0.87 2182 2889 3889 8082 zk02
172.17.0.87 2183 2890 3890 8083 zk03

配置/etc/hosts,用于集群间通信,所有的zookeeper节点都需要配置。

cat >> /etc/hosts <<EOF
172.17.0.87 zk01 zk02 zk03
EOF

下载

$ wget -c "https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz"
$ cd /data/knner && tar xf /opt/softs/apache-zookeeper-3.5.6-bin.tar.gz
$ ln -s apache-zookeeper-3.5.6-bin zookeeper

目录结构

$ tree zookeeper/ -L 2
zookeeper/
├── bin
│   ├── README.txt
│   ├── zkCleanup.sh
│   ├── zkCli.cmd
│   ├── zkCli.sh
│   ├── zkEnv.cmd
│   ├── zkEnv.sh
│   ├── zkServer.cmd
│   ├── zkServer-initialize.sh
│   ├── zkServer.sh
│   ├── zkTxnLogToolkit.cmd
│   └── zkTxnLogToolkit.sh
├── conf
│   ├── configuration.xsl
│   ├── log4j.properties
│   └── zoo_sample.cfg
├── docs
│   ├── apidocs
│   ├── images
│   ├── index.html
│   ├── javaExample.html
│   ├── recipes.html
│   ├── releasenotes.html
│   ├── skin
│   ├── zookeeperAdmin.html
│   ├── zookeeperHierarchicalQuorums.html
│   ├── zookeeperInternals.html
│   ├── zookeeperJMX.html
│   ├── zookeeperObservers.html
│   ├── zookeeperOtherInfo.html
│   ├── zookeeperOver.html
│   ├── zookeeperProgrammers.html
│   ├── zookeeperQuotas.html
│   ├── zookeeperReconfig.html
│   ├── zookeeperStarted.html
│   └── zookeeperTutorial.html
├── lib
│   ├── audience-annotations-0.5.0.jar
│   ├── commons-cli-1.2.jar
│   ├── jackson-annotations-2.9.10.jar
│   ├── jackson-core-2.9.10.jar
│   ├── jackson-databind-2.9.10.jar
│   ├── javax.servlet-api-3.1.0.jar
│   ├── jetty-http-9.4.17.v20190418.jar
│   ├── jetty-io-9.4.17.v20190418.jar
│   ├── jetty-security-9.4.17.v20190418.jar
│   ├── jetty-server-9.4.17.v20190418.jar
│   ├── jetty-servlet-9.4.17.v20190418.jar
│   ├── jetty-util-9.4.17.v20190418.jar
│   ├── jline-2.11.jar
│   ├── jline-2.11.LICENSE.txt
│   ├── json-simple-1.1.1.jar
│   ├── json-simple-1.1.1.LICENSE.txt
│   ├── log4j-1.2.17.jar
│   ├── log4j-1.2.17.LICENSE.txt
│   ├── netty-buffer-4.1.42.Final.jar
│   ├── netty-buffer-4.1.42.Final.LICENSE.txt
│   ├── netty-codec-4.1.42.Final.jar
│   ├── netty-codec-4.1.42.Final.LICENSE.txt
│   ├── netty-common-4.1.42.Final.jar
│   ├── netty-common-4.1.42.Final.LICENSE.txt
│   ├── netty-handler-4.1.42.Final.jar
│   ├── netty-handler-4.1.42.Final.LICENSE.txt
│   ├── netty-resolver-4.1.42.Final.jar
│   ├── netty-resolver-4.1.42.Final.LICENSE.txt
│   ├── netty-transport-4.1.42.Final.jar
│   ├── netty-transport-4.1.42.Final.LICENSE.txt
│   ├── netty-transport-native-epoll-4.1.42.Final.jar
│   ├── netty-transport-native-epoll-4.1.42.Final.LICENSE.txt
│   ├── netty-transport-native-unix-common-4.1.42.Final.jar
│   ├── netty-transport-native-unix-common-4.1.42.Final.LICENSE.txt
│   ├── slf4j-1.7.25.LICENSE.txt
│   ├── slf4j-api-1.7.25.jar
│   ├── slf4j-log4j12-1.7.25.jar
│   ├── zookeeper-3.5.6.jar
│   └── zookeeper-jute-3.5.6.jar
├── LICENSE.txt
├── NOTICE.txt
├── README.md
└── README_packaging.txt

7 directories, 73 files

Zookeeper 配置详解

摘自:https://zookeeper.apache.org/doc/r3.5.6/zookeeperAdmin.html

Zookeeper 新版本中还支持动态的配置,请参考:https://zookeeper.apache.org/doc/r3.5.6/zookeeperReconfig.html

这里首先列出常用的配置以及说明,然后给出集群配置。

最低配置:

  • tickTime 基本的时间单位,毫秒,2000=2秒;心跳时间和最小的session过期时间通常是2倍的tickTime。

  • dataDir 数据存放目录,包括database snapshots;

  • clientPort 用于和客户端建立连接的端口;

  • secureClientPort:使用SSL侦听安全客户端连接的端口。clientPort指定用于纯文本连接的端口,而secureClientPort指定用于SSL连接的端口。同时指定两者都将启用混合模式,而省略其中任何一个将禁用该模式。请注意,当用户将Zookeeper.serverCnxnFactory,zookeeper.clientCnxnSocket插入为Netty时,将启用SSL功能。

进阶配置:

本节中的配置设置是可选的。您可以使用它们进一步调整ZooKeeper服务器的行为。也可以使用Java系统属性来设置某些属性,通常形式为zookeeper.keyword。可用时,确切的系统属性在下面列出。

  • dataLogDir 来指定transaction log的存放位置,如果不设定,那么transaction log将存放到dataDir 中,生产中建议这两者分开存储在不同的磁盘上以提高zk的吞吐量;

  • globalOutstandingLimit:(Java系统属性:zookeeper.globalOutstandingLimit。)客户端可以比ZooKeeper更快地提交请求,尤其是在有很多客户端的情况下。为了防止ZooKeeper由于排队的请求而耗尽内存,ZooKeeper会限制客户端,以便系统中的未完成请求不超过globalOutstandingLimit。默认限制为1,000。

  • preAllocSize:(Java系统属性:zookeeper.preAllocSize)为了避免查找,ZooKeeper以preAllocSize千字节的块为单位在事务日志文件中分配空间。默认块大小为64M。更改块大小的原因之一是如果更频繁地拍摄快照,则减小块大小。(另请参见snapCount)。

  • snapCount:(Java系统属性:zookeeper.snapCount)ZooKeeper使用快照和事务日志(请考虑预写日志)记录其事务。可以在拍摄快照之前记录在事务日志中的事务数(并且事务日志已滚动) )由snapCount确定。为了防止仲裁中的所有机器同时拍摄快照,当交易日志中的交易数量达到运行时生成的[snapCount / 2 + 1]随机值时,每个ZooKeeper服务器都将拍摄快照。 ,snapCount]范围。默认的snapCount是100,000。

  • maxClientCnxns:(非Java系统属性)将单个客户端(由IP地址标识)可以与ZooKeeper集成中的单个成员建立的并发连接数(在套接字级别)限制为多少。这用于防止某些类的DoS攻击,包括文件描述符耗尽。默认值为60。将其设置为0将完全消除并发连接的限制。

  • clientPortAddress:3.3.0中的新功能:用于侦听客户端连接的地址(ipv4,ipv6或主机名);即客户端尝试连接的地址。这是可选的,默认情况下,我们以这种方式绑定,即可以接受服务器上任何地址/接口/ NIC 到clientPort的任何连接。

  • minSessionTimeout:(非Java系统属性)3.3.0中的新增功能:服务器允许客户端进行协商的最小会话超时(以毫秒为单位)。默认为tickTime的 2倍。

  • maxSessionTimeout:(非Java系统属性)3.3.0中的新增功能:服务器允许客户端进行协商的最大会话超时(以毫秒为单位)。默认为tickTime的 20倍。

  • fsync.warningthresholdms:(Java系统属性:zookeeper.fsync.warningthresholdms)3.3.4中的新增功能:每当事务日志(WAL)中的fsync花费的时间超过此值时,就会向该日志输出警告消息。该值以毫秒为单位指定,默认为1000。只能将其设置为系统属性。

  • autopurge.snapRetainCount:(非Java系统属性)3.4.0中的新增功能启用后,ZooKeeper自动清除功能会将autopurge.snapRetainCount最新快照和相应的事务日志分别保留在dataDirdataLogDir中,并删除其余部分。默认值为3。最小值为3。

  • autopurge.purgeInterval:(非Java系统属性)3.4.0中的新增功能:必须触发清除任务的时间间隔(以小时为单位)。设置为正整数(1或更大)以启用自动清除。预设为0。也就是关闭了自动清除功能。

  • syncEnabled:(Java系统属性:zookeeper.observer.syncEnabled)新的3.4.6,3.5.0:现在的观察员在默认情况下,如参与者登录交易和写入快照磁盘。这减少了重新启动时观察者的恢复时间。设置为“ false”以禁用此功能。默认值为“ true”

  • zookeeper.extendedTypesEnabled:(仅Java系统属性:zookeeper.extendedTypesEnabled)3.5.4中的新增功能:定义为“ true”以启用扩展功能,例如创建TTL节点。默认情况下禁用它们。重要信息:由于内部限制,启用的服务器ID必须小于255。

  • zookeeper.emulate353TTLNodes:(仅Java系统属性:zookeeper.emulate353TTLNodes)3.5.4中的新增功能:由于ZOOKEEPER-2901,在3.5.4 / 3.6.0中不支持在3.5.3版中创建的TTL节点。但是,可通过zookeeper.emulate353TTLNodes系统属性提供解决方法。如果您在ZooKeeper 3.5.3中使用了TTL节点,并且需要维护兼容性,那么除了zookeeper.extendedTypesEnabled之外,请将zookeeper.emulate353TTLNodes设置为“ true” 。注意:由于该错误,服务器ID必须小于或等于127。此外,最大支持TTL值是1099511627775,小于3.5.3中允许的值(1152921504606846975)

  • serverCnxnFactory:(Java系统属性:zookeeper.serverCnxnFactory)指定ServerCnxnFactory的实现。应该将其设置为NettyServerCnxnFactory使用基于TLS的服务器通信。默认值为NIOServerCnxnFactory。

  • snapshot.trust.empty:(仅Java系统属性:zookeeper.snapshot.trust.empty)3.5.6中的新增功能:此属性控制ZooKeeper是否应将丢失的快照文件视为无法恢复的致命状态。设置为true允许ZooKeeper服务器在没有快照文件的情况下恢复。仅应在从旧版本的ZooKeeper(3.4.x,3.5.3之前)进行升级的过程中进行设置,在该版本中,ZooKeeper可能仅具有事务日志文件,而没有快照文件。如果在升级期间设置了该值,我们建议在升级后将该值重新设置为false并重新启动ZooKeeper进程,以便ZooKeeper可以在恢复过程中继续进行正常的数据一致性检查。默认值为false。

集群选项配置

  • lectionAlg:(非Java系统属性)要使用的选举实现。值“ 1”对应于快速领导者选举的未经身份验证的基于UDP的版本,“ 2”对应于快速领导者的基于身份验证的UDP的版本,而“ 3”对应于快速领导者的基于TCP的版本选举。当前,算法3是默认算法。

注意 现在不推荐使用领导人选举1和2的实现。下一个版本中删除它们,届时只有FastLeaderElection将可用。

  • initLimit 超时设定,用于限制zk集群中zk server连接到leader的时间长度,乘以基本事件单位tickTime为具体的时间,比如initLimit=5 那么就是5乘以tickTime=2000等于10000毫秒,即10秒。如果ZooKeeper管理的数据量很大,请根据需要增加此值。

  • leaderServes:leader负责接受客户端连接。默认值为“是”。领导机器协调更新。为了获得较高的更新吞吐量,而以读取吞吐量为代价,可以将领导者配置为不接受客户端并专注于协调。此选项的默认值为“是”,这意味着领导者将接受客户端连接。

注意 当您在一个集合中拥有三个以上的ZooKeeper服务器时,强烈建议打开领导者选择。

  • syncLimit 限制多长和leader的数据同步时间,limits how far out of date a server can be from a leader,乘以基本事件单位tickTime为具体的时间;允许以关注者与ZooKeeper同步的时间,如果追随者远远落后于领导者,他们将被丢弃。

  • server.X=host:port1:port2 指定zk集群中的节点数,一行代表一个节点,X为1-255的整数,用于表示节点id,不能重复,因为在zk集群中,每个节点的这个server list配置都是一样的,当zk启动的时候根据位于dataDir目录中的myid文件中的数(X)来指定当前节点需要监听的两个端口,以及和其他zk节点通信的端口。注意myid文件只能包含一个X字符,不能包含其他。port1:用于followers和leader建立连接所用;port2:用于zk间选举leader。

  • group.x = nnnnn [:nnnnn]:(无Java系统属性)启用分层仲裁。“ x”是组标识符,“ =”后的数字对应于服务器标识符。分配的左侧是用冒号分隔的服务器标识符列表。请注意,组必须是不相交的,并且所有组的并集必须是ZooKeeper集合。您会在这里找到一个例子

    基本思想很简单。首先,我们将服务器分为几组,并为每个组添加一行,列出组成该组的服务器。接下来,我们必须为每个服务器分配权重。

    以下示例显示如何配置一个由三组组成的系统,每组三台服务器,并且我们为每个服务器分配权重1:

    group.1=1:2:3
    group.2=4:5:6
    group.3=7:8:9

    weight.1=1
    weight.2=1
    weight.3=1
    weight.4=1
    weight.5=1
    weight.6=1
    weight.7=1
    weight.8=1
    weight.9=1

    在运行系统时,一旦我们获得了来自大多数非零权重组的多数票,便能够形成法定人数。权重为零的组将被丢弃,并且在形成仲裁时不会被考虑。看这个例子,一旦我们有来自两个不同组中每个组的至少两台服务器的投票,便能够形成法定人数。

  • weight.x = nnnnn:(无Java系统属性)与“ group”一起使用,它在形成仲裁时为服务器分配权重。该值对应于投票时服务器的权重。ZooKeeper的某些部分需要投票,例如领导人选举和原子广播协议。默认情况下,服务器的权重为1。如果配置定义了组,但没有定义权重,则将为所有服务器分配值1。您会在这里找到一个例子

  • cnxTimeout:(Java系统属性:饲养员cnxTimeout)设置为打开领导人选举通知连接超时值。仅在您使用选举算法3时适用。

    默认值为5秒。

  • standaloneEnabled:(无Java系统属性)3.5.0中的新增功能:设置为false时,可以以复制模式启动单个服务器,可以由观察者运行单个参与者,并且群集可以重新配置为一个节点,然后从一个节点。为了向后兼容,默认值为true。可以使用QuorumPeerConfig的setStandaloneEnabled方法或通过将“ standaloneEnabled = false”或“ standaloneEnabled = true”添加到服务器的配置文件中来进行设置。

  • reconfigEnabled:(无Java系统属性)3.5.3的新增功能:此功能控制启用或禁用动态重新配置功能。启用此功能后,假设用户被授权执行此类操作,则用户可以通过ZooKeeper客户端API或通过ZooKeeper命令行工具执行重新配置操作。禁用此功能后,包括超级用户在内的任何用户都无法执行重新配置。任何重新配置的尝试都将返回错误。可以将“ reconfigEnabled”选项设置为“ reconfigEnabled = false”“ reconfigEnabled = true”到服务器的配置文件,或使用QuorumPeerConfig的setReconfigEnabled方法。默认值为false。如果存在,则该值在整个集合中的每个服务器上都应保持一致。在某些服务器上将该值设置为true,而在其他服务器上将该值设置为false,则将导致不一致的行为,具体取决于哪个服务器被选为领导者。如果领导者的设置为“ reconfigEnabled = true”,则集成将启用重新配置功能。如果领导者的设置为“ reconfigEnabled = false”,则该集成将禁用重新配置功能。因此,建议在集成服务器中的“ reconfigEnabled”具有一致的值。

  • 4lw.commands.whitelist:(Java系统属性:zookeeper.4lw.commands.whitelist3.5.3的新增功能:用户要使用的逗号分隔的四个字母单词命令列表。必须在此列表中输入有效的四个字母的命令,否则ZooKeeper服务器将不会启用该命令。默认情况下,白名单仅包含zkServer.sh使用的“ srvr”命令。默认情况下,其余四个字母单词命令是禁用的。这是启用stat,ruok,conf和isro命令同时禁用其余四个字母单词命令的配置示例:

    4lw.commands.whitelist=stat, ruok, conf, isro

    如果确实需要默认情况下启用所有四个字母词命令,则可以使用星号选项,这样您就不必在列表中一个接一个地添加每个命令。例如,这将启用所有四个字母词命令:

    4lw.commands.whitelist=*
  • tcpKeepAlive:(Java系统属性:zookeeper.tcpKeepAlive3.5.4的新增功能:将此属性设置为true可以在仲裁成员用于执行选举的套接字上设置TCP keepAlive标志。当存在可能破坏仲裁成员的网络基础结构时,这将允许仲裁成员之间的连接保持连接状态。对于长时间运行或空闲的连接,某些NAT和防火墙可能会终止或丢失状态。启用此选项取决于操作系统级别的设置才能正常工作,有关更多信息,请检查操作系统有关TCP keepalive的选项。默认为false

  • lectionPortBindRetry:(仅Java系统属性:zookeeper.electionPortBindRetry)该属性设置当Zookeeper服务器无法绑定领导者选举端口时的最大重试次数。此类错误可以是临时的且可恢复的(例如ZOOKEEPER-3320中描述的DNS问题),也可以是不可重试的(例如已在使用的端口)。
    如果出现暂时性错误,此属性可以提高Zookeeper服务器的可用性并帮助其自我恢复。默认值3.在容器环境中,尤其是在Kubernetes中,应增加该值或将其设置为0(无限重试),以解决与DNS名称解析有关的问题。

加密,身份验证,授权选项

本节中的选项允许控制服务执行的加密/认证/授权。

  • DigestAuthenticationProvider.superDigest:(Java系统属性:zookeeper.DigestAuthenticationProvider.superDigest)默认情况下,此功能处于**禁用状态** 。3.2中的新增功能使ZooKeeper集成管理员以“超级”用户身份访问znode层次结构。特别是,对于通过身份验证为超级的用户,不会进行ACL检查。org.apache.zookeeper.server.auth.DigestAuthenticationProvider可用于生成superDigest,并使用一个参数“ super:提供生成的“超级”:作为启动集合的每个服务器时的系统属性值。(从ZooKeeper客户端对ZooKeeper服务器进行身份验证时,传递“ digest”方案和“ super:authdata”方案)“。请注意,摘要身份验证将纯文本身份验证数据传递给服务器,因此,最好仅在本地主机(而不是通过网络)或加密连接上使用此身份验证方法。
  • X509AuthenticationProvider.superUser:( Java系统属性:zookeeper.X509AuthenticationProvider.superUser)SSL支持的方法,使ZooKeeper集成管理员能够以“超级”用户身份访问znode层次结构。当此参数设置为X500主体名称时,只有具有该主体的经过身份验证的客户端才能够绕过ACL检查,并具有对所有znode的完全特权。
  • zookeeper.superUser:(Java系统属性:zookeeper.superUser)类似于zookeeper.X509AuthenticationProvider.superUser,但对于基于SASL的登录名是通用的。它存储可以作为“超级”用户访问znode层次结构的用户名。
  • ssl.authProvider:(Java系统属性:zookeeper.ssl.authProvider)指定用于安全客户端身份验证的org.apache.zookeeper.auth.X509AuthenticationProvider的子类。这在不使用JKS的证书密钥基础结构中很有用。可能需要扩展javax.net.ssl.X509KeyManagerjavax.net.ssl.X509TrustManager才能从SSL堆栈中获得所需的行为。要将ZooKeeper服务器配置为使用自定义提供程序进行身份验证,请为自定义AuthenticationProvider选择方案名称,然后设置属性zookeeper.authProvider。[方案]自定义实现的全限定类名。这会将提供程序加载到ProviderRegistry中。然后设置此属性zookeeper.ssl.authProvider = [scheme],该提供程序将用于安全身份验证。
  • sslQuorum:(Java系统属性:zookeeper.sslQuorum3.5.5的新增功能:启用加密的仲裁通信。默认值为false
  • ssl.keyStore.location和ssl.keyStore.passwordssl.quorum.keyStore.locationssl.quorum.keyStore.password:(Java系统属性:zookeeper.ssl.keyStore.locationzookeeper.ssl.keyStore.passwordzookeeper (.ssl.quorum.keyStore.locationzookeeper.ssl.quorum.keyStore.password3.5.5中的新增功能:指定Java密钥库的文件路径,该路径包含用于客户端和仲裁TLS连接的本地凭据以及密码解锁文件。
  • ssl.keyStore.typessl.quorum.keyStore.type:(Java系统属性:zookeeper.ssl.keyStore.typezookeeper.ssl.quorum.keyStore.type3.5.5中的新增功能:指定客户端和客户端的文件格式。法定密钥库。值:JKS,PEM,PKCS12或为空(按文件名检测)。
    默认值:空
  • ssl.trustStore.locationssl.trustStore.passwordssl.quorum.trustStore.locationssl.quorum.trustStore.password:(Java系统属性:zookeeper.ssl.trustStore.locationzookeeper.ssl.trustStore.passwordzookeeper .ssl.quorum.trustStore.locationzookeeper.ssl.quorum.trustStore.password3.5.5中的新增功能:指定Java信任库的文件路径,该路径包含用于客户端和仲裁TLS连接的远程凭据以及密码。解锁文件。
  • ssl.trustStore.typessl.quorum.trustStore.type:(Java系统属性:zookeeper.ssl.trustStore.typezookeeper.ssl.quorum.trustStore.type3.5.5中的新增功能:指定客户端和客户端的文件格式。仲裁信任库。值:JKS,PEM,PKCS12或为空(按文件名检测)。
    默认值:空
  • ssl.protocolssl.quorum.protocol:(Java系统属性:zookeeper.ssl.protocolzookeeper.ssl.quorum.protocol3.5.5中的新增功能:指定要在客户端和仲裁TLS协商中使用的协议。默认值:TLSv1.2
  • ssl.enabledProtocolsssl.quorum.enabledProtocols:(Java系统属性:zookeeper.ssl.enabledProtocolszookeeper.ssl.quorum.enabledProtocols3.5.5中的新增功能指定客户端和仲裁TLS协商中的已启用协议。默认值:protocol属性值
  • ssl.ciphersuitesssl.quorum.ciphersuites:(Java系统属性:zookeeper.ssl.ciphersuiteszookeeper.ssl.quorum.ciphersuites3.5.5中的新增功能指定在客户端和仲裁TLS协商中使用的已启用密码套件。默认值:启用的密码套件取决于所使用的Java运行时版本。
  • ssl.context.supplier.classssl.quorum.context.supplier.class:(Java系统属性:zookeeper.ssl.context.supplier.classzookeeper.ssl.quorum.context.supplier.class3.5.5中的新功能:指定用于在客户端和仲裁SSL通信中创建SSL上下文的类。这使您可以使用自定义SSL上下文并实现以下方案:
    1. 使用使用PKCS11或类似工具加载的硬件密钥库。
    2. 您无权访问软件密钥库,但可以从其容器中检索已经构造的SSLContext。默认值:空
  • ssl.hostnameVerificationssl.quorum.hostnameVerification:(Java系统属性:zookeeper.ssl.hostnameVerificationzookeeper.ssl.quorum.hostnameVerification3.5.5中的新增功能:指定是否在客户端和仲裁TLS协商过程中启用主机名验证。仅出于测试目的而建议禁用它。默认值:true
  • ssl.crlssl.quorum.crl:(Java系统属性:zookeeper.ssl.crlzookeeper.ssl.quorum.crl3.5.5中的新增功能:指定是否在客户端和仲裁TLS协议中启用证书吊销列表。默认值:false
  • ssl.ocspssl.quorum.ocsp:(Java系统属性:zookeeper.ssl.ocspzookeeper.ssl.quorum.ocsp3.5.5中的新增功能:指定是否在客户端和仲裁TLS协议中启用了“在线证书状态协议”。默认值:false
  • ssl.clientAuthssl.quorum.clientAuth:(Java系统属性:zookeeper.ssl.clientAuthzookeeper.ssl.quorum.clientAuth3.5.5中的新增功能: TBD
  • ssl.handshakeDetectionTimeoutMillisssl.quorum.handshakeDetectionTimeoutMillis:(Java系统属性:zookeeper.ssl.handshakeDetectionTimeoutMilliszookeeper.ssl.quorum.handshakeDetectionTimeoutMillis3.5.5中的新功能: TBD

性能调整选项

3.5.0中的新增功能:已对多个子系统进行了改进,以提高读取吞吐量。这包括NIO通信子系统和请求处理管道(Commit Processor)的多线程。NIO是默认的客户端/服务器通信子系统。它的线程模型包括1个接收器线程,1-N个选择器线程和0-M个套接字I / O工作线程。在请求处理管道中,系统可以配置为一次处理多个读取请求,同时保持相同的一致性保证(相同的会话写入后读取)。提交处理器线程模型包括1个主线程和0-N个工作线程。

默认值旨在最大化专用ZooKeeper机器上的读取吞吐量。这两个子系统都需要有足够数量的线程才能达到峰值读取吞吐量。

  • zookeeper.nio.numSelectorThreads:(仅Java系统属性:zookeeper.nio.numSelectorThreads3.5.0中的新增功能: NIO选择器线程数。至少需要1个选择器线程。对于大量的客户端连接,建议使用多个选择器。默认值为sqrt(cpu核心数/ 2)。
  • zookeeper.nio.numWorkerThreads:(仅Java系统属性:zookeeper.nio.numWorkerThreads3.5.0中的新增功能: NIO工作线程数。如果配置了0个工作线程,则选择器线程直接执行套接字I / O。默认值为cpu核心数的2倍。
  • zookeeper.commitProcessor.numWorkerThreads:(仅Java系统属性:zookeeper.commitProcessor.numWorkerThreads3.5.0中的新增功能:提交处理器工作线程数。如果配置了0个工作线程,则主线程将直接处理请求。默认值为cpu核心数。
  • znode.container.checkIntervalMs:(仅Java系统属性)3.5.1中的新增功能:每次检查候选容器和ttl节点的时间间隔(以毫秒为单位)。默认值为“ 60000”。
  • znode.container.maxPerMinute:(仅Java系统属性)3.5.1中的新增功能:每分钟可以删除的容器和ttl节点的最大数量。这样可以防止在删除容器时放牧。默认值为“ 10000”。

AdminServer配置

3.5.0中的新增功能:以下选项用于配置AdminServer

  • admin.enableServer:(Java系统属性:zookeeper.admin.enableServer)设置为“ false”以禁用AdminServer。默认情况下,AdminServer是启用的。
  • admin.serverAddress:(Java系统属性:zookeeper.admin.serverAddress)嵌入式Jetty服务器侦听的地址。默认为0.0.0.0。
  • admin.serverPort:(Java系统属性:zookeeper.admin.serverPort)嵌入式Jetty服务器侦听的端口。默认为8080
  • admin.idleTimeout:(Java系统属性:zookeeper.admin.idleTimeout)设置连接在发送或接收数据之前可以等待的最大空闲时间(以毫秒为单位)。默认为30000毫秒
  • admin.commandURL:(Java系统属性:zookeeper.admin.commandURL)相对于根URL列出和发布命令的URL。默认为“ /commands”。

Zookeeper 集群配置

了解的具体的配置的作用,下面来配置ZK集群。

zk01 配置

$ cat zk01/conf/zoo.cfg
# The number of milliseconds of each tick 时间单位为2s
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take。此时是20s
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement 此时是10s
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zkdata/zk01

dataLogDir=/data/zklog/zk01
# the port at which the clients will connect 只监听的172.17.0.87这个地址上,端口是2181,多网卡很有用
clientPort=2181
clientPortAddress=172.17.0.87
# the maximum number of client connections.
# increase this if you need to handle more clients 客户端最大连接数
maxClientCnxns=1000
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir 保留3个snapshots
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature 每隔3h执行一次
autopurge.purgeInterval=3

# cluster 集群配置
server.1=zk01:2888:3888
server.2=zk02:2887:3887
server.3=zk03:2886:3886

# 4lw 可以通过telnet,nc,执行的4个字符的命令白名单,我这里全部允许。生产要注意
4lw.commands.whitelist=*

# admin server 开启admin server,可以代替4lw,通过http://IP:clientPort/knner/xxxx 方式访问,返回json格式。
admin.enableServer=true
admin.serverAddress=172.17.0.87
admin.serverPort=8081
admin.commandURL=/knner # 默认是/commands

zk02 配置

$ cat zk02/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zkdata/zk02

dataLogDir=/data/zklog/zk02
# the port at which the clients will connect
clientPort=2182
clientPortAddress=172.17.0.87
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=1000
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=3

# cluster
server.1=zk01:2888:3888
server.2=zk02:2887:3887
server.3=zk03:2886:3886

# 4lw
4lw.commands.whitelist=*

# admin server
admin.enableServer=true
admin.serverAddress=172.17.0.87
admin.serverPort=8082
admin.commandURL=/knner

zk03 配置

$ cat zk03/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zkdata/zk03

dataLogDir=/data/zklog/zk03
# the port at which the clients will connect
clientPort=2183
clientPortAddress=172.17.0.87
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=1000
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=3

# cluster
server.1=zk01:2888:3888
server.2=zk02:2887:3887
server.3=zk03:2886:3886

# 4lw
4lw.commands.whitelist=*

# admin server
admin.enableServer=true
admin.serverAddress=172.17.0.87
admin.serverPort=8083
admin.commandURL=/knner

创建所需的目录:

# 创建dataDir:
$ mkdir -p /data/zkdata/zk0{1..3} -pv
mkdir: created directory ‘/data/zkdata’
mkdir: created directory ‘/data/zkdata/zk01’
mkdir: created directory ‘/data/zkdata/zk02’
mkdir: created directory ‘/data/zkdata/zk03’

# 创建dataLogDir:
$ mkdir -p /data/zklog/zk0{1..3} -pv
mkdir: created directory ‘/data/zklog’
mkdir: created directory ‘/data/zklog/zk01’
mkdir: created directory ‘/data/zklog/zk02’
mkdir: created directory ‘/data/zklog/zk03’

创建myid文件:

$ echo 1 > /data/zkdata/zk01/myid
$ echo 2 > /data/zkdata/zk02/myid
$ echo 3 > /data/zkdata/zk03/myid

启动

配置完毕,我们准备启动ZK节点,首先查看启动命令的帮助:

# 查看帮助
$ ./zk01/bin/zkServer.sh
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/wanghk/zk01/bin/../conf/zoo.cfg
Usage: ./zk01/bin/zkServer.sh [--config <conf-dir>] {start|start-foreground|stop|restart|status|print-cmd}

# 我们先用print-cmd来打印一下启动的命令
# 注意--config 时指定配置文件所在的文件夹,配置文件必须名为zoo.cfg
$ ./zk01/bin/zkServer.sh --config zk01/conf print-cmd
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: zk01/conf/zoo.cfg
"java" -Dzookeeper.log.dir="/data/knner/zk01/bin/../logs" -Dzookeeper.log.file="zookeeper-ec2-user-server-test01.dev.awsbj.cn.log" -Dzookeeper.root.logger="INFO,CONSOLE" -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' -cp "/data/knner/zk01/bin/../zookeeper-server/target/classes:/data/knner/zk01/bin/../build/classes:/data/knner/zk01/bin/../zookeeper-server/target/lib/*.jar:/data/knner/zk01/bin/../build/lib/*.jar:/data/knner/zk01/bin/../lib/zookeeper-jute-3.5.6.jar:/data/knner/zk01/bin/../lib/zookeeper-3.5.6.jar:/data/knner/zk01/bin/../lib/slf4j-log4j12-1.7.25.jar:/data/knner/zk01/bin/../lib/slf4j-api-1.7.25.jar:/data/knner/zk01/bin/../lib/netty-transport-native-unix-common-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-transport-native-epoll-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-transport-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-resolver-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-handler-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-common-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-codec-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-buffer-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/log4j-1.2.17.jar:/data/knner/zk01/bin/../lib/json-simple-1.1.1.jar:/data/knner/zk01/bin/../lib/jline-2.11.jar:/data/knner/zk01/bin/../lib/jetty-util-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-servlet-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-server-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-security-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-io-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-http-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/javax.servlet-api-3.1.0.jar:/data/knner/zk01/bin/../lib/jackson-databind-2.9.10.jar:/data/knner/zk01/bin/../lib/jackson-core-2.9.10.jar:/data/knner/zk01/bin/../lib/jackson-annotations-2.9.10.jar:/data/knner/zk01/bin/../lib/commons-cli-1.2.jar:/data/knner/zk01/bin/../lib/audience-annotations-0.5.0.jar:/data/knner/zk01/bin/../zookeeper-*.jar:/data/knner/zk01/bin/../zookeeper-server/src/main/resources/lib/*.jar:zk01/conf:" -Xmx1000m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain "zk01/conf/zoo.cfg" > "/data/knner/zk01/bin/../logs/zookeeper-ec2-user-server-test01.dev.awsbj.cn.out" 2>&1 < /dev/null

#我们发现-Xmx1000m,默认是1G的大小,配置位置:
$ vim bin/zkEnv.sh # 文章最后
# default heap for zookeeper server
ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"

# default heap for zookeeper client
ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"
export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS"

# 当然也可以使用环境变量,注意单位值m
$ ZK_SERVER_HEAP=512 ./zk01/bin/zkServer.sh --config zk01/conf print-cmd
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: zk01/conf/zoo.cfg
...... -Xmx512m

正式启动:

$ ./zk01/bin/zkServer.sh --config zk01/conf start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: zk01/conf/zoo.cfg
Starting zookeeper ... STARTED

查看进程以及端口监听:

$ ps -ef|grep zk01
ec2-user 30504 1 1 13:08 pts/5 00:00:01 java -Dzookeeper.log.dir=/data/knner/zk01/bin/../logs -Dzookeeper.log.file=zookeeper-ec2-user-server-test01.dev.awsbj.cn.log -Dzookeeper.root.logger=INFO,CONSOLE -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=kill -9 %p -cp /data/knner/zk01/bin/../zookeeper-server/target/classes:/data/knner/zk01/bin/../build/classes:/data/knner/zk01/bin/../zookeeper-server/target/lib/*.jar:/data/knner/zk01/bin/../build/lib/*.jar:/data/knner/zk01/bin/../lib/zookeeper-jute-3.5.6.jar:/data/knner/zk01/bin/../lib/zookeeper-3.5.6.jar:/data/knner/zk01/bin/../lib/slf4j-log4j12-1.7.25.jar:/data/knner/zk01/bin/../lib/slf4j-api-1.7.25.jar:/data/knner/zk01/bin/../lib/netty-transport-native-unix-common-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-transport-native-epoll-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-transport-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-resolver-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-handler-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-common-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-codec-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-buffer-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/log4j-1.2.17.jar:/data/knner/zk01/bin/../lib/json-simple-1.1.1.jar:/data/knner/zk01/bin/../lib/jline-2.11.jar:/data/knner/zk01/bin/../lib/jetty-util-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-servlet-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-server-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-security-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-io-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-http-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/javax.servlet-api-3.1.0.jar:/data/knner/zk01/bin/../lib/jackson-databind-2.9.10.jar:/data/knner/zk01/bin/../lib/jackson-core-2.9.10.jar:/data/knner/zk01/bin/../lib/jackson-annotations-2.9.10.jar:/data/knner/zk01/bin/../lib/commons-cli-1.2.jar:/data/knner/zk01/bin/../lib/audience-annotations-0.5.0.jar:/data/knner/zk01/bin/../zookeeper-*.jar:/data/knner/zk01/bin/../zookeeper-server/src/main/resources/lib/*.jar:zk01/conf: -Xmx1001m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain zk01/conf/zoo.cfg

# 监听端口
$ netstat -lnutp|grep 30504
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 172.17.0.87:3888 :::* LISTEN 30504/java
tcp6 0 0 172.17.0.87:8081 :::* LISTEN 30504/java
tcp6 0 0 :::46075 :::* LISTEN 30504/java
tcp6 0 0 172.17.0.87:2181 :::* LISTEN 30504/java

启动其余两台zk:

$ ./zk02/bin/zkServer.sh --config zk02/conf start
$ ./zk03/bin/zkServer.sh --config zk03/conf start

检查zk集群状态:

# zk01 此时为follower
$ ./zk01/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk01/bin/../conf/zoo.cfg
Client port found: 2181. Client address: 172.17.0.87.
Mode: follower
# zk02 此时为leader
$ ./zk02/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk02/bin/../conf/zoo.cfg
Client port found: 2182. Client address: 172.17.0.87.
Mode: leader
# zk03 此时为follower
$ ./zk03/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk03/bin/../conf/zoo.cfg
Client port found: 2183. Client address: 172.17.0.87.
Mode: follower

集群可用性测试

# 停止zk02节点
$ ./zk02/bin/zkServer.sh stop
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk02/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

# 确认进程已经退出
$ ps -ef|grep zk02
ec2-user 1364 13046 0 13:24 pts/5 00:00:00 grep --color=auto zk02

# 查看zk01 节点状态,follower
$ ./zk01/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk01/bin/../conf/zoo.cfg
Client port found: 2181. Client address: 172.17.0.87.
Mode: follower

# 查看zk03节点状态,此时它为leader
$ ./zk03/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk03/bin/../conf/zoo.cfg
Client port found: 2183. Client address: 172.17.0.87.
Mode: leader

# 通过zkCli.sh 命令,连接测试:
$ ./zk01/bin/zkCli.sh -server 172.17.0.87:2181
[zk: 172.17.0.87:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path
delquota [-n|-b] path
get [-s] [-w] path
getAcl [-s] path
history
listquota path
ls [-s] [-w] [-R] path
ls2 path [watch]
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
rmr path
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b val path
stat [-w] path
sync path

# 此时再把zk03给停掉:
$ ./zk03/bin/zkServer.sh stop
ps -ef|grep zk03

# 再次查看zk01 状态,提示不再运行了
$ ./zk01/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk01/bin/../conf/zoo.cfg
Client port found: 2181. Client address: 172.17.0.87.
Error contacting service. It is probably not running.

# 我们查看进程还是存在的,集群中已经挡掉了多数的zk节点,则无法提供服务了
$ ps -ef|grep zk
ec2-user 30504 1 0 13:08 pts/5 00:00:02 java -Dzookeeper.log.dir=/data/knner/zk01/bin/../logs -Dzookeeper.log.file=zookeeper-ec2-user-server-test01.dev.awsbj.cn.log -Dzookeeper.root.logger=INFO,CONSOLE -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=kill -9 %p -cp /data/knner/zk01/bin/../............

# 然后再次启动zk02
$ ./zk02/bin/zkServer.sh --config zk02/conf start

# 再次查看zk01,zk02状态,集群又恢复正常了。
$ ./zk01/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk01/bin/../conf/zoo.cfg
Client port found: 2181. Client address: 172.17.0.87.
Mode: leader
[ec2-user@test01 knner]$ ./zk02/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk02/bin/../conf/zoo.cfg
Client port found: 2182. Client address: 172.17.0.87.
Mode: follower

经过测试,Zookeeper集群中多数zk节点存活,那么zk集群服务就是可用的。

监管Zookeeper进程

ZK服务被设计成”快速失败”,这意味着如果发生不可恢复的错误,它将关闭,退出进程。由于ZooKeeper服务群集高度可靠,这意味着尽管服务器可能宕机,但群集仍处于活动状态并正在处理请求。此外,由于群集是“自我修复”的,因此一旦失败的服务器重新启动,将自动重新加入该集合,而无需任何手动交互。

有一个监控程序,例如daemontoolsSMFSupervisor,管理ZooKeeper服务器可确保该进程确实退出时将自动重启并迅速重新加入集群。

还建议将ZooKeeper服务器进程配置为在发生OutOfMemoryError *时终止并转储其堆。这是通过分别在Linux和Windows上使用以下参数启动JVM来实现的。ZooKeeper *附带zkServer.shzkServer.cmd脚本设置了这些选项。

-XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p'

"-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f"

ZooKeeper命令

四小写字母命令

ZooKeeper响应少量命令。每个命令由四个字母组成。您可以在客户端端口通过telnet或nc向ZooKeeper发出命令。

三个更有趣的命令:“ stat”提供有关服务器和连接的客户端的一些常规信息,而“ srvr”和“ cons”分别提供有关服务器和连接的扩展详细信息。

3.5.3中的新增功能:四个字母词必须在使用前明确列出白色。有关详细信息,请参考群集配置部分中描述的4lw.commands.whitelist。展望未来,不推荐使用四个字母词,请改用AdminServer

  • conf3.3.0中的新增功能:打印有关服务配置的详细信息。

  • 缺点3.3.0中的新增功能:列出了连接到该服务器的所有客户端的完整连接/会话详细信息。包括有关已接收/已发送的数据包数量,会话ID,操作等待时间,最后执行的操作等信息。

  • crst3.3.0中的新增功能:重置所有连接的连接/会话统计信息。

  • dump:列出未完成的会话和临时节点。这仅适用于领导者。

  • envi:打印有关服务环境的详细信息

  • ruok:测试服务器是否以非错误状态运行。如果服务器正在运行,它将以imok响应。否则,它将完全不响应。响应“ imok”不一定表示服务器已加入仲裁,只是服务器进程处于活动状态并绑定到指定的客户端端口。使用“ stat”获取有关状态仲裁和客户端连接信息的详细信息。

  • srst:重置服务器统计信息。

  • srvr3.3.0中的新功能:列出服务器的完整详细信息。

  • stat:列出服务器和连接的客户端的简要详细信息。

  • wchs3.3.0中的新增功能:列出有关服务器监视的简要信息。

  • wchc3.3.0中的新增功能:按会话列出有关服务器监视的详细信息。这将输出具有相关监视(路径)的会话(连接)列表。请注意,根据手表的数量,此操作可能会很昂贵(即影响服务器性能),请小心使用。

  • dirs3.5.1中的新增功能:以字节为单位显示快照和日志文件的总大小

  • wchp3.3.0中的新增功能:按路径列出有关服务器监视的详细信息。这将输出具有关联会话的路径(znode)列表。请注意,根据手表的数量,此操作可能会很昂贵(即影响服务器性能),请小心使用。

  • mntr3.4.0中的新增功能:输出可用于监视集群运行状况的变量列表。

    $ echo mntr | nc localhost 2185
    zk_version 3.4.0 zk_avg_latency 0 zk_max_latency 0 zk_min_latency 0 zk_packets_received 70 zk_packets_sent 69 zk_num_alive_connections 1 zk_outstanding_requests 0 zk_server_state leader zk_znode_count 4 zk_watch_count 0 zk_ephemerals_count 0 zk_approximate_data_size 27 zk_followers 4 - only exposed by the Leader zk_synced_followers 4 - only exposed by the Leader zk_pending_syncs 0 - only exposed by the Leader zk_open_file_descriptor_count 23 - only available on Unix platforms zk_max_file_descriptor_count 1024 - only available on Unix platforms zk_last_proposal_size 23 zk_min_proposal_size 23 zk_max_proposal_size 64

输出与Java属性格式兼容,并且内容可能会随时间变化(添加了新键)。您的脚本应该期待更改。注意:一些密钥是特定于平台的,而某些密钥仅由Leader导出。输出包含具有以下格式的多行:

key \t value
  • isro3.4.0中的新增功能:测试服务器是否以只读模式运行。如果服务器处于只读模式,则服务器将以“ ro”响应,如果不是处于只读模式,则服务器将以“ rw”响应。

  • gtmk:以十进制格式获取当前的跟踪掩码,作为64位带符号的long值。请参阅stmk以获取可能值的说明。

  • stmk:设置当前的跟踪掩码。跟踪掩码是64位,其中每个位启用或禁用服务器上特定类别的跟踪日志记录。必须将Log4J配置为TRACE首先启用级别,才能查看跟踪日志记录消息。跟踪掩码的位对应于以下跟踪日志记录类别。

    跟踪掩码位值
    0b0000000000 未使用,保留以备将来使用。
    0b0000000010 记录客户端请求,但不包括ping请求。
    0b0000000100 未使用,保留以备将来使用。
    0b0000001000 记录客户端ping请求。
    0b0000010000 记录从作为当前领导者的仲裁对等方收到的数据包,但不包括ping请求。
    0b0000100000 记录客户端会话的添加,删除和验证。
    0b0001000000 记录监视事件到客户端会话的传递。
    0b0010000000 记录从作为当前领导者的仲裁对等方收到的ping数据包。
    0b0100000000 未使用,保留以备将来使用。
    0b1000000000 未使用,保留以备将来使用。

    64位值中的所有其余位均未使用,并保留以供将来使用。通过计算记录值的按位或,可以指定多个跟踪日志记录类别。默认跟踪掩码为0b0100110010。因此,默认情况下,跟踪日志记录包括客户端请求,从领导者接收的数据包和会话。要设置其他跟踪掩码,请发送一个包含stmk四个字母的单词的请求,后跟一个跟踪掩码,表示为一个64位带符号的long值。本示例使用Perl pack函数构造一个跟踪掩码,该掩码启用上述所有跟踪日志记录类别,并将其转换为具有big-endian字节顺序的64位有符号long值。stmk使用netcat 将结果附加到服务器并发送到服务器。服务器以十进制格式响应新的跟踪掩码。

    $ perl -e "print 'stmk', pack('q>', 0b0011111010)" | nc localhost 2181 250

这是ruok命令的示例:

$ echo ruok | nc 127.0.0.1 5111
imok

四小写字母命令测试

安装命令工具telnet,nc:

$ sudo yum install telnet nc -y
Installed:
nmap-ncat.x86_64 2:6.40-13.amzn2 telnet.x86_64 1:0.17-64.amzn2.0.1

测试:

# 不知道telnet如何使用
$ telnet 172.17.0.87 2181
Trying 172.17.0.87...
Connected to 172.17.0.87.
Escape character is '^]'.

conf # 输入的命令
Connection closed by foreign host.

# 建议用nc命令,连接任何一个zk节点都可以的,端口用clientPort的端口
$ echo conf |nc 172.17.0.87 2181
clientPort=2181
secureClientPort=-1
dataDir=/data/zkdata/zk01/version-2
dataDirSize=67108880
dataLogDir=/data/zklog/zk01/version-2
dataLogSize=1625
tickTime=2000
maxClientCnxns=1000
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=1
initLimit=10
syncLimit=5
electionAlg=3
electionPort=3888
quorumPort=2888
peerType=0
membership:
server.1=zk01:2888:3888:participant
server.2=zk02:2887:3887:participant
server.3=zk03:2886:3886:participant

$ echo cons |nc 172.17.0.87 2181
/172.17.0.87:52088[0](queued=0,recved=1,sent=0)

$ echo crst |nc 172.17.0.87 2181
Connection stats reset.

$ echo dump |nc 172.17.0.87 2181
SessionTracker dump:
Session Sets (0)/(0):
ephemeral nodes dump:
Sessions with Ephemerals (0):
Connections dump:
Connections Sets (1)/(1):
1 expire at Fri Dec 06 13:50:21 CST 2019:
4ip: /172.17.0.87:52154 sessionId: 0x0

$ echo envi |nc 172.17.0.87 2181
Environment:
zookeeper.version=3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT
host.name=node-1
java.version=1.8.0_201
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64/jre
java.class.path=/data/knner/zk01/bin/../zookeeper-server/target/classes:/data/knner/zk01/bin/../build/classes:/data/knner/zk01/bin/../zookeeper-server/target/lib/*.jar:/data/knner/zk01/bin/../build/lib/*.jar:/data/knner/zk01/bin/../lib/zookeeper-jute-3.5.6.jar:/data/knner/zk01/bin/../lib/zookeeper-3.5.6.jar:/data/knner/zk01/bin/../lib/slf4j-log4j12-1.7.25.jar:/data/knner/zk01/bin/../lib/slf4j-api-1.7.25.jar:/data/knner/zk01/bin/../lib/netty-transport-native-unix-common-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-transport-native-epoll-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-transport-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-resolver-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-handler-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-common-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-codec-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-buffer-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/log4j-1.2.17.jar:/data/knner/zk01/bin/../lib/json-simple-1.1.1.jar:/data/knner/zk01/bin/../lib/jline-2.11.jar:/data/knner/zk01/bin/../lib/jetty-util-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-servlet-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-server-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-security-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-io-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-http-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/javax.servlet-api-3.1.0.jar:/data/knner/zk01/bin/../lib/jackson-databind-2.9.10.jar:/data/knner/zk01/bin/../lib/jackson-core-2.9.10.jar:/data/knner/zk01/bin/../lib/jackson-annotations-2.9.10.jar:/data/knner/zk01/bin/../lib/commons-cli-1.2.jar:/data/knner/zk01/bin/../lib/audience-annotations-0.5.0.jar:/data/knner/zk01/bin/../zookeeper-*.jar:/data/knner/zk01/bin/../zookeeper-server/src/main/resources/lib/*.jar:zk01/conf:
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=4.14.77-81.59.amzn2.x86_64
user.name=ec2-user
user.home=/home/ec2-user
user.dir=/data/knner
os.memory.free=211MB
os.memory.max=891MB
os.memory.total=240MB

# are you ok? I'm ok.
$ echo ruok |nc 172.17.0.87 2181
imok

$ echo srst |nc 172.17.0.87 2181
Server stats reset.

$ echo srvr |nc 172.17.0.87 2181
Zookeeper version: 3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x300000000
Mode: leader
Node count: 5
Proposal sizes last/min/max: -1/-1/-1

$ echo stat |nc 172.17.0.87 2181
Zookeeper version: 3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT
Clients:
/172.17.0.87:52312[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/0
Received: 2
Sent: 2
Connections: 1
Outstanding: 0
Zxid: 0x300000000
Mode: leader
Node count: 5
Proposal sizes last/min/max: -1/-1/-1

$ echo wchs |nc 172.17.0.87 2181
0 connections watching 0 paths
Total watches:0

$ echo dirs |nc 172.17.0.87 2181
datadir_size: 67108880
logdir_size: 1625

$ echo wchc |nc 172.17.0.87 2181

$ echo wchp |nc 172.17.0.87 2181


$ echo mntr |nc 172.17.0.87 2181
zk_version 3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT
zk_avg_latency 0
zk_max_latency 0
zk_min_latency 0
zk_packets_received 7
zk_packets_sent 7
zk_num_alive_connections 1
zk_outstanding_requests 0
zk_server_state leader
zk_znode_count 5
zk_watch_count 0
zk_ephemerals_count 0
zk_approximate_data_size 161
zk_open_file_descriptor_count 63
zk_max_file_descriptor_count 65536
zk_followers 2
zk_synced_followers 2
zk_pending_syncs 0
zk_last_proposal_size -1
zk_max_proposal_size -1
zk_min_proposal_size -1

$ echo isro |nc 172.17.0.87 2181
rw

$ echo gtmk |nc 172.17.0.87 2181
306

AdminServer

3.5.0中的新增功能: AdminServer是嵌入式Jetty服务器,为四个字母单词命令提供HTTP接口。默认情况下,服务器在端口8080上启动,并且通过访问URL/commands/[command name]发出命令,例如http://localhost:8080/commands/stat。命令响应作为JSON返回。与原始协议不同,命令不限于四个字母的名称,命令可以具有多个名称。例如,stmk也可以称为set_trace_mask。要查看所有可用命令的列表,请将浏览器指向URL/commands(例如,http://localhost:8080/commands)。请参阅 AdminServer配置选项,以了解如何更改端口和URL。

AdminServer默认情况下处于启用状态,但可以通过以下任一方式禁用:

  • 将zookeeper.admin.enableServer系统属性设置为false。
  • 从类路径中删除Jetty。(如果您想覆盖ZooKeeper的码头依赖,则此选项很有用。)

请注意,如果AdminServer被禁用,则TCP四字母词接口仍然可用。

AdminServer 测试

注意:默认的path路径是/commands,而我这里在配置文件中将其改为/knner了,所以:

# 格式:172.17.0.87:8081/knner/xxxx
$ curl -XGET 172.17.0.87:8081/knner/stat
{
"version" : "3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT",
"read_only" : false,
"server_stats" : {
"packets_sent" : 10,
"packets_received" : 9,
"max_latency" : 0,
"min_latency" : 0,
"fsync_threshold_exceed_count" : 0,
"client_response_stats" : {
"last_buffer_size" : -1,
"min_buffer_size" : -1,
"max_buffer_size" : -1
},
"provider_null" : false,
"num_alive_client_connections" : 0,
"server_state" : "leader",
"outstanding_requests" : 0,
"avg_latency" : 0,
"data_dir_size" : 67108880,
"log_dir_size" : 1625,
"last_processed_zxid" : 12884901888
},
"client_response" : {
"last_buffer_size" : -1,
"min_buffer_size" : -1,
"max_buffer_size" : -1
},
"proposal_stats" : {
"last_buffer_size" : -1,
"min_buffer_size" : -1,
"max_buffer_size" : -1
},
"node_count" : 5,
"connections" : [ ],
"command" : "stats",
"error" : null
}

# 查看命令:
$ curl -XGET 172.17.0.87:8081/knner
<a href="/knner/configuration">configuration</a>
<br/>
<a href="/knner/connection_stat_reset">connection_stat_reset</a>
<br/>
<a href="/knner/connections">connections</a>
<br/>
<a href="/knner/dirs">dirs</a>
<br/>
<a href="/knner/dump">dump</a>
<br/>
<a href="/knner/environment">environment</a>
<br/>
<a href="/knner/get_trace_mask">get_trace_mask</a>
<br/>
<a href="/knner/is_read_only">is_read_only</a>
<br/>
<a href="/knner/monitor">monitor</a>
<br/>
<a href="/knner/ruok">ruok</a>
<br/>
<a href="/knner/server_stats">server_stats</a>
<br/>
<a href="/knner/set_trace_mask">set_trace_mask</a>
<br/>
<a href="/knner/stat_reset">stat_reset</a>
<br/>
<a href="/knner/stats">stats</a>
<br/>
<a href="/knner/watch_summary">watch_summary</a>
<br/>
<a href="/knner/watches">watches</a>
<br/>
<a href="/knner/watches_by_path">watches_by_path</a>
<br/>

# 可以写成4字母的,或者全称
$ curl -XGET 172.17.0.87:8081/knner/environment
{
"zookeeper.version" : "3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT",
"host.name" : "node-1",
.....
}

Troubleshooting 故障排查

  • 服务器由于文件损坏 而无法启动:由于ZooKeeper服务器的事务日志中的某些文件损坏,服务器可能无法读取其数据库并无法启动。在加载ZooKeeper数据库时,您将看到一些IOException。在这种情况下,请确保您集合中的所有其他服务器都已启动并正常工作。在命令端口上使用stat命令查看它们是否状况良好。确认集成中的所有其他服务器都已启动后,可以继续清理损坏的服务器的数据库。删除datadir/version-2datalogdir/version-2/中的所有文件。重新启动服务器。

  • 恢复-TxnLogToolkit 恢复带有损坏CRC的事务日志条目

    TxnLogToolkit是ZooKeeper附带的命令行工具,能够恢复带有损坏CRC的事务日志条目。

    在不使用任何命令行参数或不使用参数的情况下运行它-h,--help,它将输出以下帮助页面:

    $ bin/zkTxnLogToolkit.sh
    usage: TxnLogToolkit [-dhrv] txn_log_file_name
    -d,--dump Dump mode. Dump all entries of the log file. (this is the default)
    -h,--help Print help message
    -r,--recover Recovery mode. Re-calculate CRC for broken entries.
    -v,--verbose Be verbose in recovery mode: print all entries, not just fixed ones.
    -y,--yes Non-interactive mode: repair all CRC errors without asking

    默认行为是安全的:将给定事务日志文件的条目转储到屏幕上:(与using -d,--dump参数相同)

    $ bin/zkTxnLogToolkit.sh log.100000001
    ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
    4/5/18 2:15:58 PM CEST session 0x16295bafcc40000 cxid 0x0 zxid 0x100000001 createSession 30000
    CRC ERROR - 4/5/18 2:16:05 PM CEST session 0x16295bafcc40000 cxid 0x1 zxid 0x100000002 closeSession null # 这里有错误
    4/5/18 2:16:05 PM CEST session 0x16295bafcc40000 cxid 0x1 zxid 0x100000002 closeSession null
    4/5/18 2:16:12 PM CEST session 0x26295bafcc90000 cxid 0x0 zxid 0x100000003 createSession 30000
    4/5/18 2:17:34 PM CEST session 0x26295bafcc90000 cxid 0x0 zxid 0x200000001 closeSession null
    4/5/18 2:17:34 PM CEST session 0x16295bd23720000 cxid 0x0 zxid 0x200000002 createSession 30000
    4/5/18 2:18:02 PM CEST session 0x16295bd23720000 cxid 0x2 zxid 0x200000003 create '/andor,#626262,v{s{31,s{'world,'anyone}}},F,1
    EOF reached after 6 txns.

    # 没有错误的
    $ ./zk01/bin/zkTxnLogToolkit.sh /data/zklog/zk01/version-2/log.200000001
    /usr/bin/java
    ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
    12/6/19 1:27:35 PM CST session 0x1028877ea6a0000 cxid 0x0 zxid 0x200000001 createSession 30000
    12/6/19 1:27:42 PM CST session 0x1028877ea6a0000 cxid 0x1 zxid 0x200000002 closeSession
    EOF reached after 2 txns.

    上面的事务日志文件的第二个条目中有一个CRC错误。在转储模式下,该工具包仅将此信息打印到屏幕上,而无需触摸原始文件。在恢复模式(-r,--recover标志)下,原始文件仍然保持不变,所有事务将被复制到后缀为“ .fixed”的新txn日志文件中。如果它与原始txn条目不匹配,它将重新计算CRC值并复制计算出的值。默认情况下,该工具以交互方式工作:遇到CRC错误时,它会要求进行确认。

    $ bin/zkTxnLogToolkit.sh -r log.100000001
    ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
    CRC ERROR - 4/5/18 2:16:05 PM CEST session 0x16295bafcc40000 cxid 0x1 zxid 0x100000002 closeSession null
    Would you like to fix it (Yes/No/Abort) ?

    回答“ 是”意味着新计算的CRC值将输出到新文件。No表示将复制原始CRC值。中止将中止整个操作并退出。(在这种情况下,“。fixed”将不会被删除,而是处于半完成状态:仅包含已经处理过的条目,或者如果操作在第一个条目中止,则仅包含标头。)

    $ bin/zkTxnLogToolkit.sh -r log.100000001
    ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
    CRC ERROR - 4/5/18 2:16:05 PM CEST session 0x16295bafcc40000 cxid 0x1 zxid 0x100000002 closeSession null
    Would you like to fix it (Yes/No/Abort) ? y
    EOF reached after 6 txns.
    Recovery file log.100000001.fixed has been written with 1 fixed CRC error(s)

    恢复的默认行为是保持沉默:仅将具有CRC错误的条目打印到屏幕上。可以使用-v,--verbose参数打开详细模式以查看所有记录。可以使用-y,--yes参数关闭交互模式。在这种情况下,所有CRC错误都将在新的事务文件中修复。

ZK datadir datalogdir 目录结构

$ tree /data/zkdata/zk01/
/data/zkdata/zk01/
├── myid # 人类可读的ASCII文本中包含一个表示服务器ID的整数
├── version-2 # 保存数据树的模糊快照
│   ├── acceptedEpoch
│   ├── currentEpoch
│   ├── snapshot.0
│   ├── snapshot.100000000
│   └── snapshot.200000002
└── zookeeper_server.pid

1 directory, 7 files
# 每个ZooKeeper服务器都有一个唯一的ID。此id在两个地方使用:myid文件和配置文件。该身份识别码文件标识服务器对应于给定的数据目录。配置文件列出了由服务器ID标识的每个服务器的联系信息。当ZooKeeper服务器实例启动时,它会从myid文件中读取其ID ,然后使用该ID从配置文件中读取,并查找其应侦听的端口。

# 从某种意义上说,存储在数据目录中的快照文件是模糊快照,即在ZooKeeper服务器获取快照的过程中,正在对数据树进行更新。快照文件名的后缀是zxid快照开始时最后提交的事务的ZooKeeper事务ID。因此,快照包括在快照进行过程中发生的数据树更新的子集。因此,快照可能不对应于实际存在的任何数据树,因此,我们将其称为模糊快照。尽管如此,ZooKeeper仍可以使用此快照进行恢复,因为它利用了其更新的幂等性质。通过针对模糊快照重播事务日志,ZooKeeper可以在日志末尾获取系统状态。

$ tree /data/zklog/zk01/
/data/zklog/zk01/
└── version-2
└── log.200000001

1 directory, 1 file

# 日志目录包含ZooKeeper事务日志。在进行任何更新之前,ZooKeeper确保将代表更新的事务写入非易失性存储。当写入当前日志文件的事务数达到(可变)阈值时,将启动一个新的日志文件。使用影响快照频率的相同参数计算阈值(请参见上面的snapCount)。日志文件的后缀是写入该日志的第一个zxid。
# 在独立的ZooKeeper服务器和复制的ZooKeeper服务器的不同配置之间,快照和日志文件的格式不会更改。因此,您可以将这些文件从运行中的复制的ZooKeeper服务器拉到带有独立ZooKeeper服务器的开发计算机上,以进行故障排除。

zkCli 使用测试

客户端命名行:bin/zkCli.sh

查看命令帮助:

$ ./zk01/bin/zkCli.sh help
/usr/bin/java
Connecting to localhost:2181 # 默认连接的地址
2019-12-06 14:42:10,584 [myid:] - INFO [main:Environment@109] - Client environment:zookeeper.version=3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT
2019-12-06 14:42:10,588 [myid:] - INFO [main:Environment@109] - Client environment:host.name=node-1
2019-12-06 14:42:10,589 [myid:] - INFO [main:Environment@109] - Client environment:java.version=1.8.0_201
2019-12-06 14:42:10,591 [myid:] - INFO [main:Environment@109] - Client environment:java.vendor=Oracle Corporation
2019-12-06 14:42:10,591 [myid:] - INFO [main:Environment@109] - Client environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64/jre
2019-12-06 14:42:10,591 [myid:] - INFO [main:Environment@109] - Client environment:java.class.path=/data/knner/zk01/bin/../zookeeper-server/target/classes:/data/knner/zk01/bin/../build/classes:/data/knner/zk01/bin/../zookeeper-server/target/lib/*.jar:/data/knner/zk01/bin/../build/lib/*.jar:/data/knner/zk01/bin/../lib/zookeeper-jute-3.5.6.jar:/data/knner/zk01/bin/../lib/zookeeper-3.5.6.jar:/data/knner/zk01/bin/../lib/slf4j-log4j12-1.7.25.jar:/data/knner/zk01/bin/../lib/slf4j-api-1.7.25.jar:/data/knner/zk01/bin/../lib/netty-transport-native-unix-common-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-transport-native-epoll-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-transport-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-resolver-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-handler-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-common-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-codec-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/netty-buffer-4.1.42.Final.jar:/data/knner/zk01/bin/../lib/log4j-1.2.17.jar:/data/knner/zk01/bin/../lib/json-simple-1.1.1.jar:/data/knner/zk01/bin/../lib/jline-2.11.jar:/data/knner/zk01/bin/../lib/jetty-util-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-servlet-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-server-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-security-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-io-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/jetty-http-9.4.17.v20190418.jar:/data/knner/zk01/bin/../lib/javax.servlet-api-3.1.0.jar:/data/knner/zk01/bin/../lib/jackson-databind-2.9.10.jar:/data/knner/zk01/bin/../lib/jackson-core-2.9.10.jar:/data/knner/zk01/bin/../lib/jackson-annotations-2.9.10.jar:/data/knner/zk01/bin/../lib/commons-cli-1.2.jar:/data/knner/zk01/bin/../lib/audience-annotations-0.5.0.jar:/data/knner/zk01/bin/../zookeeper-*.jar:/data/knner/zk01/bin/../zookeeper-server/src/main/resources/lib/*.jar:/data/knner/zk01/bin/../conf:
2019-12-06 14:42:10,592 [myid:] - INFO [main:Environment@109] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2019-12-06 14:42:10,592 [myid:] - INFO [main:Environment@109] - Client environment:java.io.tmpdir=/tmp
2019-12-06 14:42:10,592 [myid:] - INFO [main:Environment@109] - Client environment:java.compiler=<NA>
2019-12-06 14:42:10,592 [myid:] - INFO [main:Environment@109] - Client environment:os.name=Linux
2019-12-06 14:42:10,592 [myid:] - INFO [main:Environment@109] - Client environment:os.arch=amd64
2019-12-06 14:42:10,593 [myid:] - INFO [main:Environment@109] - Client environment:os.version=4.14.77-81.59.amzn2.x86_64
2019-12-06 14:42:10,593 [myid:] - INFO [main:Environment@109] - Client environment:user.name=ec2-user
2019-12-06 14:42:10,593 [myid:] - INFO [main:Environment@109] - Client environment:user.home=/home/ec2-user
2019-12-06 14:42:10,593 [myid:] - INFO [main:Environment@109] - Client environment:user.dir=/data/knner
2019-12-06 14:42:10,593 [myid:] - INFO [main:Environment@109] - Client environment:os.memory.free=233MB
2019-12-06 14:42:10,595 [myid:] - INFO [main:Environment@109] - Client environment:os.memory.max=240MB
2019-12-06 14:42:10,596 [myid:] - INFO [main:Environment@109] - Client environment:os.memory.total=240MB
2019-12-06 14:42:10,599 [myid:] - INFO [main:ZooKeeper@868] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@3b764bce
2019-12-06 14:42:10,606 [myid:] - INFO [main:X509Util@79] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2019-12-06 14:42:10,613 [myid:] - INFO [main:ClientCnxnSocket@237] - jute.maxbuffer value is 4194304 Bytes
2019-12-06 14:42:10,623 [myid:] - INFO [main:ClientCnxn@1653] - zookeeper.request.timeout value is 0. feature enabled=

# 帮助:
# 格式:zkCli.sh -server host:port cmd args
$ ./zk01/bin/zkCli.sh -server 172.17.0.87:2181
[zk: 172.17.0.87:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path
delquota [-n|-b] path
get [-s] [-w] path
getAcl [-s] path
history
listquota path
ls [-s] [-w] [-R] path
ls2 path [watch]
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
rmr path
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b val path
stat [-w] path
sync path

例如:
$ ./zk01/bin/zkCli.sh -server 172.17.0.87:2181 ls /
/usr/bin/java
Connecting to 172.17.0.87:2181
......省略
[zookeeper]

Kafka:

官网:

https://kafka.apache.org/

文档:

https://kafka.apache.org/documentation

下载:

https://kafka.apache.org/downloads

介绍

参考:https://kafka.apache.org/intro

Kafka是一个分布式的流处理平台(distributed streaming platform);

具有三个关键功能:

  • 发布和订阅记录流,类似与消息队列
  • 以容错的持久方式存储记录流
  • 处理记录流

几个概念:

  • Kafka可以在一个或者多个可以跨多个数据中心的服务器上作为集群运行
  • Kafka集群将记录流存储在成为Topic的类别中
  • 每个记录由一个键,一个值和一个时间戳组成

四个核心API:

  • Producer API 允许应用程序发布记录流到一个或者多个topic主题中。
  • Consumer API 运行应用程序订阅一个或者多个Topic主题,并处理所产生的对他们记录的数据流。
  • Streams API 允许应用程序充当流处理器,从一个或多个主题消耗的输入流,并产生一个输出流至一个或多个输出的Topic。
  • Connector API 允许构建和运行可重复使用的生产者或消费者连接Kafka 主题,以现有的应用程序或数据系统。例如,关系数据库的连接器可能会捕获对表的所有更改。

在Kafka中,客户端和服务器之间的通信是通过简单,高性能,与语言无关的TCP协议完成的

主题和日志 Topics and Logs

Kafka提供的记录主题的核心抽象。

主题是将记录发布到的类别或订阅源名称。Kafka中的主题始终是多用户的;也就是说,一个主题可以有零个,一个或多个消费者来订阅写入该主题的数据。

对于每个主题,Kafka集群都会维护一个分区日志,如下所示:

每个分区都是有序的,不变的记录序列,这些记录连续地附加到结构化的提交日志中。每个分区中的记录都分配有一个称为偏移的顺序ID号,该ID 唯一地标识分区中的每个记录。

Kafka集群使用可配置的保留期限持久地保留所有已发布的记录(无论是否已使用它们)。例如,如果将保留策略设置为两天,则在发布记录后的两天内,该记录可供使用,之后将被丢弃以释放空间。Kafka的性能相对于数据大小实际上是恒定的,因此长时间存储数据不是问题。

实际上,基于每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:通常,使用者在读取记录时会线性地推进其偏移量,但是实际上,由于位置是由使用者控制的,因此它可以按喜欢的任何顺序使用记录。例如,使用者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始使用。

这些功能的组合意味着Kafka的消费者非常便宜-他们来来去去对集群或其他消费者没有太大影响。例如,您可以使用我们的命令行工具来“尾部”任何主题的内容,而无需更改任何现有使用者所消耗的内容。

日志中的分区有多种用途。首先,它们允许日志扩展到超出单个服务器所能容纳的大小。每个单独的分区都必须适合托管它的服务器,但是一个主题可能有很多分区,因此它可以处理任意数量的数据。其次,它们充当并行性的单元–稍有更多。

分布

日志的分区分布在Kafka群集中的服务器上,每个服务器处理数据并要求共享分区。每个分区都在可配置数量的服务器之间复制,以实现容错功能。

每个分区都有一个充当“领导者”的服务器和零个或多个充当“跟随者”的服务器。领导者处理对分区的所有读写请求,而跟随者则被动地复制领导者。如果领导者失败,则跟随者之一将自动成为新领导者。每个服务器充当其某些分区的领导者,而充当其他分区的跟随者,因此群集中的负载得到了很好的平衡。

地址复制

Kafka MirrorMaker为您的集群提供地理复制支持。使用MirrorMaker,可以在多个数据中心或云区域之间复制消息。您可以在主动/被动方案中使用它进行备份和恢复。或在主动/主动方案中将数据放置在离您的用户更近的位置,或支持数据位置要求。

生产者 Producers

生产者将数据发布到他们选择的主题。生产者负责选择将哪个记录分配给主题中的哪个分区。可以以循环方式完成此操作,仅是为了平衡负载,也可以根据某些语义分区功能(例如基于记录中的某些键)进行此操作。

消费者 Consumers

消费者使用消费者组名称标记自己,并且发布到主题的每条记录都会传递到每个订阅消费者组中的一个消费者实例。使用者实例可以在单独的进程中或在单独的机器上。

如果所有使用者实例都具有相同的使用者组,那么将在这些使用者实例上有效地平衡记录。

如果所有使用者实例具有不同的使用者组,则每条记录将广播到所有使用者进程

一个由两台服务器组成的Kafka群集,其中包含四个带有两个使用者组的分区(P0-P3)。使用者组A有两个使用者实例,组B有四个。

但是,更常见的是,我们发现主题具有少量的消费者组,每个“逻辑订户”一个。每个组均由许多使用者实例组成,以实现可伸缩性和容错能力。这无非就是发布-订阅语义,其中订阅者是消费者的集群而不是单个进程。

在Kafka中实现消耗的方式是通过在消费者实例上划分日志中的分区,以便每个实例在任何时间点都是分区“公平份额”的排他消费者。Kafka协议动态处理了维护组成员身份的过程。如果新实例加入该组,它们将接管该组其他成员的某些分区;如果实例死亡,则其分区将分配给其余实例。

卡夫卡只提供了记录的总订单中的一个分区,而不是一个主题的不同分区之间。对于大多数应用程序,按分区排序以及按键对数据进行分区的能力就足够了。但是,如果您需要记录的总订单量,则可以通过只有一个分区的主题来实现,尽管这将意味着每个使用者组只有一个使用者进程。

例如:

一个Topic中,分区为4,

  • 那么你启动一个消费者组中有4个消费者实例,那么每个消费者实例都会分配一个分区,各自的消费者实例只会消费分配给自己的分区,当一个消费者实例宕机了,那么分配给该实例的分区会被分配给剩余的一个消费者实例,此时会变成一个消费者实例消费两个分区,另外两个消费者各自消费一个分区。

  • 当你启动一个消费者组中包含2个消费者实例,那么每个消费者实例会分配到两个分区。

    • 当有一个实例宕机了,那么此时只有一个消费者实例消费全部的4个分区
    • 当有一个新的实例加入到这个消费者组中,此时会将之前两个消费者实例分配的两个分区中拿一个给新的消费者实例。此时一个消费者实例消费两个分区,另外两个消费者实例各自消费一个分区。
  • 当你启动一个消费者组中有5个消费者实例,那么四个消费者实例各自分配一个分区进行消费,另一个消费者实例处于闲置状态,只有当其他消费者实例宕机了,他才会消费这个宕机实例所分配的分区,起到备份的作用。

消息复制与分区

  • 一个主题可以包含多个分区。kafka无法在整个主体范围内保证消息的顺序,但是可以保证消息在单个分区内的顺序
  • Kafka在物理上把Topic分成一个或多个Partition,每个Partiton在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。每个分片分区日志都放在Kafka日志目录下的自己的文件夹中。这类文件夹的名称由主题名称(由破折号(-)和分区id组成)组成。由于典型的文件夹名称长度不能超过255个字符,因此对主题名称的长度有限制。我们假设分区的数量永远不会超过100,000个。因此,主题名不能超过249个字符。这在文件夹名中只留下了足够的空间来放置一个dash和一个可能有5位数字的长分区id。
  • Kafka通过分区设计可以实现数据冗余和伸缩,分区可以分布在不同的服务器上,以此为高并发提供可能。
  • 消息复制指的是每一个分区都可能会有一个或者多个副本,其中有一个副本会被推选为领袖节点,其余的落选的为从节点。其中领袖节点将会跟踪与其保持同步的副本列表,该列表称为ISR(In-Sync Replica)
  • 分区的数量有几个影响:
    • 每个分区必须被一个服务器处理,不可能一个分区同时被服务器两个处理。例如有20个分区,那么整个集群集将由不超过20个服务器处理。
    • 分区的个数影响使用者的最大并发数。

名词解释

  • Broker 一个Kafka实例
  • 集群 多个Broker组成的一个Kafka集群
  • Topic 主题,一个逻辑的分组
  • Partition 分区,一个Topic中有多个分区,每个分区可以多个副本数(Replica),实现容错功能,每个分区都是有序的,不变的记录序列。每个分区都有一个充当领导者的服务器和多个充当跟随者的服务器。领导者处理分区的所有读写,跟随者只是被动的复制领导者,当领导者宕机了,其中一个跟随者会成为新的领导者。
  • 消息 Kafka中的数据单元,通过topic–>Partition–>offset可以找到在Kafka中唯一对应的一条消息。
  • 生产者 往Kafka中写入数据
  • 消费者 从Kafka里读取数据进行处理

保证

在较高级别上,Kafka提供以下保证:

  • 生产者发送到特定主题分区的消息将按其发送顺序附加。也就是说,如果记录M1是由与记录M2相同的生产者发送的,并且首先发送M1,则M1的偏移量将小于M2,并在日志中更早地出现。
  • 消费者实例按记录在日志中的存储顺序查看记录。
  • 对于复制因子为N的主题,我们最多可以容忍N-1个服务器故障,而不会丢失提交给日志的任何记录。

Kafka作为消息传递系统

传统上,消息传递具有两种模型:队列发布-订阅。在队列中,一组使用者可以从服务器读取数据,并且每条记录都将转到其中一个。在发布-订阅记录中广播给所有消费者。这两个模型中的每一个都有优点和缺点。排队的优势在于,它允许您将数据处理划分到多个使用者实例上,从而扩展处理量。不幸的是,队列不是多用户的—一次进程读取了丢失的数据。发布-订阅允许您将数据广播到多个进程,但是由于每条消息都传递给每个订阅者,因此无法扩展处理。

卡夫卡的消费群体概念概括了这两个概念。与队列一样,使用者组允许您将处理划分为一组进程(使用者组的成员)。与发布订阅一样,Kafka允许您将消息广播到多个消费者组。

Kafka模型的优势在于,每个主题都具有这些属性-可以扩展处理范围,并且是多订阅者-无需选择其中一个。

与传统的消息传递系统相比,Kafka还具有更强的订购保证。

传统队列将记录按顺序保留在服务器上,如果多个使用者从队列中消费,则服务器将按记录的存储顺序分发记录。但是,尽管服务器按顺序分发记录,但是这些记录是异步传递给使用者的,因此它们可能会在不同的使用者上无序到达。这实际上意味着在并行使用的情况下会丢失记录的顺序。消息传递系统通常通过具有“专用使用者”的概念来解决此问题,该概念仅允许一个进程从队列中使用,但是,这当然意味着在处理中没有并行性。

卡夫卡做得更好。通过在主题内具有并行性(即分区)的概念,Kafka能够在用户进程池中提供排序保证和负载均衡。这是通过将主题中的分区分配给消费者组中的消费者来实现的,以便每个分区都由组中的一个消费者完全消费。通过这样做,我们确保使用者是该分区的唯一读取器,并按顺序使用数据。由于存在许多分区,因此仍然可以平衡许多使用者实例上的负载。但是请注意,使用者组中的使用者实例不能超过分区。

Kafka用于流处理

仅读取,写入和存储数据流是不够的,目的是实现对流的实时处理。

在Kafka中,流处理器是指从输入主题中获取连续数据流,对该输入进行一些处理并生成连续数据流以输出主题的任何东西。

例如,零售应用程序可以接收销售和装运的输入流,并输出根据此数据计算出的重新订购和价格调整流。

可以直接使用生产者和消费者API进行简单处理。但是,对于更复杂的转换,Kafka提供了完全集成的Streams API。这允许构建执行非平凡处理的应用程序,这些应用程序计算流的聚合或将流连接在一起。

该功能有助于解决此类应用程序所面临的难题:处理无序数据,在代码更改时重新处理输入,执行状态计算等。

流API建立在Kafka提供的核心原语之上:它使用生产者和使用者API作为输入,使用Kafka进行状态存储,并使用相同的组机制来实现流处理器实例之间的容错。

拼凑在一起

消息传递,存储和流处理的这种组合看似不寻常,但这对于Kafka作为流平台的角色而言至关重要。

像HDFS这样的分布式文件系统允许存储静态文件以进行批处理。实际上,像这样的系统可以存储和处理过去的历史数据。

传统的企业消息传递系统允许处理将来的消息,这些消息将在您订阅后到达。以这种方式构建的应用程序会在将来的数据到达时对其进行处理。

Kafka结合了这两种功能,对于使用Kafka作为流应用程序平台和流数据管道平台来说,这种结合至关重要。

通过结合存储和低延迟订阅,流应用程序可以以相同的方式处理过去和将来的数据。那是一个单一的应用程序可以处理历史记录中存储的数据,而不是在到达最后一条记录时结束,而是可以在将来的数据到达时继续进行处理。这是流处理的通用概念,它包含批处理以及消息驱动的应用程序。

同样,对于流数据管道,对实时事件的订阅组合使得可以将Kafka用于非常低延迟的管道。但是可靠地存储数据的能力使其可以用于必须保证数据传输的关键数据,或与仅定期加载数据或可能停机很长时间进行维护的脱机系统集成。流处理设备使数据到达时可以进行转换。

有关Kafka提供的担保,API和功能的更多信息,请参阅本文档的其余部分。

安装

环境说明:

我这里还是采用一台机器,使用不同端口的方式来安装Kafka集群:

IP Port log.dir 说明
172.17.0.87 9092 /data/kafka-logs/kafka01 kafka01
172.17.0.87 9293 /data/kafka-logs/kafka02 kafka02
172.17.0.87 9094 /data/kafka-logs/kafka03 kafka03

下载:

$ wget -c "https://archive.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz"
$ tar xf /opt/softs/kafka_2.12-2.0.0.tgz /data/knner
$ cd /data/knner
$ mv kafka_2.12-2.0.0 kafka_2.12-2.0.0_01
$ cp -a kafka_2.12-2.0.0_01 kafka_2.12-2.0.0_02
$ cp -a kafka_2.12-2.0.0_01 kafka_2.12-2.0.0_03
$ ln -s kafka_2.12-2.0.0_01 kafka01
$ ln -s kafka_2.12-2.0.0_02 kafka02
$ ln -s kafka_2.12-2.0.0_03 kafka03

查看一下目录结构:

$ ll kafka01/
total 52
drwxr-xr-x 3 ec2-user ec2-user 4096 Jul 24 2018 bin # 脚本文件目录
drwxr-xr-x 2 ec2-user ec2-user 4096 Jul 24 2018 config # 配置文件目录
drwxr-xr-x 2 ec2-user ec2-user 4096 Dec 5 17:40 libs # jar包目录
-rw-r--r-- 1 ec2-user ec2-user 28824 Jul 24 2018 LICENSE
-rw-r--r-- 1 ec2-user ec2-user 336 Jul 24 2018 NOTICE
drwxr-xr-x 2 ec2-user ec2-user 44 Jul 24 2018 site-docs # doc目录

$ tree kafka01
kafka01
├── bin
│   ├── connect-distributed.sh
│   ├── connect-standalone.sh
│   ├── kafka-acls.sh
│   ├── kafka-broker-api-versions.sh
│   ├── kafka-configs.sh
│   ├── kafka-console-consumer.sh
│   ├── kafka-console-producer.sh
│   ├── kafka-consumer-groups.sh
│   ├── kafka-consumer-perf-test.sh
│   ├── kafka-delegation-tokens.sh
│   ├── kafka-delete-records.sh
│   ├── kafka-dump-log.sh
│   ├── kafka-log-dirs.sh
│   ├── kafka-mirror-maker.sh
│   ├── kafka-preferred-replica-election.sh
│   ├── kafka-producer-perf-test.sh
│   ├── kafka-reassign-partitions.sh
│   ├── kafka-replica-verification.sh
│   ├── kafka-run-class.sh
│   ├── kafka-server-start.sh
│   ├── kafka-server-stop.sh
│   ├── kafka-streams-application-reset.sh
│   ├── kafka-topics.sh
│   ├── kafka-verifiable-consumer.sh
│   ├── kafka-verifiable-producer.sh
│   ├── trogdor.sh
│   ├── windows
│   │   ├── connect-distributed.bat
│   │   ├── connect-standalone.bat
│   │   ├── kafka-acls.bat
│   │   ├── kafka-broker-api-versions.bat
│   │   ├── kafka-configs.bat
│   │   ├── kafka-console-consumer.bat
│   │   ├── kafka-console-producer.bat
│   │   ├── kafka-consumer-groups.bat
│   │   ├── kafka-consumer-perf-test.bat
│   │   ├── kafka-delegation-tokens.bat
│   │   ├── kafka-dump-log.bat
│   │   ├── kafka-mirror-maker.bat
│   │   ├── kafka-preferred-replica-election.bat
│   │   ├── kafka-producer-perf-test.bat
│   │   ├── kafka-reassign-partitions.bat
│   │   ├── kafka-replica-verification.bat
│   │   ├── kafka-run-class.bat
│   │   ├── kafka-server-start.bat
│   │   ├── kafka-server-stop.bat
│   │   ├── kafka-topics.bat
│   │   ├── zookeeper-server-start.bat
│   │   ├── zookeeper-server-stop.bat
│   │   └── zookeeper-shell.bat
│   ├── zookeeper-security-migration.sh
│   ├── zookeeper-server-start.sh
│   ├── zookeeper-server-stop.sh
│   └── zookeeper-shell.sh
├── config
│   ├── connect-console-sink.properties
│   ├── connect-console-source.properties
│   ├── connect-distributed.properties
│   ├── connect-file-sink.properties
│   ├── connect-file-source.properties
│   ├── connect-log4j.properties
│   ├── connect-standalone.properties
│   ├── consumer.properties
│   ├── log4j.properties
│   ├── producer.properties
│   ├── server.properties
│   ├── tools-log4j.properties
│   ├── trogdor.conf
│   └── zookeeper.properties
├── libs
│   ├── activation-1.1.1.jar
│   ├── aopalliance-repackaged-2.5.0-b42.jar
│   ├── argparse4j-0.7.0.jar
│   ├── audience-annotations-0.5.0.jar
│   ├── commons-lang3-3.5.jar
│   ├── connect-api-2.0.0.jar
│   ├── connect-basic-auth-extension-2.0.0.jar
│   ├── connect-file-2.0.0.jar
│   ├── connect-json-2.0.0.jar
│   ├── connect-runtime-2.0.0.jar
│   ├── connect-transforms-2.0.0.jar
│   ├── guava-20.0.jar
│   ├── hk2-api-2.5.0-b42.jar
│   ├── hk2-locator-2.5.0-b42.jar
│   ├── hk2-utils-2.5.0-b42.jar
│   ├── jackson-annotations-2.9.6.jar
│   ├── jackson-core-2.9.6.jar
│   ├── jackson-databind-2.9.6.jar
│   ├── jackson-jaxrs-base-2.9.6.jar
│   ├── jackson-jaxrs-json-provider-2.9.6.jar
│   ├── jackson-module-jaxb-annotations-2.9.6.jar
│   ├── javassist-3.22.0-CR2.jar
│   ├── javax.annotation-api-1.2.jar
│   ├── javax.inject-1.jar
│   ├── javax.inject-2.5.0-b42.jar
│   ├── javax.servlet-api-3.1.0.jar
│   ├── javax.ws.rs-api-2.1.jar
│   ├── jaxb-api-2.3.0.jar
│   ├── jersey-client-2.27.jar
│   ├── jersey-common-2.27.jar
│   ├── jersey-container-servlet-2.27.jar
│   ├── jersey-container-servlet-core-2.27.jar
│   ├── jersey-hk2-2.27.jar
│   ├── jersey-media-jaxb-2.27.jar
│   ├── jersey-server-2.27.jar
│   ├── jetty-client-9.4.11.v20180605.jar
│   ├── jetty-continuation-9.4.11.v20180605.jar
│   ├── jetty-http-9.4.11.v20180605.jar
│   ├── jetty-io-9.4.11.v20180605.jar
│   ├── jetty-security-9.4.11.v20180605.jar
│   ├── jetty-server-9.4.11.v20180605.jar
│   ├── jetty-servlet-9.4.11.v20180605.jar
│   ├── jetty-servlets-9.4.11.v20180605.jar
│   ├── jetty-util-9.4.11.v20180605.jar
│   ├── jopt-simple-5.0.4.jar
│   ├── kafka_2.12-2.0.0.jar
│   ├── kafka_2.12-2.0.0.jar.asc
│   ├── kafka_2.12-2.0.0-javadoc.jar
│   ├── kafka_2.12-2.0.0-javadoc.jar.asc
│   ├── kafka_2.12-2.0.0-scaladoc.jar
│   ├── kafka_2.12-2.0.0-scaladoc.jar.asc
│   ├── kafka_2.12-2.0.0-sources.jar
│   ├── kafka_2.12-2.0.0-sources.jar.asc
│   ├── kafka_2.12-2.0.0-test.jar
│   ├── kafka_2.12-2.0.0-test.jar.asc
│   ├── kafka_2.12-2.0.0-test-sources.jar
│   ├── kafka_2.12-2.0.0-test-sources.jar.asc
│   ├── kafka-clients-2.0.0.jar
│   ├── kafka-log4j-appender-2.0.0.jar
│   ├── kafka-streams-2.0.0.jar
│   ├── kafka-streams-examples-2.0.0.jar
│   ├── kafka-streams-scala_2.12-2.0.0.jar
│   ├── kafka-streams-test-utils-2.0.0.jar
│   ├── kafka-tools-2.0.0.jar
│   ├── log4j-1.2.17.jar
│   ├── lz4-java-1.4.1.jar
│   ├── maven-artifact-3.5.3.jar
│   ├── metrics-core-2.2.0.jar
│   ├── osgi-resource-locator-1.0.1.jar
│   ├── plexus-utils-3.1.0.jar
│   ├── reflections-0.9.11.jar
│   ├── rocksdbjni-5.7.3.jar
│   ├── scala-library-2.12.6.jar
│   ├── scala-logging_2.12-3.9.0.jar
│   ├── scala-reflect-2.12.6.jar
│   ├── slf4j-api-1.7.25.jar
│   ├── slf4j-log4j12-1.7.25.jar
│   ├── snappy-java-1.1.7.1.jar
│   ├── validation-api-1.1.0.Final.jar
│   ├── zkclient-0.10.jar
│   └── zookeeper-3.4.13.jar
├── LICENSE
├── NOTICE
└── site-docs
└── kafka_2.12-2.0.0-site-docs.tgz

5 directories, 151 files

Kafka 配置详解

参考:

https://kafka.apache.org/documentation/#brokerconfigs Broker配置

https://kafka.apache.org/documentation/#topicconfigs topic配置

https://kafka.apache.org/documentation/#producerconfigs 生产者配置

https://kafka.apache.org/documentation/#consumerconfigs 消费者配置

https://kafka.apache.org/documentation/#connectconfigs kafka connect配置

https://kafka.apache.org/documentation/#sourceconnectconfigs source connector配置

https://kafka.apache.org/documentation/#sinkconnectconfigs sink connector配置

https://kafka.apache.org/documentation/#streamsconfigs streams 配置

https://kafka.apache.org/documentation/#adminclientconfigs adminClient配置

下面列出常用的配置以及解释:

  • broker.id Broker ID,Kafka集群中必须唯一。

  • listeners Broker 监听的配置:非加密的用:PLAINTEXT://IP:PORT,加密的用SSL://IP:PORT

  • advertised.listeners 宣告的地址,也是向zk注册的地址,此地址用于生产者和消费者,格式同listeners

  • num.network.threads 用于从网络上接收发送的线程数

  • num.io.threads 处理请求,包括磁盘I/O的线程数

  • socket.send.buffer.bytes 发送的buffer,如果值为-1,使用系统默认值

  • socket.receive.buffer.bytes 接收的buffer,如果值为-1,使用系统默认值

  • socket.request.max.bytes 接收的最大字节数

  • log.dirs log存放的位置

  • num.partitions 默认的每个topic中partitions数量,默认是1

  • default.replication.factor 默认topic中的副本数,默认是1,这个更改成2。

  • log.retention.hours 日志最大保存的小时,过期的将被删除,这里设置成48h

  • zookeeper.connect zookeeper的地址,IP:PORT,多个zk用逗号隔开

  • message.max.bytes 默认消息的大小,增大到5M

  • replica.fetch.max.bytes 取消息的最大大小,默认是1M,增加到5M

  • auto.create.topics.enable 是否可以自动创建topoc

  • controlled.shutdown.enable 可以用于优雅停服,注意default.replication.factor的值需要大于1.

  • broker.rack 配置Broker机架感知,可将同一分区的副本分布在不同机架上,扩展了Kafka为代理故障提供的保证,以涵盖机架故障,从而限制了机架上所有代理立即发生故障时数据丢失的风险。

  • auto.leader.rebalance.enable 自动分区领导者平衡,每当代理停止或崩溃时,该代理的分区就会转移到其他副本。这意味着默认情况下,重新启动代理时,它将仅是其所有分区的关注者,这意味着它将不用于客户端读取和写入。

    为了避免这种不平衡,Kafka提出了首选副本的概念。如果分区的副本列表为1,5,9,则首选节点1作为节点5或9的引导者,因为它在副本列表中较早。您可以让Kafka集群尝试通过运行以下命令来恢复对已还原副本的领导权:

    bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port

下面是Kafka集群三个节点的配置:因为我这里都是在一台机器上,所以其中的端口和log路径不能相同,如果你是在不同机器上的节点,配置文件尽可能的保持一样。

Kafka01 配置:

$ cat kafka01/config/server.properties |grep -vE "^#|^$"
broker.id=0
listeners=PLAINTEXT://172.17.0.87:9092
advertised.listeners=PLAINTEXT://172.17.0.87:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs/kafka01
num.partitions=1
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01:2181,zk02:2182,zk03:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
message.max.bytes=5242880
replica.fetch.max.bytes=5242880
auto.create.topics.enable=false
controlled.shutdown.enable=true
broker.rack=cn-north-1a
auto.leader.rebalance.enable=true

Kafka02 配置:

$ cat kafka02/config/server.properties |grep -vE "^#|^$"
broker.id=1
listeners=PLAINTEXT://172.17.0.87:9093
advertised.listeners=PLAINTEXT://172.17.0.87:9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs/kafka02
num.partitions=1
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01:2181,zk02:2182,zk03:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
message.max.bytes=5242880
replica.fetch.max.bytes=5242880
auto.create.topics.enable=false
controlled.shutdown.enable=true
broker.rack=cn-north-1a
auto.leader.rebalance.enable=true

Kafka03 配置:

$ cat kafka03/config/server.properties |grep -vE "^#|^$"
broker.id=2
listeners=PLAINTEXT://172.17.0.87:9094
advertised.listeners=PLAINTEXT://172.17.0.87:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs/kafka03
num.partitions=1
default.replication.factor=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01:2181,zk02:2182,zk03:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
message.max.bytes=5242880
replica.fetch.max.bytes=5242880
auto.create.topics.enable=false
controlled.shutdown.enable=true
broker.rack=cn-north-1a
auto.leader.rebalance.enable=true

创建log存储目录:

Kafka 在启动的时候也会自动创建。

$ mkdir /data/kafka-logs/kafka0{1..3} -pv
mkdir: created directory ‘/data/kafka-logs/kafka01’
mkdir: created directory ‘/data/kafka-logs/kafka02’
mkdir: created directory ‘/data/kafka-logs/kafka03’

启动

查看启动脚本帮助:kafka-server-start.sh

$ ./kafka01/bin/kafka-server-start.sh
USAGE: ./kafka01/bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*

启动三台Kafka:

$ ./kafka01/bin/kafka-server-start.sh -daemon kafka01/config/server.properties
$ ./kafka02/bin/kafka-server-start.sh -daemon kafka02/config/server.properties
$ ./kafka03/bin/kafka-server-start.sh -daemon kafka03/config/server.properties

检查进行,以及监听端口:

$ ps -ef|grep kafka01
ec2-user 19933 1 3 13:21 pts/8 00:00:04 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:/data/knner/kafka01/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/data/knner/kafka01/bin/../logs -Dlog4j.configuration=file:./kafka01/bin/../config/log4j.properties -cp /data/knner/kafka01/bin/../libs/activation-1.1.1.jar:/data/knner/kafka01/bin/../libs/aopalliance-repackaged-2.5.0-b42.jar:/data/knner/kafka01/bin/../libs/argparse4j-0.7.0.jar:/data/knner/kafka01/bin/../libs/audience-annotations-0.5.0.jar:/data/knner/kafka01/bin/../libs/commons-lang3-3.5.jar:/data/knner/kafka01/bin/../libs/connect-api-2.0.0.jar:/data/knner/kafka01/bin/../libs/connect-basic-auth-extension-2.0.0.jar:/data/knner/kafka01/bin/../libs/connect-file-2.0.0.jar:/data/knner/kafka01/bin/../libs/connect-json-2.0.0.jar:/data/knner/kafka01/bin/../libs/connect-runtime-2.0.0.jar:/data/knner/kafka01/bin/../libs/connect-transforms-2.0.0.jar:/data/knner/kafka01/bin/../libs/guava-20.0.jar:/data/knner/kafka01/bin/../libs/hk2-api-2.5.0-b42.jar:/data/knner/kafka01/bin/../libs/hk2-locator-2.5.0-b42.jar:/data/knner/kafka01/bin/../libs/hk2-utils-2.5.0-b42.jar:/data/knner/kafka01/bin/../libs/jackson-annotations-2.9.6.jar:/data/knner/kafka01/bin/../libs/jackson-core-2.9.6.jar:/data/knner/kafka01/bin/../libs/jackson-databind-2.9.6.jar:/data/knner/kafka01/bin/../libs/jackson-jaxrs-base-2.9.6.jar:/data/knner/kafka01/bin/../libs/jackson-jaxrs-json-provider-2.9.6.jar:/data/knner/kafka01/bin/../libs/jackson-module-jaxb-annotations-2.9.6.jar:/data/knner/kafka01/bin/../libs/javassist-3.22.0-CR2.jar:/data/knner/kafka01/bin/../libs/javax.annotation-api-1.2.jar:/data/knner/kafka01/bin/../libs/javax.inject-1.jar:/data/knner/kafka01/bin/../libs/javax.inject-2.5.0-b42.jar:/data/knner/kafka01/bin/../libs/javax.servlet-api-3.1.0.jar:/data/knner/kafka01/bin/../libs/javax.ws.rs-api-2.1.jar:/data/knner/kafka01/bin/../libs/jaxb-api-2.3.0.jar:/data/knner/kafka01/bin/../libs/jersey-client-2.27.jar:/data/knner/kafka01/bin/../libs/jersey-common-2.27.jar:/data/knner/kafka01/bin/../libs/jersey-container-servlet-2.27.jar:/data/knner/kafka01/bin/../libs/jersey-container-servlet-core-2.27.jar:/data/knner/kafka01/bin/../libs/jersey-hk2-2.27.jar:/data/knner/kafka0/bin/../libs/jersey-media-jaxb-2.27.jar:/data/knner/kafka01/bin/../libs/jersey-server-2.27.jar:/data/knner/kafka01/bin/../libs/jetty-client-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jetty-continuation-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jetty-http-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jetty-io-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jetty-security-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jetty-server-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jetty-servlet-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jetty-servlets-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jetty-util-9.4.11.v20180605.jar:/data/knner/kafka01/bin/../libs/jopt-simple-5.0.4.jar:/data/knner/kafka01/bin/../libs/kafka_2.12-2.0.0.jar:/data/knner/kafka01/bin/../libs/kafka_2.12-2.0.0-sources.jar:/data/knner/kafka01/bin/../libs/kafka-clients-2.0.0.jar:/data/knner/kafka01/bin/../libs/kafka-log4j-appender-2.0.0.jar:/data/knner/kafka01/bin/../libs/kafka-streams-2.0.0.jar:/data/knner/kafka01/bin/../libs/kafka-streams-examples-2.0.0.jar:/data/knner/kafka01/bin/../libs/kafka-streams-scala_2.12-2.0.0.jar:/data/knner/kafka01/bin/../libs/kafka-streams-test-utils-2.0.0.jar:/data/knner/kafka01/bin/../libs/kafka-tools-2.0.0.jar:/data/knner/kafka01/bin/../libs/log4j-1.2.17.jar:/data/knner/kafka01/bin/../libs/lz4-java-1.4.1.jar:/data/knner/kafka01/bin/../libs/maven-artifact-3.5.3.jar:/data/knner/kafka01/bin/../libs/metrics-core-2.2.0.jar:/data/knner/kafka01/bin/../libs/osgi-resource-locator-1.0.1.jar:/data/knner/kafka01/bin/../libs/plexus-utils-3.1.0.jar:/data/knner/kafka01/bin/../libs/reflections-0.9.11.jar:/data/knner/kafka01/bin/../libs/rocksdbjni-5.7.3.jar:/data/knner/kafka01/bin/../libs/scala-library-2.12.6.jar:/data/knner/kafka01/bin/../libs/scala-logging_2.12-3.9.0.jar:/data/knner/kafka01/bin/../libs/scala-reflect-2.12.6.jar:/data/knner/kafka01/bin/../libs/slf4j-api-1.7.25.jar:/data/knner/kafka01/bin/../libs/slf4j-log4j12-1.7.25.jar:/data/knner/kafka01/bin/../libs/snappy-java-1.1.7.1.jar:/data/knner/kafka01/bin/../libs/validation-api-1.1.0.Final.jar:/data/knner/kafka01/bin/../libs/zkclient-0.10.jar:/data/knner/kafka01/bin/../libs/zookeeper-3.4.13.jar kafka.Kafka kafka01/config/server.properties

$ netstat -lnutp|grep 19933
tcp6 0 0 :::35773 :::* LISTEN 19933/java
tcp6 0 0 172.17.0.87:9092 :::* LISTEN 19933/java

我们看到默认的Kafka JVM设置是1G堆内存:

修改堆内存

修改启动脚本:bin/kafka-server-start.sh

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

可以通过环境变量的方式:KAFKA_HEAP_OPTS,也可以直接修改它的默认值。

检查Zookeeper中Kafka的注册情况:

$ ./zk01/bin/zkCli.sh -server 172.17.0.87:2181
[zk: 172.17.0.87:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: 172.17.0.87:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: 172.17.0.87:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2]
[zk: 172.17.0.87:2181(CONNECTED) 3] ls /brokers/ids/0
[]

# 看到有三台Broker Kafka节点已经注册了
[zk: 172.17.0.87:2181(CONNECTED) 4] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.0.87:9092"],"jmx_port":-1,"host":"172.17.0.87","timestamp":"1575868871067","port":9092,"version":4}
[zk: 172.17.0.87:2181(CONNECTED) 5] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.0.87:9093"],"jmx_port":-1,"host":"172.17.0.87","timestamp":"1575868952567","port":9093,"version":4}
[zk: 172.17.0.87:2181(CONNECTED) 7] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.0.87:9094"],"jmx_port":-1,"host":"172.17.0.87","timestamp":"1575868974088","port":9094,"version":4}

[zk: 172.17.0.87:2181(CONNECTED) 17] get /cluster/id
{"version":"1","id":"gduCYgbZSGG8eRfNF4hEMw"}

[zk: 172.17.0.87:2181(CONNECTED) 25] get /controller
{"version":1,"brokerid":0,"timestamp":"1575868871176"}

优雅关机

Kafka群集将自动检测任何代理关闭或故障,并为该计算机上的分区选择新的领导者。无论服务器发生故障还是为了维护或配置更改而有意将其关闭,都会发生这种情况。对于后一种情况,Kafka支持一种更优雅的机制来停止服务器,而不仅仅是杀死服务器。当服务器正常停止时,它会进行两项优化:

  1. 它将所有日志同步到磁盘上,以避免在重新启动时进行任何日志恢复(即,验证日志尾部所有消息的校验和)。日志恢复需要时间,因此可以加快有意重启的速度。
  2. 它将在关闭服务器之前将服务器领导的所有分区迁移到其他副本。这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。

只要服务器停止运行(不是通过强行终止),就会自动进行日志同步,但是受控的领导者迁移需要使用特殊设置:

controlled.shutdown.enable=true

请注意,只有在代理上托管的所有分区都具有副本时(即复制因子大于1 并且这些副本中至少有一个处于活动状态),受控关闭才会成功。这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。

Topic的增删改查

参考:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools replication工具使用

https://kafka.apache.org/documentation/#topicconfigs topic配置

命令帮助:kafka-topics.sh

$ ./kafka01/bin/kafka-topics.sh 
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions, # 修改
replica assignment, and/or
configuration for the topic.
--config <String: name=value> A topic configuration override for the
topic being created or altered.The
following is a list of valid
configurations: # 添加或修改topic的可用配置
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs.
--create Create a new topic. # 创建topic
--delete Delete a topic # 删除topic
--delete-config <String: name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option). # 清除已有topic的配置
--describe List details for the given topics. # 查看topic 信息
--disable-rack-aware Disable rack aware replica assignment
--force Suppress console prompts
--help Print usage information.
--if-exists if set when altering or deleting
topics, the action will only execute
if the topic exists
--if-not-exists if set when creating topics, the
action will only execute if the
topic does not already exist
--list List all available topics. # 查看topic列表
--partitions <Integer: # of partitions> The number of partitions for the topic # 指定分区数量,在创建或者修改的时候
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected
--replica-assignment <String: A list of manual partition-to-broker # replica手动分配replica和Broker之间的映射关系,在创建修改的时候
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each # 指定副本数,创建时
replication factor> partition in the topic being created.
--topic <String: topic> The topic to be create, alter or # topic 名称
describe. Can also accept a regular
expression except for --create option
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--zookeeper <String: hosts> REQUIRED: The connection string for # 指定zookeeper地址
the zookeeper connection in the form
host:port. Multiple hosts can be
given to allow fail-over.

在新版本的Kafka中,不需要指定–zookeeper地址了,需要指定kafka的地址即可:–bootstrap-server localhost:9092

Topic 的创建

指定–replication-factor 复制因子,不指定的话,使用配置文件中指定的默认值(default.replication.factor=1)

指定–partitions,指定这个topic 有多少个分区,不指定的话,使用配置文件中指定的默认值(num.partitions=1)

这里的Partitions和Broker的对应关系是Kafka自动分配的:

$ ./kafka01/bin/kafka-topics.sh --create --zookeeper 172.17.0.87:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

Topic 的查看

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --list 
test

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --describe --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
4Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2

说明:

第一行是汇总信息,后面的每一行代表一个Partition的信息,多个partition会有多行:

  • Topic:test 指明Topic的名称为test
  • PartitionCount:1 指明这个topic的分区数为1
  • ReplicationFactor:1 指明这个topic的复制因子为1,就是复制数为1
  • Configs: 下面是configs,每一行代表一个Partition,我这里的partition是1,所以只有一行
    • Partition: 0 编号0的partition信息:
      • Leader:2 该Partition的Leader在Broker.id为2的节点上,往后对于该partition都由该节点负责。
      • Replicas:2 代表该partition所在node的node id列表,我这里只有一个副本,在Broker.id为2的节点上。
      • Isr: 2 同步的副本集合。他是Replicas 列表的子集,它当前是活动的,并被提交给领导者。

创建3个副本,4个partition的topic,然后再次查看一下该topic的详细信息:

$ ./kafka01/bin/kafka-topics.sh --create --zookeeper 172.17.0.87:2181 --replication-factor 3 --partitions 4 --topic my-replicated-topic
Created topic "my-replicated-topic".

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --describe --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:4 ReplicationFactor:3 Configs:
4Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
4Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
4Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4Topic: my-replicated-topic Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0

查看一下kafka的log目录:

$ pwd
/data/kafka-logs/kafka01
$ ll
total 20
-rw-rw-r-- 1 ec2-user ec2-user 4 Dec 9 16:18 cleaner-offset-checkpoint
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-0
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-12
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-15
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-18
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-21
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-24
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-27
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-3
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-30
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-33
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-36
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-39
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-42
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-45
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-48
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:59 __consumer_offsets-6
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 16:51 __consumer_offsets-9
-rw-rw-r-- 1 ec2-user ec2-user 4 Dec 10 10:10 log-start-offset-checkpoint
-rw-rw-r-- 1 ec2-user ec2-user 54 Dec 9 13:21 meta.properties
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 15:41 my-replicated-topic-0 格式:TopicName-PartitionID
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 15:41 my-replicated-topic-1
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 15:41 my-replicated-topic-2
drwxrwxr-x 2 ec2-user ec2-user 141 Dec 9 15:41 my-replicated-topic-3
-rw-rw-r-- 1 ec2-user ec2-user 505 Dec 10 10:10 recovery-point-offset-checkpoint
-rw-rw-r-- 1 ec2-user ec2-user 508 Dec 10 10:11 replication-offset-checkpoint

$ ll my-replicated-topic-0/
total 20480
-rw-rw-r-- 1 ec2-user ec2-user 10485760 Dec 9 15:41 00000000000000000000.index
-rw-rw-r-- 1 ec2-user ec2-user 0 Dec 9 15:41 00000000000000000000.log
-rw-rw-r-- 1 ec2-user ec2-user 10485756 Dec 9 15:41 00000000000000000000.timeindex
-rw-rw-r-- 1 ec2-user ec2-user 0 Dec 9 15:41 leader-epoch-checkpoint

Topic 的创建,手动指定Partitions和Broker的对应关系:

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --create --topic my-replicated-topic-manual --replica-assignment 0:1:2,1:2:0,2:0:1
Created topic "my-replicated-topic-manual".

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --describe --topic my-replicated-topic-manual
Topic:my-replicated-topic-manual PartitionCount:3 ReplicationFactor:3 Configs:
4Topic: my-replicated-topic-manual Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
4Topic: my-replicated-topic-manual Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4Topic: my-replicated-topic-manual Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

手动replica分配,也就是replica和broker的对应关系

0:1:2,1:2:0,2:0:1

  • 所有的数字都代表broker的ID
  • 表示三个partition,用逗号分割的
  • 每个partition有三个副本replica,用冒号分割的
  • 所以partition和broker的对应关系如下:
    • partition 0:id0,id1,id2
    • partition 1:id1,id2,id0
    • partition 2:id2,id0,id1

Topic 的修改

注意:这里的修改只能修改Topic的partition数量(只能增加,不能减少)

$  ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --describe --topic my-replicated-topic-manual 
Topic:my-replicated-topic-manual PartitionCount:3 ReplicationFactor:3 Configs:
4Topic: my-replicated-topic-manual Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
4Topic: my-replicated-topic-manual Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4Topic: my-replicated-topic-manual Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

方式1:只需要指定--partitions 的数量即可:

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --alter --topic my-replicated-topic-manual --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --describe --topic my-replicated-topic-manual
Topic:my-replicated-topic-manual PartitionCount:4 ReplicationFactor:3 Configs:
4Topic: my-replicated-topic-manual Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
4Topic: my-replicated-topic-manual Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4Topic: my-replicated-topic-manual Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
4Topic: my-replicated-topic-manual Partition: 3 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

当减少partitions数量时会报错:

$ ./bin/kafka-topics.sh --zookeeper zk01:2181 --describe --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:4 ReplicationFactor:3 Configs:
4Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
4Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
4Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4Topic: my-replicated-topic Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0

$ ./bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic my-replicated-topic --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic my-replicated-topic currently has 4 partitions, 2 would not be an increase.
[2019-12-10 10:22:31,772] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic my-replicated-topic currently has 4 partitions, 2 would not be an increase.
(kafka.admin.TopicCommand$)

方式2:手工指定replica的分配:

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --describe --topic my-replicated-topic-manual 
Topic:my-replicated-topic-manual PartitionCount:4 ReplicationFactor:3 Configs:
4Topic: my-replicated-topic-manual Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
4Topic: my-replicated-topic-manual Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4Topic: my-replicated-topic-manual Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
4Topic: my-replicated-topic-manual Partition: 3 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --alter --topic my-replicated-topic-manual --partitions 5 --replica-assignment 0:1:2,1:2:0,2:0:1,2:1:0,0:2:1
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --describe --topic my-replicated-topic-manual
Topic:my-replicated-topic-manual PartitionCount:5 ReplicationFactor:3 Configs:
4Topic: my-replicated-topic-manual Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
4Topic: my-replicated-topic-manual Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4Topic: my-replicated-topic-manual Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
4Topic: my-replicated-topic-manual Partition: 3 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
4Topic: my-replicated-topic-manual Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

对已已经分配的partition,你又手工分配的话,是不生效的:

如上面,我通过alter命令更改了--partitions 4,Partition: 3 已经分配到了0:2:1上了

我又手工指定replica 的分配--partitions 5 --replica-assignment 0:1:2,1:2:0,2:0:1,2:1:0,0:2:1 这里的Partition: 3应该是我手动指定的:2:1:0,但是实际还是之前分配好的:0:2:1

Topic 的删除

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --delete --topic my-replicated-topic-manual
Topic my-replicated-topic-manual is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --list
my-replicated-topic
test

Topic 的配置的增删改查

所有的Topic配置请参考:

https://kafka.apache.org/documentation/#topicconfigs

当然在创建Topic的时候可以通过参数:--config 来增加Topic的配置。

对于Kafka 实体配置(Topic,Client,User,Broker)的配置,同一使用命令行工具kafka-configs.sh 帮助如下:

$ ./bin/kafka-configs.sh 
Add/Remove entity config for a topic, client, user or broker
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add. # 增加配置
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
is a list of valid configurations:
For entity-type 'topics': # topic配置
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
For entity-type 'brokers': # brokers 配置
log.message.timestamp.type
ssl.client.auth
log.retention.ms
sasl.login.refresh.window.jitter
sasl.kerberos.ticket.renew.window.
factor
log.preallocate
log.index.size.max.bytes
sasl.login.refresh.window.factor
ssl.truststore.type
ssl.keymanager.algorithm
log.cleaner.io.buffer.load.factor
sasl.login.refresh.min.period.seconds
ssl.key.password
background.threads
log.retention.bytes
ssl.trustmanager.algorithm
log.segment.bytes
log.cleaner.delete.retention.ms
log.segment.delete.delay.ms
min.insync.replicas
ssl.keystore.location
ssl.cipher.suites
log.roll.jitter.ms
log.cleaner.backoff.ms
sasl.jaas.config
principal.builder.class
log.flush.interval.ms
log.cleaner.dedupe.buffer.size
log.flush.interval.messages
advertised.listeners
num.io.threads
listener.security.protocol.map
log.message.downconversion.enable
sasl.enabled.mechanisms
sasl.login.refresh.buffer.seconds
ssl.truststore.password
listeners
metric.reporters
ssl.protocol
sasl.kerberos.ticket.renew.jitter
ssl.keystore.password
sasl.mechanism.inter.broker.protocol
log.cleanup.policy
sasl.kerberos.principal.to.local.rules
sasl.kerberos.min.time.before.relogin
num.recovery.threads.per.data.dir
log.cleaner.io.max.bytes.per.second
log.roll.ms
ssl.endpoint.identification.algorithm
unclean.leader.election.enable
message.max.bytes
log.cleaner.threads
log.cleaner.io.buffer.size
sasl.kerberos.service.name
ssl.provider
follower.replication.throttled.rate
log.index.interval.bytes
log.cleaner.min.compaction.lag.ms
log.message.timestamp.difference.max.
ms
ssl.enabled.protocols
log.cleaner.min.cleanable.ratio
replica.alter.log.dirs.io.max.bytes.
per.second
ssl.keystore.type
ssl.secure.random.implementation
ssl.truststore.location
sasl.kerberos.kinit.cmd
leader.replication.throttled.rate
num.network.threads
compression.type
num.replica.fetchers
For entity-type 'users': # users配置
request_percentage
producer_byte_rate
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
For entity-type 'clients': # clients配置
request_percentage
producer_byte_rate
consumer_byte_rate
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
--alter Alter the configuration for the entity. # 修改
--bootstrap-server <String: server to The Kafka server to connect to. This
connect to> is required for describing and
altering broker configs.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--delete-config <String> config keys to remove 'k1,k2' # 删除配置
--describe List configs for the given entity. # 查看配置
--entity-default Default entity name for
clients/users/brokers (applies to
corresponding entity type in command
line)
--entity-name <String> Name of entity (topic name/client # 实体名称
id/user principal name/broker id)
--entity-type <String> Type of entity # 实体类型
(topics/clients/users/brokers)
--force Suppress console prompts
--help Print usage information.
--zookeeper <String: urls> REQUIRED: The connection string for # zk地址
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.

列出Topic的配置:

以上面创建的Topic:my-replicated-topic 为例:

$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --describe
Configs for topic 'my-replicated-topic' are

# 默认没有配置

增加Topic的配置:

参考:https://kafka.apache.org/documentation/#topicconfigs

NAME DESCRIPTION TYPE DEFAULT VALID VALUES SERVER DEFAULT PROPERTY IMPORTANCE
cleanup.policy A string that is either “delete” or “compact” or both. This string designates the retention policy to use on old log segments. The default policy (“delete”) will discard old segments when their retention time or size limit has been reached. The “compact” setting will enable log compaction on the topic. list delete [compact, delete] log.cleanup.policy medium
compression.type Specify the final compression type for a given topic. This configuration accepts the standard compression codecs (‘gzip’, ‘snappy’, ‘lz4’, ‘zstd’). It additionally accepts ‘uncompressed’ which is equivalent to no compression; and ‘producer’ which means retain the original compression codec set by the producer. string producer [uncompressed, zstd, lz4, snappy, gzip, producer] compression.type medium
 # 增加配置:cleanup.policy=compact
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --alter --add-config cleanup.policy=compact
Completed Updating config for entity: topic 'my-replicated-topic'.
# 查看
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --describe
Configs for topic 'my-replicated-topic' are cleanup.policy=compact
# 增加配置:compression.type=gzip
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --alter --add-config compression.type=gzip
Completed Updating config for entity: topic 'my-replicated-topic'.
# 查看
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --describe
Configs for topic 'my-replicated-topic' are cleanup.policy=compact,compression.type=gzip

修改Topic的现有配置

# 修改:cleanup.policy=delete
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --alter --add-config cleanup.policy=delete
Completed Updating config for entity: topic 'my-replicated-topic'.
# 查看
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --describe
Configs for topic 'my-replicated-topic' are compression.type=gzip,cleanup.policy=delete

删除Topic配置

# 注意,删除配置时,只需要指定配置的KEY即可,不需要也不能指定KEY=VALUE
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --alter --delete-config compression.type=gzip
Invalid config(s): compression.type=gzip # 报错了

$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --alter --delete-config compression.type
Completed Updating config for entity: topic 'my-replicated-topic'.

$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --describe
Configs for topic 'my-replicated-topic' are cleanup.policy=delete

Kafka 的生产者和消费者测试

我们使用上面已经创建好的Topic:test

首先查看kafka-console-consumer.sh脚本的帮助:

$ ./kafka01/bin/kafka-console-consumer.sh 
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED: The server(s) to connect to.
connect to> # 指定kafka的地址
--consumer-property <String: A mechanism to pass user-defined
consumer_prop> properties in the form key=value to
the consumer.
--consumer.config <String: config file> Consumer config properties file. Note
that [consumer-property] takes
precedence over this config.
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have # 从开头开始消费
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--group <String: consumer group id> The consumer group id of the consumer. # 指定消费者组
--isolation-level <String> Set to read_committed in order to
filter out transactional messages
which are not committed. Set to
read_uncommittedto read all
messages. (default: read_uncommitted)
--key-deserializer <String:
deserializer for key>
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--offset <String: consume offset> The offset id to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
--partition <Integer: partition> The partition to consume from.
Consumption starts from the end of
the partition unless '--offset' is
specified.
--property <String: prop> The properties to initialize the
message formatter. Default
properties include:
print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.' and 'value.
deserializer.' prefixes to configure
their deserializers.
--skip-message-on-error If there is an error when processing a
message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is
available for consumption for the
specified interval.
--topic <String: topic> The topic id to consume on.
--value-deserializer <String:
deserializer for values>
--whitelist <String: whitelist> Whitelist of topics to include for
consumption.

kafka-console-producer.sh 脚本帮助:

$ ./kafka01/bin/kafka-console-producer.sh 
Read data from standard input and publish it to Kafka.
Option Description
------ -----------
--batch-size <Integer: size> Number of messages to send in a single
batch if they are not being sent
synchronously. (default: 200)
--broker-list <String: broker-list> REQUIRED: The broker list string in # 指定kafka broker地址
the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: The compression codec: either 'none',
compression-codec] 'gzip', 'snappy', or 'lz4'.If
specified without value, then it
defaults to 'gzip'
--line-reader <String: reader_class> The class name of the class to use for
reading lines from standard in. By
default each line is read as a
separate message. (default: kafka.
tools.
ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on The max time that the producer will
send> block for during a send request
(default: 60000)
--max-memory-bytes <Long: total memory The total memory used by the producer
in bytes> to buffer records waiting to be sent
to the server. (default: 33554432)
--max-partition-memory-bytes <Long: The buffer size allocated for a
memory in bytes per partition> partition. When records are received
which are smaller than this size the
producer will attempt to
optimistically group them together
until this size is reached.
(default: 16384)
--message-send-max-retries <Integer> Brokers can fail receiving the message
for multiple reasons, and being
unavailable transiently is just one
of them. This property specifies the
number of retires before the
producer give up and drop this
message. (default: 3)
--metadata-expiry-ms <Long: metadata The period of time in milliseconds
expiration interval> after which we force a refresh of
metadata even if we haven't seen any
leadership changes. (default: 300000)
--producer-property <String: A mechanism to pass user-defined
producer_prop> properties in the form key=value to
the producer.
--producer.config <String: config file> Producer config properties file. Note
that [producer-property] takes
precedence over this config.
--property <String: prop> A mechanism to pass user-defined
properties in the form key=value to
the message reader. This allows
custom configuration for a user-
defined message reader.
--request-required-acks <String: The required acks of the producer
request required acks> requests (default: 1)
--request-timeout-ms <Integer: request The ack timeout of the producer
timeout ms> requests. Value must be non-negative
and non-zero (default: 1500)
--retry-backoff-ms <Integer> Before each retry, the producer
refreshes the metadata of relevant
topics. Since leader election takes
a bit of time, this property
specifies the amount of time that
the producer waits before refreshing
the metadata. (default: 100)
--socket-buffer-size <Integer: size> The size of the tcp RECV size.
(default: 102400)
--sync If set message send requests to the
brokers are synchronously, one at a
time as they arrive.
--timeout <Integer: timeout_ms> If set and the producer is running in
asynchronous mode, this gives the
maximum amount of time a message
will queue awaiting sufficient batch
size. The value is given in ms.
(default: 1000)
--topic <String: topic> REQUIRED: The topic id to produce
messages to.

首先打开两个消费者,属于同一个组;

打开两个SSH终端,分别执行下面命令:

$ cd /data/knner
$ ./kafka01/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.87:9092 --from-beginning --topic test --group test-group

然后在另打开一个SSH终端,执行下面命令充当生产者:

$ ./kafka01/bin/kafka-console-producer.sh --broker-list 172.17.0.87:9092,172.17.0.87:9093,172.17.0.87:9094 --topic test
>test001 # 输入两条消息
>test002

我们查看两个消费者中只有其中一个消费者消费了这两条消息,另一个没有消费。因为这两个消费者同属于一个组的。一条消息只能被组内个一个消费者消费。

我们继续测试,停掉刚才的两个消费者,然后分别执行下面命令,来运行两个消费者,但是,属于不同的组

# 消费者1:消费者组test-group01
$ ./kafka01/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.87:9092 --from-beginning --topic test --group test-group
# 消费者2:消费者组test-group02
$ ./kafka01/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.87:9092 --from-beginning --topic test --group test-group-01

然后在通过刚才的消费者再输入另外两条消息:

test003
test004

我们发现这两个消费者都消费了这两条消息;因为这两个消费者属于不同的组。一个消息被多个消费者组多次消费,但是只能被组内个一个消费者消费。

检查消费者位置以及分区偏移

kafka-consumer-groups.sh 命令帮助:

$ ./bin/kafka-consumer-groups.sh
List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.
Option Description
------ -----------
--all-topics Consider all topics assigned to a
group in the `reset-offsets` process.
--bootstrap-server <String: server to REQUIRED: The server(s) to connect to.
connect to> # 指定kafka 服务器
--by-duration <String: duration> Reset offsets to offset by duration
from current timestamp. Format:
'PnDTnHnMnS'
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client and Consumer.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire consumer
group. For instance --group g1 --
group g2 # 清除消费者信息,包括分片偏移量,所有者信息
--describe Describe consumer group and list
offset lag (number of messages not
yet processed) related to given
group. # 查看描述信息
--dry-run Only show results without executing
changes on Consumer Groups.
Supported operations: reset-offsets.
--execute Execute operation. Supported
operations: reset-offsets.
--export Export operation execution to a CSV
file. Supported operations: reset-
offsets. # 导出
--from-file <String: path to CSV file> Reset offsets to values defined in CSV
file.
--group <String: consumer group> The consumer group we wish to act on.
--list List all consumer groups. # 指定消费者组
--members Describe members of the group. This
option may be used with '--describe'
and '--bootstrap-server' options
only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
members
--offsets Describe the group and list all topic
partitions in the group along with
their offset lag. This is the
default sub-action of and may be
used with '--describe' and '--
bootstrap-server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
offsets
--reset-offsets Reset offsets of consumer group.
Supports one consumer group at the
time, and instances should be
inactive
Has 2 execution options: --dry-run
(the default) to plan which offsets
to reset, and --execute to update
the offsets. Additionally, the --
export option is used to export the
results to a CSV format.
You must choose one of the following
reset specifications: --to-datetime,
--by-period, --to-earliest, --to-
latest, --shift-by, --from-file, --
to-current.
To define the scope use --all-topics
or --topic. One scope must be
specified unless you use '--from-
file'.
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset
by 'n', where 'n' can be positive or
negative.
--state Describe the group state. This option
may be used with '--describe' and '--
bootstrap-server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
state
--timeout <Long: timeout (ms)> The timeout that can be set for some
use cases. For example, it can be
used when describing the group to
specify the maximum amount of time
in milliseconds to wait before the
group stabilizes (when the group is
just created, or is going through
some changes). (default: 5000)
--to-current Reset offsets to current offset.
--to-datetime <String: datetime> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long: offset> Reset offsets to a specific offset.
--topic <String: topic> The topic whose consumer group
information should be deleted or
topic whose should be included in
the reset offset process. In `reset-
offsets` case, partitions can be
specified using this format: `topic1:
0,1,2`, where 0,1,2 are the
partition to be included in the
process. Reset-offsets also supports
multiple topic inputs.
--verbose Provide additional information, if
any, when describing the group. This
option may be used with '--
offsets'/'--members'/'--state' and
'--bootstrap-server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
members --verbose

列出消费者组:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --list
test-group-01
test-group

查看具体消费者组的描述信息:

包括分区偏移量,最后的分区偏移量,所在host,client id

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group-01

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 5 5 0 consumer-1-1f2cc552-4df7-47ad-b112-52e2a87ee697 /172.17.0.87 consumer-1

如何指定CLIENT-ID呢?

我们关闭之前的所有消费者,另外启动两个,如下:

$ ./kafka01/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9093,kafka03:9094 --from-beginning --topic test --group test-group-01 --consumer-property client.id=001

$ ./kafka01/bin/kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9093,kafka03:9094 --from-beginning --topic test --group test-group-01 --consumer-property client.id=002

此时查看消费者组:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group-01

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 5 5 0 001-3d946a1e-8cb3-4d4d-8395-4932a342fc89 /172.17.0.87 001

发现还是只有一个,加上--members:此选项提供使用者组中所有活动成员的列表

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group-01 --members

CONSUMER-ID HOST CLIENT-ID #PARTITIONS
001-3d946a1e-8cb3-4d4d-8395-4932a342fc89 /172.17.0.87 001 1
002-303d3b20-d08c-4da6-a492-913fb2d3a762 /172.17.0.87 002 0

--members --verbose:除了上述“ –members”选项报告的信息之外,此选项还提供分配给每个成员的分区

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group-01 --members --verbose

CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
001-3d946a1e-8cb3-4d4d-8395-4932a342fc89 /172.17.0.87 001 1 test(0)
002-303d3b20-d08c-4da6-a492-913fb2d3a762 /172.17.0.87 002 0 -

--offsets:这是默认的describe选项,并提供与“ –describe”选项相同的输出

--state:此选项提供有用的组级别信息。

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group-01 --state

COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
172.17.0.87:9092 (0) range Stable 2

要手动删除一个或多个使用者组,可以使用“ –delete”选项:

# 注意,需要先停止消费者
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --delete --group test-group-01
Error: Deletion of some consumer groups failed:
* Group 'test-group-01' could not be deleted due to: NON_EMPTY_GROUP

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --delete --group test-group-01
Deletion of requested consumer groups ('test-group-01') was successful.

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --list
test
test-group

重置分区偏移

要重置使用者组的偏移,可以使用“ –reset-offsets”选项。此选项当时支持一个消费者组。它需要定义以下范围:–all-topics或–topic。除非您使用“ –from-file”方案,否则必须选择一个范围。另外,首先请确保使用者实例处于非活动状态。

它具有3个执行选项:

  • (默认)以显示要重置的偏移量。
  • –execute:执行–reset-offsets过程。
  • –export:将结果导出为CSV格式。

–reset-offsets还具有以下场景可供选择(必须选择至少一个场景):

  • –to-datetime <String:datetime>:将偏移量重置为与datetime的偏移量。格式:“ YYYY-MM-DDTHH:mm:SS.sss”
  • –to-earliest :将偏移量重置为最早的偏移量。
  • –to-latest:将偏移量重置为最新偏移量。
  • –shift-by <Long: number-of-offsets>:重置偏移量,将当前偏移量偏移“ n”,其中“ n”可以为正或负。
  • –from-file:将偏移量重置为CSV文件中定义的值。
  • –to-current:将偏移量重置为当前偏移量。
  • –by-duration <String:duration>:将偏移量重置为从当前时间戳记的持续时间偏移量。格式:“ PnDTnHnMnS”
  • –to-offset:将偏移量重置为特定偏移量。

请注意,超出范围的偏移量将调整为可用的偏移量结束。例如,如果偏移量结束为10,偏移量请求为15,则实际上将选择偏移量为10。

# 查看:CURRENT-OFFSET=LOG-END-OFFSET 也就是全部消费完了
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group
Consumer group 'test-group' has no active members.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 5 5 0 - - -
# 重置偏移量到最早的
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --reset-offsets --group test-group --topic test --to-earliest
WARN: No action will be performed as the --execute option is missing.In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.

TOPIC PARTITION NEW-OFFSET
test 0 0
# 发现并没有生效,
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group
Consumer group 'test-group' has no active members.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 5 5 0 - - -
# 根据上面的提示,需要加上--execute参数来执行:
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --reset-offsets --group test-group --topic test --to-earliest --execute

TOPIC PARTITION NEW-OFFSET
test 0 0

# CURRENT-OFFSET此时为0了。
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group
Consumer group 'test-group' has no active members.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 0 5 5 - - -

扩展集群

将服务器添加到Kafka集群很容易,只需为其分配唯一的代理ID,然后在新服务器上启动Kafka。但是,不会为这些新服务器自动分配任何数据分区,因此,除非将分区移至它们,否则在创建新主题之前它们将不会做任何工作。因此,通常在将计算机添加到群集时,您将需要将一些现有数据迁移到这些计算机。

数据迁移过程是手动启动的,但是是完全自动化的。在幕后,Kafka会将新服务器添加为要迁移的分区的跟随者,并允许它完全复制该分区中的现有数据。新服务器完全复制该分区的内容并加入同步副本后,现有副本之一将删除其分区的数据。

分区重新分配工具可用于在代理之间移动分区。理想的分区分布将确保所有代理之间的数据负载和分区大小均匀。分区重新分配工具没有能力自动研究Kafka群集中的数据分布,并四处移动分区以实现均匀的负载分布。因此,管理员必须弄清楚应该移动哪些主题或分区。

分区重新分配工具可以在3种互斥模式下运行:

  • –generate:在此模式下,给定主题列表和代理列表,该工具会生成候选重新分配,以将指定主题的所有分区移至新的代理。给定主题和目标代理的列表,此选项仅提供一种方便的方法来生成分区重新分配计划。
  • –execute:在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。(使用–reassignment-json-file选项)。这可以是管理员手工制作的自定义重新分配计划,也可以使用–generate选项提供
  • –verify:在此模式下,该工具会验证上一次–execute期间列出的所有分区的重新分配状态。状态可以是成功完成,失败或进行中

自动将数据迁移到新计算机

分区重新分配工具可用于将某些主题从当前代理集移到新添加的代理。这在扩展现有集群时通常很有用,因为与一次移动一个分区相比,将整个主题移至新的一组代理更容易。用于执行此操作时,用户应提供应移至新的一组代理的主题列表和新代理的目标列表。然后,该工具将给定主题列表中的所有分区平均分配到新的一组代理中。在此过程中,主题的复制因子保持不变。实际上,主题输入列表的所有分区的副本都从旧的代理集移到了新添加的代理。

例如,以下示例将主题foo1,foo2的所有分区移动到新的代理集5,6。在此步骤结束时,主题foo1和foo2的所有分区仅存在于代理5,6上。

由于该工具将主题的输入列表作为json文件接受,因此您首先需要确定要移动的主题并按以下方式创建json文件:

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}

kafka-reassign-partitions.sh 命令帮助:

$ ./bin/kafka-reassign-partitions.sh
This command moves topic partitions between replicas.
Option Description
------ -----------
--bootstrap-server <String: Server(s) the server(s) to use for
to use for bootstrapping> bootstrapping. REQUIRED if an
absolution path of the log directory
is specified for any replica in the
reassignment json file
--broker-list <String: brokerlist> The list of brokers to which the
partitions need to be reassigned in
the form "0,1,2". This is required
if --topics-to-move-json-file is
used to generate reassignment
configuration
--disable-rack-aware Disable rack aware replica assignment
--execute Kick off the reassignment as specified
by the --reassignment-json-file
option.
--generate Generate a candidate partition
reassignment configuration. Note
that this only generates a candidate
assignment, it does not execute it.
--reassignment-json-file <String: The JSON file with the partition
manual assignment json file path> reassignment configurationThe format
to use is -
{"partitions":
[{"topic": "foo",
"partition": 1,
"replicas": [1,2,3],
"log_dirs": ["dir1","dir2","dir3"]
}],
"version":1
}
Note that "log_dirs" is optional. When
it is specified, its length must
equal the length of the replicas
list. The value in this list can be
either "any" or the absolution path
of the log directory on the broker.
If absolute log directory path is
specified, it is currently required
that the replica has not already
been created on that broker. The
replica will then be created in the
specified log directory on the
broker later.
--replica-alter-log-dirs-throttle The movement of replicas between log
<Long: replicaAlterLogDirsThrottle> directories on the same broker will
be throttled to this value
(bytes/sec). Rerunning with this
option, whilst a rebalance is in
progress, will alter the throttle
value. The throttle rate should be
at least 1 KB/s. (default: -1)
--throttle <Long: throttle> The movement of partitions between
brokers will be throttled to this
value (bytes/sec). Rerunning with
this option, whilst a rebalance is
in progress, will alter the throttle
value. The throttle rate should be
at least 1 KB/s. (default: -1)
--timeout <Long: timeout> The maximum time in ms allowed to wait
for partition reassignment execution
to be successfully initiated
(default: 10000)
--topics-to-move-json-file <String: Generate a reassignment configuration
topics to reassign json file path> to move the partitions of the
specified topics to the list of
brokers specified by the --broker-
list option. The format to use is -
{"topics":
[{"topic": "foo"},{"topic": "foo1"}],
"version":1
}
--verify Verify if the reassignment completed
as specified by the --reassignment-
json-file option. If there is a
throttle engaged for the replicas
specified, and the rebalance has
completed, the throttle will be
removed
--zookeeper <String: urls> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.

JSON文件准备好后,请使用分区重新分配工具生成候选分配:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}

该工具生成候选分配,该分配会将所有分区从主题foo1,foo2移至代理5,6。但是请注意,此时分区移动尚未开始,它仅告诉您当前分配和建议的新分配。如果您想回滚到当前分配,则应将其保存。新的赋值应保存在json文件(例如,expand-cluster-reassignment.json)中,然后使用–execute选项输入到工具中,如下所示:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}

最后,–verify选项可与该工具一起使用,以检查分区重新分配的状态。请注意,应将相同的expand-cluster-reassignment.json(与–execute选项一起使用)与–verify选项一起使用:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully

自定义分区分配和迁移

分区重新分配工具还可以用于将分区的副本选择性地移动到一组特定的代理。以这种方式使用时,假定用户知道重新分配计划,并且不需要工具生成候选重新分配,从而有效地跳过了–generate步骤并直接进入–execute步骤

例如,以下示例将主题foo1的分区0移动到代理5,6,将主题foo2的分区1移动到代理2,3:

第一步是在json文件中手工制作自定义重新分配计划:

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

然后,将json文件与–execute选项一起使用以开始重新分配过程:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

–verify选项可与该工具一起使用,以检查分区重新分配的状态。请注意,应将相同的expand-cluster-reassignment.json(与–execute选项一起使用)与–verify选项一起使用:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully

复制因子增加

增加现有分区的复制因子很容易。只需在自定义重新分配json文件中指定额外的副本,然后将其与–execute选项一起使用即可增加指定分区的复制因子。

例如,以下示例将主题foo的分区0的复制因子从1增加到3。在增加复制因子之前,该分区的唯一副本存在于代理5上。作为增加复制因子的一部分,我们将在经纪人6和7。

第一步是在json文件中手工制作自定义重新分配计划:

> cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

然后,将json文件与–execute选项一起使用以开始重新分配过程:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

–verify选项可与该工具一起使用,以检查分区重新分配的状态。请注意,应将–verify选项与同一crement-replication-factor.json(与–execute选项一起使用)一起使用:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully

您还可以使用kafka-topics工具验证复制因子的增加:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7

我们已之前创建的test Topic为例:

这里的replicationFactor是1,也就是没有副本的。

$ ./bin/kafka-topics.sh --zookeeper zk01:2181 --describe --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
4Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1

创建json文件:

$ cat jsonfiles/increase-replication-factor.json 
{
"partitions": [
{
"partition": 0,
"replicas": [
0,
1,
2
],
"topic": "test"
}
],
"version": 1
}

应用:

$ ./bin/kafka-reassign-partitions.sh --zookeeper zk01:2181 --reassignment-json-file jsonfiles/increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

检查一下:

$ ./bin/kafka-topics.sh --zookeeper zk01:2181 --describe --topic test
Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
4Topic: test Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,0,2

限制数据迁移期间的带宽使用

Kafka允许您对复制流量应用限制,在用于将副本在计算机之间移动的带宽上设置上限。当重新平衡群集,引导新的代理或添加或删除代理时,这很有用,因为它限制了这些数据密集型操作对用户的影响。

有两个接口可用于接合油门。最简单,最安全的方法是在调用kafka-reassign-partitions.sh时应用油门,但kafka-configs.sh也可用于直接查看和更改油门值。

因此,例如,如果您要执行重新平衡,则使用以下命令,它将以不超过50MB/s的速度移动分区。

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json —throttle 50000000

当您执行此脚本时,您将看到油门启动:

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

如果您希望在重新平衡期间更改油门,比如说要增加吞吐量以使其更快地完成,则可以通过重新运行传递相同的reassignment-json-file的execute命令来执行此操作:

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
There is an existing assignment running.
The throttle limit was set to 700000000 B/s

重新平衡完成后,管理员可以使用–verify选项检查重新平衡的状态。如果重新平衡完成,则将通过–verify命令删除油门。重要的是,一旦重新平衡完成,管理员可以通过使用–verify选项运行命令来及时删除限制。否则,可能会限制常规复制流量。

当执行–verify选项并完成重新分配后,脚本将确认已取消节流阀:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [mytopic,0] completed successfully
Throttle was removed.

管理员还可以使用kafka-configs.sh验证分配的配置。有两对节气门配置,用于管理节流过程。油门值本身。这是使用动态属性在代理级别配置的:

leader.replication.throttled.rate
follower.replication.throttled.rate

还有一组枚举的受限制的副本:

leader.replication.throttled.replicas
follower.replication.throttled.replicas

每个主题都配置了哪些。所有四个配置值都由kafka-reassign-partitions.sh自动分配(在下面讨论)。

要查看油门极限配置:

> bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

这显示了应用于复制协议的引导方和跟随方的限制。默认情况下,双方都分配有相同的限制吞吐量值。

要查看限制副本的列表:

> bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102

在这里,我们看到引导者限制应用于代理102上的分区1和代理101的分区0。同样,跟随者限制应用于代理101上的分区1和代理102上的分区0。

默认情况下,kafka-reassign-partitions.sh会将引导者限制应用于重新平衡之前存在的所有副本,其中任何一个都可能是领导者。它将从动油门应用于所有移动目的地。因此,如果在代理程序101,102上存在一个具有副本的分区,并将其重新分配给102,103,则该分区的前导调节器将应用于101,102,而后继调节器将仅应用于103。

如果需要,还可以使用kafka-configs.sh上的–alter开关手动更改油门配置。

Kafka python

Kafka-python 3.5k※

GitHub:

https://github.com/dpkp/kafka-python

Docs:

https://kafka-python.readthedocs.io/en/master/index.html

confluent-kafka-python 1.6k※

GitHub:

https://github.com/confluentinc/confluent-kafka-python

Docs:

http://docs.confluent.io/current/clients/index.html

pykafka 983※

GitHub:

https://github.com/Parsely/pykafka

Docs:

http://pykafka.readthedocs.org/

Kafka生态

参考:

https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

本文到这里就结束了,欢迎期待后面的文章。您可以关注下方的公众号二维码,在第一时间查看新文章。

公众号

如有疏忽错误欢迎在留言区评论指正,如果对您有所帮助欢迎点击下方进行打赏。