
Kafka Basic Information

Latest reply: Nov 24, 2021 04:51:12

Hello, everyone!

Today I'm going to introduce you to FusionInsight (FI) Kafka. This post helps beginners learn about FI faster.


Overview

Kafka is a high-throughput, distributed messaging system based on the publish-subscribe model.

The following figure shows the overall Kafka topology.

[Figure: overall Kafka topology]

A typical Kafka cluster contains several Producers (which can emit page views from the web front end, server logs, or system metrics such as CPU and memory usage), several Brokers (Kafka supports horizontal expansion; cluster throughput increases as Brokers are added), several Consumer Groups, and a ZooKeeper cluster. Kafka uses ZooKeeper to manage cluster configuration, elect the leader, and rebalance when consumer groups change. Producers publish messages to Brokers in push mode, and Consumers subscribe to and consume messages from Brokers in pull mode.
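
To make the push-pull model concrete, here is a minimal sketch using the standard Java client (kafka-clients 2.x assumed). The broker address Kafka_IP:21005, the topic test, and the group ID example-group1 are placeholders borrowed from later examples in this post; a non-security (PLAINTEXT) port is assumed, and on a secured FI cluster you would add the SASL/SSL properties shown in the Client Tools section below.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PushPullDemo {
        public static void main(String[] args) {
            // Producer side: push a message to the Broker.
            Properties p = new Properties();
            p.put("bootstrap.servers", "Kafka_IP:21005"); // placeholder address
            p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
                producer.send(new ProducerRecord<>("test", "key", "hello kafka"));
            } // close() flushes any buffered records

            // Consumer side: subscribe and pull messages from the Broker.
            Properties c = new Properties();
            c.put("bootstrap.servers", "Kafka_IP:21005");
            c.put("group.id", "example-group1");
            c.put("auto.offset.reset", "earliest");
            c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
                consumer.subscribe(Collections.singletonList("test"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("%s: %s%n", r.key(), r.value());
                }
            }
        }
    }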

Log Overview

Log path: The default path of Kafka logs is /var/log/Bigdata/kafka/broker.

Log archive rule: Automatic Kafka log compression is enabled. By default, when a log file exceeds 30 MB (the threshold is configurable; for details, see "Configuring the Log Level and Log File Size"), it is automatically compressed into a file named in the following format: <Original log file name>-<yyyy-mm-dd_hh-mm-ss>.[No.].log.zip, for example, server.log-2021-11-24_04-51-12.1.log.zip. A maximum of 20 latest compressed files are retained by default; both the number of retained files and the compression threshold are configurable.

Kafka Log List

All of the following are run logs of the Kafka service:

  • server.log: Server run log of the Kafka service process
  • controller.log: Controller run log of the Kafka service process
  • kafka-request.log: Request run log of the Kafka service process
  • log-cleaner.log: Cleaner run log of the Kafka service process
  • state-change.log: State-change run log of the Kafka service process
  • kafkaServer-gc.log: GC log of the Kafka service process
  • postinstall.log: Work log generated after Kafka installation
  • prestart.log: Work log generated before Kafka startup
  • checkService.log: Records whether the Kafka service starts successfully
  • cleanup.log: Cleanup log for Kafka uninstallation
  • start.log: Startup log of the KafkaServer process
  • stop.log: Stop log of the KafkaServer process
  • checkavailable.log: Records the health check details of the Kafka service
  • checkInstanceHealth.log: Records the health check details of the Kafka Broker

Log Level

Table 2 describes the log levels provided by Kafka.

Run log levels, from highest priority to lowest, are OFF, ERROR, WARN, INFO, DEBUG, and TRACE. Logs at the configured level and all higher-priority levels are recorded: the higher the configured level, the fewer logs are recorded.

Table 2 Log levels

  • OFF: Log output is disabled.
  • ERROR: Records error information about system running.
  • WARN: Records abnormal information about the current event processing.
  • INFO: Records normal running status information about the system and events.
  • DEBUG: Records system information and system debugging information.
  • TRACE: Records system information and class invoking relationships.

To modify log levels, perform the following operations:

  1. Log in to FusionInsight Manager.

  2. Choose Services > Kafka > Service Configuration.

  3. In the Type drop-down list, select All.

  4. On the menu bar on the left, select the log menu of the target role.

  5. Select a desired log level.

  6. Click Save and click OK. The configurations take effect.

Log Format

Table 3 describes the Kafka log format.

Table 3 Log format

Run log format:

    <yyyy-MM-dd HH:mm:ss,SSS> | <Log Level> | <Thread that generates the log> | <Message in the log> | <Full name of the class invoked by the log event> (<Log print file>:<Row No.>)

    Example: 2015-08-08 03:09,483 | INFO | [main] | Loading logs. | kafka.log.LogManager (Logging.scala:68)

Script run log format:

    <yyyy-MM-dd HH:mm:ss> <HostName> <Component name> <logLevel> <Message>

    Example: 2015-08-08 03:09 10-165-0-83 Kafka INFO Running kafka-start.sh.

Client Tools

During fault locating, a client is used to connect to Kafka for testing and verification.

Note:

  • zookeeper_IP indicates the service IP address of any ZooKeeper node instance.

  • kafka_IP indicates the service IP address of any Kafka node instance.

  • Prerequisites

  1. The Kafka client has been installed.

  2. A user with the kafkaadmin permission is available.

  3. The source bigdata_env command has been run, and kinit has been performed for the user in item 2.

  4. If port 21009 is used, set ssl.mode.enable to true and security.inter.broker.protocol to SASL_SSL.

  • List: list of topics

    Displays all topics in the cluster. The IP address of ZooKeeper is mandatory.

    kafka-topics.sh --list --zookeeper 192.168.234.231:24002/kafka

  • Describe: details of topics

    Displays topic partition and replica details. The IP address of ZooKeeper is mandatory.

    kafka-topics.sh --describe --zookeeper 192.168.234.231:24002/kafka

    Displays the partition and replica details of a single topic in the cluster. The ZooKeeper address is mandatory, and topicName is the name of the topic to be queried.

    kafka-topics.sh --describe --topic topicName --zookeeper zookeeper_IP:24002/kafka

  • Producer: producer tool

  1. New-version producer tool

    Sends messages to a specified topic (for example, test) through the console. --broker-list is mandatory. PORT indicates the port number: 21007 or 21009 in security mode, and 21005 in non-security mode. configPath is the path of the configuration file, for example, config/producer.properties in the client directory.

    If port 21009 is used, the content and commands of the producer.properties file are as follows:

    security.protocol = SASL_SSL
    kerberos.domain.name = hadoop.hadoop.com
    bootstrap.servers = Kafka_IP:21009
    sasl.kerberos.service.name = kafka

    kafka-console-producer.sh --topic test --broker-list Kafka_IP:21009 --producer.config configPath/producer.properties

    If port 21005 is used, the content and commands of the producer.properties file are as follows:

    security.protocol = PLAINTEXT
    bootstrap.servers = Kafka_IP:21005

    kafka-console-producer.sh --topic test --broker-list Kafka_IP:21005 --producer.config configPath/producer.properties

    If port 21007 is used, the content and commands of the producer.properties file are as follows:

    security.protocol = SASL_PLAINTEXT
    kerberos.domain.name = hadoop.hadoop.com
    bootstrap.servers = Kafka_IP:21007
    sasl.kerberos.service.name = kafka

    kafka-console-producer.sh --topic test --broker-list Kafka_IP:21007 --producer.config configPath/producer.properties

  2. Earlier-version producer tool

    Sends a message to a specified topic (for example, test) through the console. --broker-list is mandatory. If --old-producer is added, the producer of the old version is used. (Not recommended.)

    kafka-console-producer.sh --topic test --broker-list Kafka_IP:21005 --old-producer

  • Consumer: consumer tool

  1. New-version consumer tool

    Consumes messages from a specified topic (for example, test) through the console. --bootstrap-server is mandatory. PORT indicates the port number: 21007 or 21009 in security mode, and 21005 in non-security mode. configPath is the path of the configuration file, for example, config/consumer.properties in the client directory.

    If port 21009 is used, the content and commands of the consumer.properties file are as follows:

    security.protocol = SASL_SSL
    kerberos.domain.name = hadoop.hadoop.com
    group.id = example-group1
    auto.commit.interval.ms = 60000
    sasl.kerberos.service.name = kafka

    kafka-console-consumer.sh --topic test --bootstrap-server Kafka_IP:21009 --consumer.config configPath/consumer.properties --new-consumer

    If port 21005 is used, the content and commands of the consumer.properties file are as follows:

    security.protocol = PLAINTEXT
    group.id = example-group1
    auto.commit.interval.ms = 60000

    kafka-console-consumer.sh --topic test --bootstrap-server Kafka_IP:21005 --consumer.config configPath/consumer.properties --new-consumer

    If port 21007 is used, the content and commands of the consumer.properties file are as follows:

    security.protocol = SASL_PLAINTEXT
    kerberos.domain.name = hadoop.hadoop.com
    group.id = example-group1
    auto.commit.interval.ms = 60000
    sasl.kerberos.service.name = kafka

    kafka-console-consumer.sh --topic test --bootstrap-server Kafka_IP:21007 --consumer.config configPath/consumer.properties --new-consumer

  2. Earlier-version consumer tool

    Consumes messages from a specified topic (for example, test) through the console. The ZooKeeper address and topic name are mandatory. --from-beginning indicates that consumption starts from the earliest data. (Not recommended.)

    kafka-console-consumer.sh --topic test --zookeeper Zookeeper_IP:24002/kafka --from-beginning

  • Alter: modification of topic-related parameters

    Modifies the configuration parameters or number of partitions of a specified topic.

  1. Modifies the topic configuration. name and value indicate the configuration name and value of the topic, respectively.

    kafka-topics.sh --alter --topic test --zookeeper Zookeeper_IP:24002/kafka --config <name=value>

  2. Increases the number of partitions of a topic. The value of num must be greater than the current number of partitions of the topic. A programmatic sketch follows the command.

    kafka-topics.sh --alter --partitions num --topic test --zookeeper Zookeeper_IP:24002/kafka
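
For completeness, the same alterations can also be done programmatically. The following is a sketch using the Java AdminClient (incrementalAlterConfigs requires kafka-clients 2.3+; older clients use alterConfigs instead). The bootstrap address, the topic test, the retention.ms setting, and the new partition count 6 are illustrative assumptions, not FI-specific guidance.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.NewPartitions;
    import org.apache.kafka.common.config.ConfigResource;

    public class AlterTopicDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "Kafka_IP:21005"); // placeholder address
            try (AdminClient admin = AdminClient.create(props)) {
                // Equivalent of --alter --config <name=value>; retention.ms is just an example.
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
                AlterConfigOp op = new AlterConfigOp(
                        new ConfigEntry("retention.ms", "86400000"), AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(
                        Collections.singletonMap(topic, Collections.singletonList(op))).all().get();

                // Equivalent of --alter --partitions num; must exceed the current partition count.
                admin.createPartitions(
                        Collections.singletonMap("test", NewPartitions.increaseTo(6))).all().get();
            }
        }
    }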

Pressure test tools used by the producer

The pressure test tool tests the read and write performance of the Kafka cluster. It can be used to locate disk read/write bottlenecks, network bandwidth limits, and other read/write problems. The producer command parameters are as follows:

--record-size: indicates the size of a single data record.

--num-records: indicates the number of sent data records.

--producer.config: indicates the path of configuration files.

--throughput: indicates the data traffic rate. The value -1 indicates that the rate is not limited.

kafka-producer-perf-test.sh --topic test --record-size 1024 --num-records 1000 --producer.config ../config/producer.properties --throughput -1

Pressure test tools used by the consumer

kafka-consumer-perf-test.sh --group groupId --topic test --messages 100000000 --broker-list Kafka_Service_IP:port --new-consumer --consumer.config ../config/consumer.properties --print-metrics --show-detailed-stats

The port number in the command must match the security.protocol value in the consumer.properties file.

The following information is displayed after the command is executed: start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec

start.time and end.time indicate the start and end time of the test. data.consumed.in.MB indicates the total amount of data consumed (MB) between the start time and end time, and MB.sec indicates the average consumption rate (MB/s). data.consumed.in.nMsg indicates the number of data records consumed between the start time and end time, and nMsg.sec indicates the average number of data records consumed per second.

Consumer group information

The kafka-consumer-groups.sh script is used to view the consumption details of a consumer group. The consumer group information must be stored in Kafka's default manner; if a customized storage mode is used, this tool cannot display the consumption details.

  1. To view the list of earlier-version consumer groups, run the following command:

    kafka-consumer-groups.sh --list --zookeeper Zookeeper_IP:24002/kafka

    To query the consumption details of an earlier-version consumer group, run the following command. group_id indicates the name of the consumer group to be queried.

    kafka-consumer-groups.sh --describe --zookeeper Zookeeper_IP:24002/kafka --group group_id

  2. To view the list of new-version consumer groups, run the following command. port indicates the port number, which can be 21007, 21009, or 21005. configpath indicates the path of the configuration files, for example, config/consumer.properties in the client directory. The content of consumer.properties is the same as that used by the consumer tool.

    kafka-consumer-groups.sh --list --bootstrap-server Kafka_IP:port --command-config configpath/consumer.properties

    To view the consumption details of a new-version consumer group, run the following command. The port and configpath parameters are the same as in the previous command, and group_id indicates the name of the consumer group to be queried.

    kafka-consumer-groups.sh --describe --bootstrap-server Kafka_IP:port --group group_id --command-config configpath/consumer.properties

Leader replica rebalancing

kafka-preferred-replica-election.sh --zookeeper Zookeeper_IP:24002/kafka

After the command is executed, all the preferred replicas become the leaders.

Delete Topic

Deletes the specified topic from the cluster. The IP address of ZooKeeper is mandatory.

  kafka-topics.sh --delete --topic test --zookeeper Zookeeper_IP:24002/kafka

Client Configuration

Applicable version: 6.5.x

Consumer configuration

heartbeat.interval.ms (default value: 3000)

  Heartbeat interval between the consumer and Kafka when consumer groups are used. The value cannot exceed session.timeout.ms and is generally set to one third of session.timeout.ms.

max.partition.fetch.bytes (default value: 1048576)

  Maximum amount of data per partition that the server returns for each fetch request from the consumer. If this parameter is set to a large value, the consumer caches more data locally, which may affect memory usage.

fetch.max.bytes (default value: 52428800)

  Maximum size of data that the server can return to the consumer in one fetch response. The value should be greater than or equal to max.partition.fetch.bytes. Generally, the default value is used.

session.timeout.ms (default value: 10000)

  Heartbeat timeout interval between the consumer and broker when the consumer group is used to manage offsets. If the consumer consumes data at a very low frequency, you are advised to increase the value of this parameter.

auto.offset.reset (default value: earliest)

  Consumption policy applied when no valid consumption offset can be found. earliest: consumption starts from the beginning of the data, so duplicate data may be consumed. latest: consumption starts from the end of the data, which may cause data loss.

max.poll.interval.ms (default value: 300000, that is, 300 seconds)

  Maximum delay between two poll() calls by the consumer. If poll() is not called again before this timeout expires, the consumer is considered failed and the group triggers a rebalance to reassign its partitions to other members. If complex, time-consuming logic runs between two polls, increase this value.

max.poll.records (default value: 500)

  Maximum number of data records that a consumer can obtain in one poll() operation. A larger value increases the amount of data pulled at a time; ensure that the pulled data can be processed within max.poll.interval.ms.
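
The following sketch shows where these parameters plug into the Java consumer API. The broker address, topic, and group ID are placeholders, and each parameter is set to the default value from the list above so the effect of changing it is easy to see.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerConfigDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "Kafka_IP:21005"); // placeholder address
            props.put("group.id", "example-group1");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Parameters from the list above, set to their defaults.
            props.put("heartbeat.interval.ms", "3000");   // about 1/3 of session.timeout.ms
            props.put("session.timeout.ms", "10000");
            props.put("max.partition.fetch.bytes", "1048576");
            props.put("fetch.max.bytes", "52428800");
            props.put("auto.offset.reset", "earliest");   // or "latest"
            props.put("max.poll.interval.ms", "300000");
            props.put("max.poll.records", "500");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test"));
                while (true) {
                    // Each poll() returns at most max.poll.records records; processing
                    // must finish within max.poll.interval.ms or the group rebalances.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> r : records) {
                        System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
                    }
                }
            }
        }
    }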

Producer configuration

buffer.memory (default value: 33554432 bytes)

  Size of the memory the producer uses to buffer data in asynchronous mode. The value must be less than or equal to the memory available to the producer.

batch.size (default value: 16384)

  Amount of data cached per partition under the batch policy (a batch of data is accumulated and then sent to Kafka in one request). When sending records to the cluster, the producer attempts to batch records destined for the same partition; each request carries multiple batches, and each batch may contain multiple records. Sending in batches reduces network requests and improves the performance of both the producer client and the Kafka cluster. If batch.size is set too large, memory may be wasted, because a buffer of batch.size bytes is allocated in advance for each partition being written to.

linger.ms (default value: 0)

  Amount of time data for the same partition is accumulated before being sent to Kafka (time-based batching). Increasing the value can improve memory utilization on the client and server, but the added delay is uncontrollable, so you are advised not to rely on this parameter.

max.in.flight.requests.per.connection (default value: 5)

  Number of unacknowledged requests that the client can send on a single connection. If this parameter is set to 1, the client cannot send further requests to the same broker until the outstanding request is acknowledged. This reduces sending performance but ensures that the sent data is in order.

metadata.max.age.ms (default value: 300000)

  Maximum metadata lifetime. The producer client forcibly refreshes its metadata after this interval, even if no partition leadership change has been observed, so that new brokers and partitions are discovered proactively. If the value is set improperly, partition leadership information is refreshed too frequently.

reconnect.backoff.ms (default value: 50)

  Interval between reconnection attempts. This prevents the producer client from repeatedly reconnecting to a Kafka broker in a tight loop; the value applies to all connections between the client and brokers. If the value is too large, reconnection takes too long, affecting sending efficiency.
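
A matching sketch for the producer side, again with a placeholder address and topic and the defaults from the list above spelled out.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerConfigDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "Kafka_IP:21005"); // placeholder address
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Parameters from the list above, set to their defaults.
            props.put("buffer.memory", "33554432");    // 32 MB total record buffer
            props.put("batch.size", "16384");          // 16 KB per-partition batch
            props.put("linger.ms", "0");               // do not wait to fill a batch
            props.put("max.in.flight.requests.per.connection", "5"); // 1 guarantees ordering
            props.put("metadata.max.age.ms", "300000");
            props.put("reconnect.backoff.ms", "50");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    // Records bound for the same partition are grouped into batches of
                    // up to batch.size bytes before being pushed to the Broker.
                    producer.send(new ProducerRecord<>("test", Integer.toString(i), "message-" + i));
                }
                producer.flush(); // push out any batch still held by linger.ms/batch.size
            }
        }
    }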


Posted by olive.zhao (Admin) on Nov 2, 2021 14:40:05

Comments

NTan33 (Nov 5, 2021 01:07:57):
Quite the detailed introduction indeed!

Zebra (Nov 8, 2021 13:45:42):
Good one

user_4358465 (Nov 24, 2021 04:51:12):
Very helpful content