syncLimit limits how far out of date a server can be from the leader: it is the maximum number of ticks a follower may take to sync with ZooKeeper, so the actual time is syncLimit multiplied by the basic time unit tickTime. If a follower falls too far behind the leader, it is dropped from the ensemble.
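With the values used in the configs below (tickTime=2000, initLimit=10, syncLimit=5), the effective wall-clock limits work out as follows (a quick sketch of the arithmetic):

# tickTime  = 2000 ms                      -> basic time unit of 2 s
# initLimit = 10 ticks x 2000 ms = 20 s    -> time allowed for a follower's initial sync with the leader
# syncLimit =  5 ticks x 2000 ms = 10 s    -> how far a follower may lag before it is dropped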
$ cat zk01/conf/zoo.cfg
# The number of milliseconds of each tick (the basic time unit: 2 s here)
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take (20 s here)
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement (10 s here)
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zkdata/zk01
dataLogDir=/data/zklog/zk01
# the port at which the clients will connect
# (bind only to 172.17.0.87 on port 2181 -- useful on multi-homed hosts)
clientPort=2181
clientPortAddress=172.17.0.87
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=1000
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir (keep 3 snapshots)
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature (run every 3 hours)
autopurge.purgeInterval=3
$ cat zk02/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zkdata/zk02
dataLogDir=/data/zklog/zk02
# the port at which the clients will connect
clientPort=2182
clientPortAddress=172.17.0.87
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=1000
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=3
# admin server
admin.enableServer=true
admin.serverAddress=172.17.0.87
admin.serverPort=8082
admin.commandURL=/knner
zk03 configuration:
$ cat zk03/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zkdata/zk03
dataLogDir=/data/zklog/zk03
# the port at which the clients will connect
clientPort=2183
clientPortAddress=172.17.0.87
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=1000
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=3
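The three zoo.cfg files above only show the per-instance settings. To actually form an ensemble, every config also needs the server list and each dataDir needs a myid file; a minimal sketch, assuming all three instances live on 172.17.0.87 (3888 matches the netstat output below; the other quorum/election port numbers are assumptions, adjust to your deployment):

# appended to all three zoo.cfg files
server.1=172.17.0.87:2888:3888
server.2=172.17.0.87:2889:3889
server.3=172.17.0.87:2890:3890

# each instance identifies itself through dataDir/myid
$ echo 1 > /data/zkdata/zk01/myid
$ echo 2 > /data/zkdata/zk02/myid
$ echo 3 > /data/zkdata/zk03/myid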
# listening ports
$ netstat -lnutp|grep 30504
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 172.17.0.87:3888        :::*                    LISTEN      30504/java
tcp6       0      0 172.17.0.87:8081        :::*                    LISTEN      30504/java
tcp6       0      0 :::46075                :::*                    LISTEN      30504/java
tcp6       0      0 172.17.0.87:2181        :::*                    LISTEN      30504/java
# check zk01's status again -- it now reports that it is no longer running
$ ./zk01/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/knner/zk01/bin/../conf/zoo.cfg
Client port found: 2181. Client address: 172.17.0.87.
Error contacting service. It is probably not running.
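A running instance can also be probed without zkServer.sh; a sketch against zk02 (which is still up), using the AdminServer settings from its config above and the classic four-letter commands (the latter may need to be whitelisted via 4lw.commands.whitelist on recent releases):

# AdminServer over HTTP -- port 8082 and the /knner prefix come from admin.serverPort / admin.commandURL in zk02's zoo.cfg
$ curl http://172.17.0.87:8082/knner/stat

# four-letter command over zk02's client port
$ echo srvr | nc 172.17.0.87 2182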
$ bin/zkTxnLogToolkit.sh
usage: TxnLogToolkit [-dhrv] txn_log_file_name
 -d,--dump      Dump mode. Dump all entries of the log file. (this is the default)
 -h,--help      Print help message
 -r,--recover   Recovery mode. Re-calculate CRC for broken entries.
 -v,--verbose   Be verbose in recovery mode: print all entries, not just fixed ones.
 -y,--yes       Non-interactive mode: repair all CRC errors without asking
$ bin/zkTxnLogToolkit.sh -r log.100000001
ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
CRC ERROR - 4/5/18 2:16:05 PM CEST session 0x16295bafcc40000 cxid 0x1 zxid 0x100000002 closeSession null
Would you like to fix it (Yes/No/Abort) ?
$ bin/zkTxnLogToolkit.sh -r log.100000001
ZooKeeper Transactional Log File with dbid 0 txnlog format version 2
CRC ERROR - 4/5/18 2:16:05 PM CEST session 0x16295bafcc40000 cxid 0x1 zxid 0x100000002 closeSession null
Would you like to fix it (Yes/No/Abort) ? y
EOF reached after 6 txns.
Recovery file log.100000001.fixed has been written with 1 fixed CRC error(s)
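The toolkit writes the repaired log next to the original instead of modifying it in place; a sketch of verifying and swapping it in (paths are illustrative, and the ZooKeeper server should be stopped while its log directory is touched):

# dump the repaired file to confirm every txn now parses cleanly
$ bin/zkTxnLogToolkit.sh -d log.100000001.fixed

# keep the broken original as a backup and move the fixed file into place
$ mv log.100000001 log.100000001.broken
$ mv log.100000001.fixed log.100000001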
$ ./kafka01/bin/kafka-topics.sh Create, delete, describe, or change a topic. Option Description ------ ----------- --alter Alter the number of partitions, # 修改 replica assignment, and/or configuration for the topic. --config <String: name=value> A topic configuration override for the topic being created or altered.The following is a list of valid configurations: # 添加或修改topic的可用配置 cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs. --create Create a new topic. # 创建topic --delete Delete a topic # 删除topic --delete-config <String: name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). # 清除已有topic的配置 --describe List details for the given topics. # 查看topic 信息 --disable-rack-aware Disable rack aware replica assignment --force Suppress console prompts --help Print usage information. --if-exists ifset when altering or deleting topics, the action will only execute if the topic exists --if-not-exists ifset when creating topics, the action will only execute if the topic does not already exist --list List all available topics. # 查看topic列表 --partitions <Integer: # of partitions> The number of partitions for the topic # 指定分区数量,在创建或者修改的时候 being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected --replica-assignment <String: A list of manual partition-to-broker # replica手动分配replica和Broker之间的映射关系,在创建修改的时候 broker_id_for_part1_replica1 : assignments for the topic being broker_id_for_part1_replica2 , created or altered. broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> --replication-factor <Integer: The replication factor for each # 指定副本数,创建时 replication factor> partition in the topic being created. --topic <String: topic> The topic to be create, alter or # topic 名称 describe. Can also accept a regular expression except for --create option --topics-with-overrides ifset when describing topics, only show topics that have overridden configs --unavailable-partitions ifset when describing topics, only show partitions whose leader is not available --under-replicated-partitions ifset when describing topics, only show under replicated partitions --zookeeper <String: hosts> REQUIRED: The connection string for# 指定zookeeper地址 the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.
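For reference, creating a topic with this tool looks like the following (a sketch; the topic name, partition count and replication factor are illustrative):

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --create --topic demo-topic --partitions 3 --replication-factor 3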
$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --alter --topic my-replicated-topic-manual --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
$ ./bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic my-replicated-topic --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic my-replicated-topic currently has 4 partitions, 2 would not be an increase.
[2019-12-10 10:22:31,772] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic my-replicated-topic currently has 4 partitions, 2 would not be an increase.
 (kafka.admin.TopicCommand$)
$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --alter --topic my-replicated-topic-manual --partitions 5 --replica-assignment 0:1:2,1:2:0,2:0:1,2:1:0,0:2:1
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
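The resulting assignment can be checked with --describe, which prints the replica list for every partition (a sketch):

$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --describe --topic my-replicated-topic-manual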
$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --delete --topic my-replicated-topic-manual
Topic my-replicated-topic-manual is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
$ ./kafka01/bin/kafka-topics.sh --zookeeper 172.17.0.87:2181 --list
my-replicated-topic
test
$ ./bin/kafka-configs.sh Add/Remove entity config for a topic, client, user or broker Option Description ------ ----------- --add-config <String> Key Value pairs of configs to add. # 增加配置 Square brackets can be used to group values which contain commas: 'k1=v1, k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: For entity-type 'topics': # topic配置 cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable For entity-type 'brokers': # brokers 配置 log.message.timestamp.type ssl.client.auth log.retention.ms sasl.login.refresh.window.jitter sasl.kerberos.ticket.renew.window. factor log.preallocate log.index.size.max.bytes sasl.login.refresh.window.factor ssl.truststore.type ssl.keymanager.algorithm log.cleaner.io.buffer.load.factor sasl.login.refresh.min.period.seconds ssl.key.password background.threads log.retention.bytes ssl.trustmanager.algorithm log.segment.bytes log.cleaner.delete.retention.ms log.segment.delete.delay.ms min.insync.replicas ssl.keystore.location ssl.cipher.suites log.roll.jitter.ms log.cleaner.backoff.ms sasl.jaas.config principal.builder.class log.flush.interval.ms log.cleaner.dedupe.buffer.size log.flush.interval.messages advertised.listeners num.io.threads listener.security.protocol.map log.message.downconversion.enable sasl.enabled.mechanisms sasl.login.refresh.buffer.seconds ssl.truststore.password listeners metric.reporters ssl.protocol sasl.kerberos.ticket.renew.jitter ssl.keystore.password sasl.mechanism.inter.broker.protocol log.cleanup.policy sasl.kerberos.principal.to.local.rules sasl.kerberos.min.time.before.relogin num.recovery.threads.per.data.dir log.cleaner.io.max.bytes.per.second log.roll.ms ssl.endpoint.identification.algorithm unclean.leader.election.enable message.max.bytes log.cleaner.threads log.cleaner.io.buffer.size sasl.kerberos.service.name ssl.provider follower.replication.throttled.rate log.index.interval.bytes log.cleaner.min.compaction.lag.ms log.message.timestamp.difference.max. ms ssl.enabled.protocols log.cleaner.min.cleanable.ratio replica.alter.log.dirs.io.max.bytes. per.second ssl.keystore.type ssl.secure.random.implementation ssl.truststore.location sasl.kerberos.kinit.cmd leader.replication.throttled.rate num.network.threads compression.type num.replica.fetchers For entity-type 'users': # users配置 request_percentage producer_byte_rate SCRAM-SHA-256 SCRAM-SHA-512 consumer_byte_rate For entity-type 'clients': # clients配置 request_percentage producer_byte_rate consumer_byte_rate Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user. --alter Alter the configuration for the entity. # 修改 --bootstrap-server <String: server to The Kafka server to connect to. This connect to> is required for describing and altering broker configs. --command-config <String: command Property file containing configs to be config property file> passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs. 
--delete-config <String>        config keys to remove 'k1,k2'                # remove configs
--describe                      List configs for the given entity.           # show configs
--entity-default                Default entity name for clients/users/brokers (applies to corresponding entity type in command line)
--entity-name <String>          Name of entity (topic name/client id/user principal name/broker id)   # entity name
--entity-type <String>          Type of entity (topics/clients/users/brokers)                          # entity type
--force                         Suppress console prompts
--help                          Print usage information.
--zookeeper <String: urls>      REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.   # ZooKeeper address
List a topic's configuration:
Take the topic created above, my-replicated-topic, as an example:
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --describe
Configs for topic 'my-replicated-topic' are
These two settings come from Kafka's topic-level configuration reference:

cleanup.policy
    A string that is either “delete” or “compact” or both. This string designates the retention policy to use on old log segments. The default policy (“delete”) will discard old segments when their retention time or size limit has been reached. The “compact” setting will enable log compaction on the topic.
    Type: list    Default: delete    Valid values: [compact, delete]    Server default property: log.cleanup.policy    Importance: medium

compression.type
    Specify the final compression type for a given topic. This configuration accepts the standard compression codecs (‘gzip’, ‘snappy’, ‘lz4’, ‘zstd’). It additionally accepts ‘uncompressed’ which is equivalent to no compression; and ‘producer’ which means retain the original compression codec set by the producer.
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --describe
Configs for topic 'my-replicated-topic' are cleanup.policy=delete
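Overrides are added and removed with --alter; a sketch against the same topic (the retention value is illustrative):

# add or change an override
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --alter --add-config retention.ms=86400000

# remove it again
$ ./bin/kafka-configs.sh --zookeeper zk01:2181 --entity-type topics --entity-name my-replicated-topic --alter --delete-config retention.ms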
Testing Kafka producers and consumers
We will use the topic created above: test.
First, look at the help for the kafka-console-consumer.sh script:
$ ./kafka01/bin/kafka-console-consumer.sh The console consumer is a tool that reads data from Kafka and outputs it to standard output. Option Description ------ ----------- --bootstrap-server <String: server to REQUIRED: The server(s) to connect to. connect to> # 指定kafka的地址 --consumer-property <String: A mechanism to pass user-defined consumer_prop> properties in the form key=value to the consumer. --consumer.config <String: config file> Consumer config properties file. Note that [consumer-property] takes precedence over this config. --enable-systest-events Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.) --formatter <String: class> The name of a class to use for formatting kafka messages for display. (default: kafka.tools. DefaultMessageFormatter) --from-beginning If the consumer does not already have # 从开头开始消费 an established offset to consume from, start with the earliest message present in the log rather than the latest message. --group <String: consumer group id> The consumer group id of the consumer. # 指定消费者组 --isolation-level <String> Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommittedto read all messages. (default: read_uncommitted) --key-deserializer <String: deserializer for key> --max-messages <Integer: num_messages> The maximum number of messages to consume before exiting. If not set, consumption is continual. --offset <String: consume offset> The offset id to consume from (a non- negative number), or 'earliest' which means from beginning, or 'latest'which means from end (default: latest) --partition <Integer: partition> The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified. --property <String: prop> The properties to initialize the message formatter. Default properties include: print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value. deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key. deserializer.' and 'value. deserializer.' prefixes to configure their deserializers. --skip-message-on-error If there is an error when processing a message, skip it instead of halt. --timeout-ms <Integer: timeout_ms> If specified, exitif no message is available for consumption for the specified interval. --topic <String: topic> The topic id to consume on. --value-deserializer <String: deserializer for values> --whitelist <String: whitelist> Whitelist of topics to include for consumption.
Help for the kafka-console-producer.sh script:
$ ./kafka01/bin/kafka-console-producer.sh Read data from standard input and publish it to Kafka. Option Description ------ ----------- --batch-size <Integer: size> Number of messages to send in a single batch if they are not being sent synchronously. (default: 200) --broker-list <String: broker-list> REQUIRED: The broker list string in# 指定kafka broker地址 the form HOST1:PORT1,HOST2:PORT2. --compression-codec [String: The compression codec: either 'none', compression-codec] 'gzip', 'snappy', or 'lz4'.If specified without value, then it defaults to 'gzip' --line-reader <String: reader_class> The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka. tools. ConsoleProducer$LineMessageReader) --max-block-ms <Long: max block on The max time that the producer will send> block for during a send request (default: 60000) --max-memory-bytes <Long: total memory The total memory used by the producer in bytes> to buffer records waiting to be sent to the server. (default: 33554432) --max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition> partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384) --message-send-max-retries <Integer> Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message. (default: 3) --metadata-expiry-ms <Long: metadata The period of time in milliseconds expiration interval> after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000) --producer-property <String: A mechanism to pass user-defined producer_prop> properties in the form key=value to the producer. --producer.config <String: config file> Producer config properties file. Note that [producer-property] takes precedence over this config. --property <String: prop> A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user- defined message reader. --request-required-acks <String: The required acks of the producer request required acks> requests (default: 1) --request-timeout-ms <Integer: request The ack timeout of the producer timeout ms> requests. Value must be non-negative and non-zero (default: 1500) --retry-backoff-ms <Integer> Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100) --socket-buffer-size <Integer: size> The size of the tcp RECV size. (default: 102400) --sync If set message send requests to the brokers are synchronously, one at a time as they arrive. --timeout <Integer: timeout_ms> If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000) --topic <String: topic> REQUIRED: The topic id to produce messages to.
First start two consumers belonging to the same group.
Open two SSH terminals and run the following commands in each:
$ cd /data/knner
$ ./kafka01/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.87:9092 --from-beginning --topic test --group test-group
Then open another SSH terminal and run the following command to act as the producer:
$ ./kafka01/bin/kafka-console-producer.sh --broker-list 172.17.0.87:9092,172.17.0.87:9093,172.17.0.87:9094 --topic test
>test001    # send two messages
>test002
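To see how message keys influence partitioning, the same console tools can send and display keyed messages; a sketch using the parse.key/print.key formatter properties (the ':' separator is just an illustrative choice):

# producer: each stdin line is read as key:value
$ ./kafka01/bin/kafka-console-producer.sh --broker-list 172.17.0.87:9092 --topic test --property parse.key=true --property key.separator=:

# consumer: print the key next to every message
$ ./kafka01/bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.87:9092 --topic test --from-beginning --property print.key=true --property key.separator=: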
$ ./bin/kafka-consumer-groups.sh List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets. Option Description ------ ----------- --all-topics Consider all topics assigned to a group in the `reset-offsets` process. --bootstrap-server <String: server to REQUIRED: The server(s) to connect to. connect to> # 指定kafka 服务器 --by-duration <String: duration> Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS' --command-config <String: command Property file containing configs to be config property file> passed to Admin Client and Consumer. --delete Pass in groups to delete topic partition offsets and ownership information over the entire consumer group. For instance --group g1 -- group g2 # 清除消费者信息,包括分片偏移量,所有者信息 --describe Describe consumer group and list offset lag (number of messages not yet processed) related to given group. # 查看描述信息 --dry-run Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets. --execute Execute operation. Supported operations: reset-offsets. --export Export operation execution to a CSV file. Supported operations: reset- offsets. # 导出 --from-file <String: path to CSV file> Reset offsets to values defined in CSV file. --group <String: consumer group> The consumer group we wish to act on. --list List all consumer groups. # 指定消费者组 --members Describe members of the group. This option may be used with '--describe' and '--bootstrap-server' options only. Example: --bootstrap-server localhost: 9092 --describe --group group1 -- members --offsets Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with '--describe' and '-- bootstrap-server' options only. Example: --bootstrap-server localhost: 9092 --describe --group group1 -- offsets --reset-offsets Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. Additionally, the -- export option is used to export the results to a CSV format. You must choose one of the following reset specifications: --to-datetime, --by-period, --to-earliest, --to- latest, --shift-by, --from-file, -- to-current. To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from- file'. --shift-by <Long: number-of-offsets> Reset offsets shifting current offset by 'n', where'n' can be positive or negative. --state Describe the group state. This option may be used with '--describe' and '-- bootstrap-server' options only. Example: --bootstrap-server localhost: 9092 --describe --group group1 -- state --timeout <Long: timeout (ms)> The timeout that can be setfor some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000) --to-current Reset offsets to current offset. --to-datetime <String: datetime> Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss' --to-earliest Reset offsets to earliest offset. --to-latest Reset offsets to latest offset. --to-offset <Long: offset> Reset offsets to a specific offset. --topic <String: topic> The topic whose consumer group information should be deleted or topic whose should be included in the reset offset process. 
In `reset- offsets` case, partitions can be specified using this format: `topic1: 0,1,2`, where 0,1,2 are the partition to be included in the process. Reset-offsets also supports multiple topic inputs. --verbose Provide additional information, if any, when describing the group. This option may be used with '-- offsets'/'--members'/'--state' and '--bootstrap-server' options only. Example: --bootstrap-server localhost: 9092 --describe --group group1 -- members --verbose
COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
172.17.0.87:9092 (0)      range                     Stable               2
To manually delete one or more consumer groups, use the --delete option:
# Note: the consumers have to be stopped first -- deleting a group that still has active members fails:
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --delete --group test-group-01
Error: Deletion of some consumer groups failed:
* Group 'test-group-01' could not be deleted due to: NON_EMPTY_GROUP
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --delete --group test-group-01
Deletion of requested consumer groups ('test-group-01') was successful.
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --list
test
test-group
# check: CURRENT-OFFSET = LOG-END-OFFSET, i.e. everything has been consumed
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group
Consumer group 'test-group' has no active members.

TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST   CLIENT-ID
test     0          5               5               0     -            -      -

# reset the offsets to the earliest position
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --reset-offsets --group test-group --topic test --to-earliest
WARN: No action will be performed as the --execute option is missing. In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.

TOPIC    PARTITION  NEW-OFFSET
test     0          0

# it did not actually take effect:
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group
Consumer group 'test-group' has no active members.

TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST   CLIENT-ID
test     0          5               5               0     -            -      -

# as the warning above indicates, add the --execute option to actually perform the reset:
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --reset-offsets --group test-group --topic test --to-earliest --execute

TOPIC    PARTITION  NEW-OFFSET
test     0          0

# CURRENT-OFFSET is now 0.
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --describe --group test-group
Consumer group 'test-group' has no active members.

TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST   CLIENT-ID
test     0          0               5               5     -            -      -
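Besides --to-earliest, the same command accepts the other reset specifications listed in the help output above; a couple of sketches (run them only while the group has no active members, and keep --execute to actually apply the change):

# shift the committed offset back by two messages on each matched partition
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --reset-offsets --group test-group --topic test --shift-by -2 --execute

# jump to the offsets that were current at a given timestamp (format YYYY-MM-DDTHH:mm:SS.sss)
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka01:9092 --reset-offsets --group test-group --topic test --to-datetime 2019-12-10T00:00:00.000 --execute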
$ ./bin/kafka-reassign-partitions.sh This command moves topic partitions between replicas. Option Description ------ ----------- --bootstrap-server <String: Server(s) the server(s) to use for to use for bootstrapping> bootstrapping. REQUIRED if an absolution path of the log directory is specified for any replica in the reassignment json file --broker-list <String: brokerlist> The list of brokers to which the partitions need to be reassigned in the form "0,1,2". This is required if --topics-to-move-json-file is used to generate reassignment configuration --disable-rack-aware Disable rack aware replica assignment --execute Kick off the reassignment as specified by the --reassignment-json-file option. --generate Generate a candidate partition reassignment configuration. Note that this only generates a candidate assignment, it does not execute it. --reassignment-json-file <String: The JSON file with the partition manual assignment json file path> reassignment configurationThe format to use is - {"partitions": [{"topic": "foo", "partition": 1, "replicas": [1,2,3], "log_dirs": ["dir1","dir2","dir3"] }], "version":1 } Note that "log_dirs" is optional. When it is specified, its length must equal the length of the replicas list. The value in this list can be either "any" or the absolution path of the log directory on the broker. If absolute log directory path is specified, it is currently required that the replica has not already been created on that broker. The replica will then be created in the specified log directory on the broker later. --replica-alter-log-dirs-throttle The movement of replicas between log <Long: replicaAlterLogDirsThrottle> directories on the same broker will be throttled to this value (bytes/sec). Rerunning with this option, whilst a rebalance is in progress, will alter the throttle value. The throttle rate should be at least 1 KB/s. (default: -1) --throttle <Long: throttle> The movement of partitions between brokers will be throttled to this value (bytes/sec). Rerunning with this option, whilst a rebalance is in progress, will alter the throttle value. The throttle rate should be at least 1 KB/s. (default: -1) --timeout <Long: timeout> The maximum time in ms allowed to wait for partition reassignment execution to be successfully initiated (default: 10000) --topics-to-move-json-file <String: Generate a reassignment configuration topics to reassign json file path> to move the partitions of the specified topics to the list of brokers specified by the --broker- list option. The format to use is - {"topics": [{"topic": "foo"},{"topic": "foo1"}], "version":1 } --verify Verify if the reassignment completed as specified by the --reassignment- json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed --zookeeper <String: urls> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
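The expand-cluster-reassignment.json used in the next command is normally produced with the --generate option first; a sketch of that step, with the topic list and target brokers taken from the example below (so treat topics-to-move.json and the broker ids as illustrative):

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},{"topic": "foo2"}], "version":1}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
# the proposed partition reassignment configuration it prints is what gets saved
# as expand-cluster-reassignment.json and fed to --execute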
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
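The increase-replication-factor.json referenced above simply lists the desired replica set for each partition; judging from the output, its content would be roughly (a sketch):

> cat increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}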
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
There is an existing assignment running.
The throttle limit was set to 700000000 B/s
> bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
This shows the throttle applied to both the leader and follower sides of the replication protocol. By default both sides are assigned the same throttled throughput value.
To view the list of throttled replicas:
> bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101, follower.replication.throttled.replicas=1:101,0:102
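Once the reassignment has finished, running the --verify step shown earlier removes the throttle. The overrides can also be cleared by hand with --delete-config; a sketch (repeat per broker id and per topic as needed; this is an assumption about clearing the overrides manually, --verify remains the recommended path):

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 1 --alter --delete-config leader.replication.throttled.rate,follower.replication.throttled.rate
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config leader.replication.throttled.replicas,follower.replication.throttled.replicas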