Hello, everyone!
Today I'm going to introduce FI Kafka, to help beginners get started with FI faster.
Overview
Log Overview
Client Tools
Client Configuration
Overview
Kafka is a high-throughput, distributed messaging system based on the publish-subscribe model.
The following figure shows the overall Kafka topology.
A typical Kafka cluster contains several Producers (whose input can be page views generated by the web front end, server logs, or system CPU and memory data), several Brokers (Kafka supports horizontal scaling; the cluster throughput increases as more Brokers are added), several Consumer Groups, and a ZooKeeper cluster. Kafka uses ZooKeeper to manage the cluster configuration, elect leaders, and rebalance when consumer groups change. Producers publish messages to Brokers in push mode, and Consumers subscribe to and consume messages from Brokers in pull mode.
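As a quick taste of this push/pull model (the Client Tools section below explains these commands in detail), a minimal smoke test on a non-security cluster might look like the following sketch. The topic name, partition and replica counts, and the zookeeper_IP/Kafka_IP placeholders are illustrative:

```bash
# Create a test topic (the ZooKeeper address is mandatory)
kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 \
  --zookeeper zookeeper_IP:24002/kafka

# Producer: push messages to the Brokers (non-security port 21005)
kafka-console-producer.sh --topic test --broker-list Kafka_IP:21005

# Consumer: pull the messages back from the Brokers
kafka-console-consumer.sh --topic test --bootstrap-server Kafka_IP:21005 --new-consumer
```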
Log Overview
Log path: The default path of Kafka logs is /var/log/Bigdata/kafka/broker.
Log archive rule: Automatic Kafka log compression is enabled. By default, when a log file exceeds 30 MB (the threshold is configurable; for details, see "Configuring the Log Level and Log File Size"), it is automatically compressed into a file named in the following format: <Original log file name>-<yyyy-mm-dd_hh-mm-ss>.[No.].log.zip. A maximum of 20 latest compressed files are retained by default; both the number of retained files and the compression threshold are configurable.
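For example, under the default rule a rotated server.log might be archived as follows (the timestamp and index here are made up for illustration):

```
server.log-2015-08-08_03-09-00.[1].log.zip
```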
Kafka Log List
Log Type | Log File Name | Description |
---|---|---|
Run log | server.log | Server run log file of the Kafka service process |
 | controller.log | Controller run log file of the Kafka service process |
 | kafka-request.log | Request run log file of the Kafka service process |
 | log-cleaner.log | Cleaner run log file of the Kafka service process |
 | state-change.log | State-change run log file of the Kafka service process |
 | kafkaServer-gc.log | GC log file of the Kafka service process |
 | postinstall.log | Work log file generated after Kafka installation |
 | prestart.log | Work log file generated before Kafka startup |
 | checkService.log | Log file that records whether the Kafka service starts successfully |
 | cleanup.log | Cleanup log file for Kafka uninstallation |
 | start.log | Startup log file of the KafkaServer process |
 | stop.log | Stop log file of the KafkaServer process |
 | checkavailable.log | Log file that records the health check details of the Kafka service |
 | checkInstanceHealth.log | Log file that records the health check details of the Kafka Broker |
Log Level
Table 2 describes the log levels provided by Kafka.
The run log levels are, from highest priority to lowest: OFF, ERROR, WARN, INFO, DEBUG, and TRACE. Logs at or above the configured level are recorded; the higher the configured level, the fewer logs are recorded.
Table 2 Log levels

Level | Description |
---|---|
OFF | Log output is disabled. |
ERROR | Logs of this level record error information about system running. |
WARN | Logs of this level record abnormal information about the current event processing. |
INFO | Logs of this level record normal running status information about the system and events. |
DEBUG | Logs of this level record system information and system debugging information. |
TRACE | Logs of this level record system information and class invocation relationships. |
To modify log levels, perform the following operations:

1. Log in to FusionInsight Manager.
2. Choose Services > Kafka > Service Configuration.
3. In the Type drop-down list, select All.
4. On the menu bar on the left, select the log menu of the target role.
5. Select the desired log level.
6. Click Save, and then click OK. The configuration takes effect.
Log Format
Table 3 describes the Kafka log format.
Table 3 Log format

Log Type | Format | Example |
---|---|---|
Run log | yyyy-MM-dd HH:mm:ss,SSS \| Log Level \| Thread that generates the log \| Message in the log \| Full name of the class invoked by the log event (Log print file:Row No.) | 2015-08-08 03:09,483 \| INFO \| [main] \| Loading logs. \| kafka.log.LogManager (Logging.scala:68) |
 | yyyy-MM-dd HH:mm:ss HostName Component name logLevel Message | 2015-08-08 03:09 10-165-0-83 Kafka INFO Running kafka-start.sh. |
Client Tools
During fault locating, a client is used to connect to Kafka for testing and verification.
zookeeper_IP indicates the service IP address of any ZooKeeper node instance.
kafka_IP indicates the service IP address of any Kafka node instance.
Prerequisites

1. The Kafka client has been installed.
2. A user with the kafkaadmin permission is available.
3. Run the source bigdata_env command and run kinit for the user in step 2 (see the sketch after this list).
4. If port 21009 is used, set ssl.mode.enable to true and security.inter.broker.protocol to SASL_SSL.
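A minimal session for step 3 might look like this sketch (the client install path and user name are hypothetical):

```bash
# Load the Kafka client environment variables (install path is hypothetical)
cd /opt/client
source bigdata_env

# Authenticate as the user with the kafkaadmin permission from step 2
kinit kafkauser
```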
List: list of topics
Displays all topics in the cluster. The IP address of ZooKeeper is mandatory.
kafka-topics.sh --list --zookeeper 192.168.234.231:24002/kafka
Describe: details of topics
Displays the partition and replica details of all topics in the cluster. The ZooKeeper address is mandatory.
kafka-topics.sh --describe --zookeeper 192.168.234.231:24002/kafka
Displays the partition and replica details of a specified topic in the cluster. The ZooKeeper address is mandatory, and topicName is the name of the topic to be queried.
kafka-topics.sh --describe --topic topicName --zookeeper zookeeper_IP:24002/kafka
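The --describe output prints one summary line per topic followed by one line per partition. An illustrative result (not captured from a real cluster) might look like:

```
Topic:test  PartitionCount:3  ReplicationFactor:2  Configs:
    Topic: test  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: test  Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3
    Topic: test  Partition: 2  Leader: 3  Replicas: 3,1  Isr: 3,1
```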
Producer: producer tool
New-version producer tool
Sends messages to a specified topic (for example, test) through the console. --broker-list is mandatory. PORT indicates the port number: 21007 or 21009 in security mode, 21005 in non-security mode. configPath is the path of the configuration file, for example, config/producer.properties in the client directory.
If port 21009 is used, the content and commands of the producer.properties file are as follows:
security.protocol = SASL_SSL
kerberos.domain.name = hadoop.hadoop.com
bootstrap.servers = Kafka_IP:21009
sasl.kerberos.service.name = kafka
kafka-console-producer.sh --topic test --broker-list Kafka_IP:21009 --producer.config configPath/producer.properties
If port 21005 is used, the content and commands of the producer.properties file are as follows:
security.protocol = PLAINTEXT
bootstrap.servers = Kafka_IP:21005
kafka-console-producer.sh --topic test --broker-list Kafka_IP:21005 --producer.config configPath/producer.properties
If port 21007 is used, the content and commands of the producer.properties file are as follows:
security.protocol = SASL_PLAINTEXT
kerberos.domain.name = hadoop.hadoop.com
bootstrap.servers = Kafka_IP:21007
sasl.kerberos.service.name = kafka
kafka-console-producer.sh --topic test --broker-list Kafka_IP:21007 --producer.config configPath/producer.properties
Earlier-version producer tool
Sends messages to a specified topic (for example, test) through the console. --broker-list is mandatory. Adding --old-producer makes the tool use the earlier-version producer. (Not recommended.)
kafka-console-producer.sh --topic test --broker-list Kafka_IP:21005 --old-producer
Consumer: consumer tool
New-version consumer tool
Consumes messages from a specified topic (for example, test) through the console. --bootstrap-server is mandatory. PORT indicates the port number: 21007 or 21009 in security mode, 21005 in non-security mode. configPath indicates the path of the configuration file, for example, config/consumer.properties in the client directory.
If port 21009 is used, the content and commands of the consumer.properties file are as follows:
security.protocol = SASL_SSL
kerberos.domain.name = hadoop.hadoop.com
group.id = example-group1
auto.commit.interval.ms = 60000
sasl.kerberos.service.name = kafka
kafka-console-consumer.sh --topic test --bootstrap-server Kafka_IP:21009 --consumer.config configPath/consumer.properties --new-consumer
If port 21005 is used, the content and commands of the consumer.properties file are as follows:
security.protocol = PLAINTEXT
group.id = example-group1
auto.commit.interval.ms = 60000
kafka-console-consumer.sh --topic test --bootstrap-server Kafka_IP:21005 --consumer.config configPath/consumer.properties --new-consumer
If port 21007 is used, the content and commands of the consumer.properties file are as follows:
security.protocol = SASL_PLAINTEXT
kerberos.domain.name = hadoop.hadoop.com
group.id = example-group1
auto.commit.interval.ms = 60000
sasl.kerberos.service.name = kafka
kafka-console-consumer.sh --topic test --bootstrap-server Kafka_IP:21007 --consumer.config configPath/consumer.properties --new-consumer
Earlier-version consumer tool
Consumes messages from a specified topic (for example, test) through the console. The ZooKeeper address and topic name are mandatory. --from-beginning indicates that consumption starts from the beginning of the topic. (Not recommended.)
kafka-console-consumer.sh --topic test --zookeeper Zookeeper_IP:24002/kafka --from-beginning
Alter: modification of topic-related parameters
Modifies the configuration parameters or number of partitions of a specified topic.
Modifies the topic configuration. name and value indicate the configuration name and value of the topic, respectively.
kafka-topics.sh --alter --topic test --zookeeper Zookeeper_IP:24002/kafka --config <name=value>
Increases the number of partitions of a topic. The value of num must be greater than the topic's current number of partitions.
kafka-topics.sh --alter --partitions num --topic test --zookeeper Zookeeper_IP:24002/kafka
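For example, to cap the retention of topic test at one day and then grow it to six partitions (retention.ms is a standard topic-level configuration; the values here are illustrative):

```bash
# Set a topic-level configuration item (name=value); retention.ms is in milliseconds
kafka-topics.sh --alter --topic test --zookeeper zookeeper_IP:24002/kafka --config retention.ms=86400000

# Increase the partition count; the new value must exceed the current partition count
kafka-topics.sh --alter --partitions 6 --topic test --zookeeper zookeeper_IP:24002/kafka
```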
Pressure test tool for the producer
The pressure test tool is used to test the read and write performance of the Kafka cluster. It can help locate disk read/write performance issues, network bandwidth issues, and other read/write problems. The producer command and its parameters are as follows:
--record-size: indicates the size of a single data record.
--num-records: indicates the number of sent data records.
--producer.config: indicates the path of configuration files.
--throughput: indicates the data traffic rate. The value -1 indicates that the rate is not limited.
kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000 --producer.config ../config/producer.properties --throughput -1
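With the sample values above, the test sends 1000 records of 1024 bytes each, about 1 MB in total, at an unlimited rate. To put sustained pressure on a cluster, increase --num-records (for example, 10000000 records would write roughly 10 GB).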
Pressure test tool for the consumer
kafka-consumer-perf-test.sh --group groupId --topic test --messages 100000000 --broker-list Kafka_IP:port --new-consumer --consumer.config ../config/consumer.properties --print-metrics --show-detailed-stats
The port number in the command must be the same as the security.protocol value in the consumer.properties file.
The following fields are displayed after the command is executed:

- start.time / end.time: start and end time of the test.
- data.consumed.in.MB: total amount of data consumed (MB) between the start time and end time.
- MB.sec: average amount of data consumed per second (MB/s).
- data.consumed.in.nMsg: number of messages consumed between the start time and end time.
- nMsg.sec: average number of messages consumed per second.
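For example, if data.consumed.in.MB is 512 and the test ran for 10 seconds, MB.sec is about 51.2. (These numbers are illustrative only.)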
Consumer group: consumer group information
The kafka-consumer-groups.sh script is used to view the consumption details of a consumer group. It works only when the consumer group information is stored in Kafka's default manner; if a customized storage mode is used, the tool cannot display the consumption details.
To view the consumer group list as an earlier-version consumer, run the following command:
kafka-consumer-groups.sh --list --zookeeper Zookeeper_IP:24002/kafka
To query the consumption details of an earlier-version consumer group, run the following command. group_id indicates the name of the consumer group to be queried.
kafka-consumer-groups.sh --describe --zookeeper Zookeeper_IP:24002/kafka --group group_id
To view the consumer group list as a new-version consumer, run the following command. port indicates the port number, which can be 21007, 21009, or 21005. configPath indicates the path of the configuration file, for example, config/consumer.properties in the client directory. The content of consumer.properties is the same as that used by the consumer tool.
kafka-consumer-groups.sh --list --bootstrap-server Kafka_IP:port --command-config configPath/consumer.properties
To view the consumer group details as a new-version consumer, run the following command. port and configPath are as described above, and group_id indicates the name of the consumer group to be queried.
kafka-consumer-groups.sh --describe --bootstrap-server Kafka_IP:port --group group_id --command-config configPath/consumer.properties
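The --describe output reports per-partition progress, where LAG = LOG-END-OFFSET - CURRENT-OFFSET is the number of messages not yet consumed. The exact columns vary by Kafka version; an illustrative (not captured) result might look like:

```
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test   0          1000            1100            100
```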
Leader replica rebalancing
kafka-preferred-replica-election.sh --zookeeper Zookeeper_IP:24002/kafka
After the command is executed, all the preferred replicas become the leaders.
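The preferred replica of a partition is the first replica in its replica list. Running this tool after broker restarts moves leadership back to the preferred replicas, so that the leader load is spread evenly across brokers again.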
Delete Topic
Deletes the specified topic from the cluster. The IP address of ZooKeeper is mandatory.
kafka-topics.sh --delete --topic test --zookeeper Zookeeper_IP:24002/kafka
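Note that deletion takes effect only when delete.topic.enable is set to true on the brokers; otherwise the topic is only marked for deletion.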
Client Configuration
Applicable Version
6.5.x
Consumer configuration
Parameter | Description | Impact Factor | Default Value |
---|---|---|---|
heartbeat.interval.ms | Interval at which the consumer sends heartbeats to Kafka | Heartbeat interval between the consumer and Kafka. The value cannot exceed session.timeout.ms and is generally one third of session.timeout.ms. | 3000 |
max.partition.fetch.bytes | Maximum amount of data per partition that the server returns for a consumer fetch request | If this parameter is set to a large value, the consumer caches more data locally, which may affect memory usage. | 1048576 |
fetch.max.bytes | Maximum amount of data that the server can return to the consumer in one fetch request | The value should be greater than or equal to max.partition.fetch.bytes. Generally, the default value is used. | 52428800 |
session.timeout.ms | Heartbeat timeout interval between the consumer and broker when the consumer group is used to manage the offset | If the data consumption frequency of the consumer is very low, you are advised to increase the value of this parameter. | 10000 |
auto.offset.reset | Consumption policy used when no consumption offset can be found for the consumer | earliest: Consumption starts from the beginning of the data; duplicate data may be consumed. latest: Consumption starts from the end of the data, which may cause data loss. | earliest |
max.poll.interval.ms | Maximum delay between two consecutive poll() calls by the consumer | If poll() is not called again before this timeout expires, the consumer is considered failed, and the group triggers a rebalance to reassign its partitions to other members. If complex, time-consuming logic runs between two polls, increase this value. | 300000 (5 minutes) |
max.poll.records | Maximum data volume that can be obtained by a consumer during a poll() operation | If this parameter is set to a larger value, the amount of data to be pulled at a time increases. Ensure that the data pulling time is within the range specified by max.poll.interval.ms. | 500 |
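Pulling the consumer table together, a consumer.properties tuned for a slow, batch-style consumer might look like the following sketch. The values are illustrative, not recommendations, and the security settings assume the security-mode (port 21007) setup from the Client Tools section:

```properties
# Security settings as used by the client tools in security mode
security.protocol = SASL_PLAINTEXT
kerberos.domain.name = hadoop.hadoop.com
sasl.kerberos.service.name = kafka
group.id = example-group1

# Tolerate slow consumption: heartbeat at about one third of the session timeout
session.timeout.ms = 30000
heartbeat.interval.ms = 10000

# Give time-consuming processing logic more headroom between poll() calls
max.poll.interval.ms = 600000
max.poll.records = 100

# If no offset is found, start from the earliest data (may consume duplicates)
auto.offset.reset = earliest
```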
Producer configuration
Parameter | Description | Impact Factor | Default Value |
---|---|---|---|
buffer.memory | Size of the memory used by the producer to buffer data in asynchronous mode | The value must be less than or equal to the memory available to the producer. | 33554432 (bytes) |
batch.size | Size of the data batch cached for the same partition (that is, after a batch of data accumulates, it is sent to Kafka) | When sending records to the cluster, the Kafka producer attempts to batch records destined for the same partition, and each request can carry multiple batches. Sending messages in batches reduces network requests and improves the performance of both the producer client and the Kafka cluster. However, if batch.size is set to a large value, memory may be wasted, because a buffer of batch.size is allocated for each partition being written to. | 16384 |
linger.ms | Time to cache data for the same partition based on the time policy (that is, after data accumulates for this period, it is sent to Kafka) | Increasing the value of this parameter can improve the memory utilization of the client and server, but the added delay is hard to control, so you are advised not to rely on this parameter. | 0 |
max.in.flight.requests.per.connection | Number of unresponded requests that can be sent by the client on a single connection | If this parameter is set to 1, the client cannot send requests to the same broker before the Kafka broker responds to the requests. This affects sending performance but ensures that the sent data is in order. | 5 |
metadata.max.age.ms | Maximum metadata lifetime. The producer client forcibly refreshes the metadata after this interval, even if there is no partition leadership change, to proactively discover new brokers or partitions. | If the value is set too small, the partition leadership metadata is refreshed too frequently. | 300000 |
reconnect.backoff.ms | Interval between attempts to reconnect to a broker | This prevents the producer client from repeatedly reconnecting to the Kafka broker in a tight loop. The value applies to all connections between the client and brokers. If the value is too large, reconnection takes too long, affecting sending efficiency. | 50 |
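Similarly, a producer.properties tuned for throughput rather than latency might look like the following sketch (values are illustrative, not recommendations):

```properties
# Security settings as used by the client tools in security mode
security.protocol = SASL_PLAINTEXT
kerberos.domain.name = hadoop.hadoop.com
sasl.kerberos.service.name = kafka
bootstrap.servers = Kafka_IP:21007

# Batch more aggressively: larger batches and more buffer memory
batch.size = 65536
buffer.memory = 67108864

# Allow a short accumulation window (the table above advises caution with linger.ms)
linger.ms = 10

# Set to 1 only if strict ordering matters more than throughput
max.in.flight.requests.per.connection = 1
```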