Kafka: Basic Kafka Operations Example

1.1.1 Case 1: Basic Kafka Operations - Producer and Consumer Information

Scenarios

1.1.1.1 Scenario Description

Kafka is a distributed messaging system in which messages can be published and subscribed to. In this case, a producer is developed to send a message to a topic of a Kafka cluster every second, and a consumer is developed to subscribe to the topic and consume its messages in real time.

1.1.1.2 Development Idea

1.        Use a Linux client to create a topic.

2.        Develop a producer to produce data for the topic.

3.        Develop a consumer to consume the data of the topic.

1.1.1.3 Example Code Description

1.1.1.3.1 Old Producer API Usage Example

Function Description

Acting as a message producer, the Producer class publishes messages to the Kafka broker.

The following code snippet belongs to the run method of the com.huawei.bigdata.kafka.example.Producer class; it sends one message to a specified topic every second. (Note: The old producer API can access only topics for which no ACL is set, and only when allow.everyone.if.no.acl.found is set to true on the server. For details, see Service Operation Guide > Kafka > Safety Instruction on Using Kafka.)

Code Example

/**
 * Start the producer to send one message per second.
 */
public void run()
{
    LOG.info("Producer: start.");
    int messageNo = 1;
    while (true)
    {
        String messageStr = new String("Message_" + messageNo);
        // Use the message sequence number as the key.
        String key = String.valueOf(messageNo);
        producer.send(new KeyedMessage<String, String>(topic, key, messageStr));
        LOG.info("Producer: send " + messageStr + " to " + topic);
        messageNo++;
        // Send one message per second.
        try
        {
            Thread.sleep(1000);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
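
The run method above assumes that the producer field has already been initialized elsewhere in the Producer class. A minimal initialization sketch for the old producer API is shown below; the broker address is a placeholder and must be replaced with the actual broker list of the cluster.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

// Hypothetical initialization for the old producer API.
Properties props = new Properties();
// Placeholder broker list; replace with the actual broker addresses.
props.put("metadata.broker.list", "192.168.0.11:9092");
// Serialize message values as strings.
props.put("serializer.class", "kafka.serializer.StringEncoder");
Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));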

1.1.1.3.2 Old Consumer API Usage Example

Function Description

Each consumer instance belongs to a consumer group, and within the same consumer group each message is consumed by only one consumer instance. Multiple consumer groups can consume the same message at the same time.

The following code snippet belongs to the run method of the com.huawei.bigdata.kafka.example.Consumer class; it subscribes to messages of a specific topic. (Note: The old consumer API can access only topics for which no ACL is set, and only when allow.everyone.if.no.acl.found is set to true on the server. For details, see Service Operation Guide > Kafka > Safety Instruction on Using Kafka.)

Code Example

/**
 * Run the consumer to subscribe to messages of the specified topic on Kafka.
 */
public void run()
{
    LOG.info("Consumer: start.");
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    LOG.info("Consumer streams size is : " + streams.size());
    for (KafkaStream<byte[], byte[]> stream : streams)
    {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext())
        {
            LOG.info("Consumer: receive " + new String(it.next().message()) + " from " + topic);
        }
    }
    LOG.info("Consumer End.");
}
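
The consumer field used above is a ConsumerConnector of the old consumer API. A minimal initialization sketch follows; the ZooKeeper address (with the /kafka chroot used by the commands later in this article) and the group ID are placeholders.

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

// Hypothetical initialization for the old consumer API.
Properties props = new Properties();
// Placeholder ZooKeeper connection string; replace with the actual addresses.
props.put("zookeeper.connect", "192.168.0.11:24002/kafka");
// Placeholder consumer group ID.
props.put("group.id", "example-group1");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));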

1.1.1.3.3 Multi-thread Producer Example

Function Description

The multi-thread producer function is implemented based on the code example described in 1.1.1.3.1 Old Producer API Usage Example. Multiple producer threads can be started, and each thread uses its thread ID as the message key so that all of its messages are sent to the same partition.

The following code snippet belongs to the run method of the com.huawei.bigdata.kafka.example.ProducerMultThread class; it is executed by each producer thread.

Code Example

/**
 * Use the current thread ID as the key and send data.
 */
public void run()
{
    LOG.info("Producer: start.");
    // Record the number of messages sent.
    int messageCount = 1;
    // Number of messages sent by each thread.
    int messagesPerThread = 5;
    while (messageCount <= messagesPerThread)
    {
        // Content of the message to be sent.
        String messageStr = new String("Message_" + sendThreadId + "_" + messageCount);
        // Use the thread ID as the key so that each thread sends messages to only one partition.
        String key = String.valueOf(sendThreadId);
        // Send the message.
        producer.send(new KeyedMessage<String, String>(sendTopic, key, messageStr));
        LOG.info("Producer: send " + messageStr + " to " + sendTopic + " with key: " + key);
        messageCount++;
    }
}
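
The snippet above is the run method executed by each thread; the driver that starts the threads is not shown. A minimal sketch might look like the following, in which the thread count, the topic name, and the ProducerMultThread constructor signature are assumptions for illustration.

// Hypothetical driver; PRODUCER_THREAD_NUM, the topic name, and the
// constructor signature are assumptions for illustration.
private static final int PRODUCER_THREAD_NUM = 2;

public static void main(String[] args)
{
    for (int threadId = 0; threadId < PRODUCER_THREAD_NUM; threadId++)
    {
        // Each thread uses its own ID as the message key, so its messages go to one partition.
        ProducerMultThread producerThread = new ProducerMultThread("example-topic", threadId);
        producerThread.start();
    }
}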

1.1.1.3.4 Multi-thread Consumer Example

Function Description

The multi-thread consumer function is implemented based on the code example described in 1.1.1.3.2 Old Consumer API Usage Example. The number of consumer threads started to consume messages should equal the number of partitions in the topic.

The following code snippet belongs to the run method of the com.huawei.bigdata.kafka.example.ConsumerMultThread class; it implements concurrent consumption of messages in a specified topic.

Code Example

/**
 * Start the concurrent multi-thread consumer.
 */
public void run()
{
    LOG.info("Consumer: start.");
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    // The Integer value indicates the number of threads started to consume the specified topic.
    // Note: If this value is greater than the number of partitions in the topic, some threads will never receive data.
    topicCountMap.put(topic, new Integer(CONCURRENCY_THREAD_NUM));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    LOG.info("Consumer streams size is : " + streams.size());
    // Assign thread numbers to identify different threads.
    int threadNum = 0;
    for (KafkaStream<byte[], byte[]> stream : streams)
    {
        StreamThread streamThread = new StreamThread(stream, threadNum, topic);
        streamThread.start();
        threadNum++;
    }
    LOG.info("Consumer End.");
}
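
The StreamThread class started above is not shown in the snippet. Based on the single-thread consumer logic in 1.1.1.3.2, a minimal sketch of such a class might look like this; the field names are assumptions, and the constructor signature matches the call above.

// Hypothetical StreamThread implementation mirroring the single-thread consumer loop.
private static class StreamThread extends Thread
{
    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNum;
    private final String topic;

    public StreamThread(KafkaStream<byte[], byte[]> stream, int threadNum, String topic)
    {
        this.stream = stream;
        this.threadNum = threadNum;
        this.topic = topic;
    }

    public void run()
    {
        // Iterate over the messages assigned to this thread's stream.
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext())
        {
            LOG.info("Thread " + threadNum + ": receive " + new String(it.next().message()) + " from " + topic);
        }
    }
}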

1.1.1.3.5 New Consumer API Usage Example

Function Description

Kafka 0.9 and later versions support the security feature. In security mode, the new consumer API must be used to access a Kafka cluster, because the old consumer API cannot access topics with ACLs configured. The new consumer API can access Kafka clusters in both security and normal mode. For details, see Service Operation Guide > Kafka > Safety Instruction on Using Kafka.

The following example code belongs to the com.huawei.bigdata.kafka.example.NewConsumer class. It uses the new consumer API to subscribe to secure topics and consume messages.

Code Example

/**
 * NewConsumer constructor.
 * @param topic Name of the subscribed topic.
 */
public NewConsumer(String topic)
{
    super("NewConsumerExample", false);
    Properties props = new Properties();
    // Security protocol type.
    props.put("security.protocol", KafkaProperties.getInstance().getValues("security.protocol", "SASL_PLAINTEXT"));
    // Service name.
    props.put("sasl.kerberos.service.name", "kafka");
    consumer = new KafkaConsumer<Integer, String>(props);
    this.topic = topic;
}

/**
 * Process messages of the subscribed topic.
 */
public void doWork()
{
    consumer.subscribe(Collections.singletonList(this.topic));
    ConsumerRecords<Integer, String> records = consumer.poll(timeout);
    for (ConsumerRecord<Integer, String> record : records)
    {
        LOG.info("[NewConsumerExample], Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
    }
}
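
The constructor above shows only the security-related settings. For the consumer to connect, the standard client properties are also required; the following is a minimal sketch in which the broker addresses (port 21007 is the secure port used in the commands later in this article) and the group ID are placeholders.

// Hypothetical additional settings for the same Properties object; values are placeholders.
props.put("bootstrap.servers", "192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007");
props.put("group.id", "example-group1");
// Deserializers matching KafkaConsumer<Integer, String>.
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");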

1.1.1.4 Obtaining Example Code

Using the FusionInsight Client

Decompress the FusionInsight client installation package and obtain the kafka-examples folder, which is in the kafka sub-folder of the FusionInsight_Services_ClientConfig folder.

Using the Maven Project

Log in to Huawei DevCloud (https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home) and download the code under components/kafka/kafka-example-maven-security to the local PC.

1.1.1.5 Application Commissioning

1.1.1.5.1 Commissioning an Application in Windows

Starting NewProducer

Step 1     Ensure that the mappings between the host names and service IP addresses of all the hosts in the remote cluster are configured in the local hosts file.

Step 2     Run NewProducer.java in Eclipse, as shown in Figure 1-1.

Figure 1-1 Right-clicking NewProducer.java in Eclipse


Step 3     The console window appears, and NewProducer sends messages to the default topic (example-metric1). A log entry is printed for every 10 messages sent.

Figure 1-2 NewProducer running window


----End

Starting SimpleConsumerDemo

Step 1     Ensure that the Kafka server parameter allow.everyone.if.no.acl.found is set to true.

Step 2     In Eclipse, right-click and choose Run As > Run Configurations, as shown in Figure 1-3.

Figure 1-3 Run Configurations running


Step 3     In the displayed configuration dialog box, set the minimum number of messages to consume, the topic name, and the partition number, separating the values with spaces, as shown in Figure 1-4.

Figure 1-4 Setting parameters


Step 4     Click Run. The messages sent by the producer are displayed on the console.

Figure 1-5 SimpleConsumerDemo running window


----End

Starting NewConsumer

Step 1     Run the NewConsumer.java file.

Step 2     Click Run. In the console window that appears, NewProducer starts after NewConsumer has started successfully, and you can view the messages received in real time.

Figure 1-6 NewConsumer.java running window


----End

Starting Other Code Examples

The procedures for starting other code examples are similar to the procedures for starting NewProducer and NewConsumer in this section.

1.1.1.5.2 Commissioning an Application in Linux

Scenario

Run an example program in Linux after code development is complete.

Procedure

Step 1     Right-click the project and choose Export from the shortcut menu.


Step 2     Select Runnable JAR file and click Next.


Step 3     Select the producer or consumer class from the Launch configuration drop-down list as the default implementation class, and click Browse. In the displayed window, specify the JAR file name and save path, click Save, select Copy required libraries into a sub-folder next to the created JAR, and click Finish.


Step 4     A JAR file and a lib folder are created in the specified save path.


Step 5     Copy the lib folder (example_lib in this example) to any directory in the Linux environment, for example, /opt/example, and copy the created JAR file to /opt/example/example_lib.

Step 6     Copy all files in the src/main/resources directory of the Eclipse project to the src/main/resources directory at the same level as the lib folder, that is, /opt/example/src/main/resources.

Step 7     Ensure that the current user has read permission on all files in the src/main/resources and lib folders in /opt/example, that the JDK has been installed, and that Java environment variables are set. Then run the command, for example, java -cp .:/opt/example/example_lib/*:/opt/example/src/main/resources com.huawei.bigdata.kafka.example.NewProducer, to run the example project.

----End

1.1.1.5.3 Kafka Streams Example Running Guide

1.1.1.5.3.1 High Level Streams API Example Usage Guide

1.        Open example code WordCountDemo.java in the IDE and change the following two variables to the machine-machine account name and keytab file that you apply for.

// Use the machine-machine account keytab file that you apply for. 
private static final String USER_KEYTAB_FILE = "Change it to the real-world keytab file name."; 
// Use the machine-machine account name that you apply for. 
private static final String USER_PRINCIPAL = "Change it to the real-world username.";

2.        Use the Linux client to create the input and output topics. Ensure that the topic names are the same as those in the example code, set the policy for clearing the output topic to compact, and run the example code.

// source-topic name created by the user, that is, input topic 
private static final String INPUT_TOPIC_NAME = "streams-wordcount-input"; 
// sink-topic name created by the user, that is, output topic 
private static final String OUTPUT_TOPIC_NAME = "streams-wordcount-output";

          Create the input topic:

kafka-topics.sh --create --zookeeper 192.168.0.11:24002,192.168.0.12:24002,192.168.0.13:24002/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input

          Create the output topic:

kafka-topics.sh --create --zookeeper 192.168.0.11:24002,192.168.0.12:24002,192.168.0.13:24002/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

          Run example code WordCountDemo.java. For details, see Commissioning an Application in Windows and Commissioning an Application in Linux.

3.        Use the Linux client to write messages and view the statistics result.

Run the kafka-console-producer.sh command to write messages to the input topic.

# kafka-console-producer.sh --broker-list 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007 --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties 
>This is Kafka Streams test 
>test starting 
>now Kafka Streams is running 
>test end 
>

Run the kafka-console-consumer.sh command to consume data from the output topic and view the statistics result.

# kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007 --new-consumer --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter 
this    1 
is      6 
kafka 12 
streams 8 
test    8 
test    9 
starting 1 
now     1 
kafka 13 
streams 9 
is      7 
running 1 
test    10 
end     1

1.1.1.5.3.2 Low Level Streams API Example Usage Guide

1.        Open example code WordCountProcessorDemo.java in the IDE and change the following two variables to the machine-machine account name and keytab file that you apply for.

// Use the machine-machine account keytab file that you apply for. 
private static final String USER_KEYTAB_FILE = "Change it to the real-world keytab file name";  
// Use the machine-machine account name that you apply for. 
private static final String USER_PRINCIPAL = "Change it to the real-world username. ";

2.        Use the Linux client to create the input and output topics. Ensure that the topic names are the same as those in the example code, set the policy for clearing the output topic to compact, and run the example code.

// source-topic name created by the user, that is, input topic  
private static final String INPUT_TOPIC_NAME = "streams-wordcount-processor-input"; 
 
// sink-topic name created by the user, that is, output topic  
private static final String OUTPUT_TOPIC_NAME = "streams-wordcount-processor-output";

          Create the input topic:

kafka-topics.sh --create --zookeeper 192.168.0.11:24002,192.168.0.12:24002,192.168.0.13:24002/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-input

          Create the output topic:

kafka-topics.sh --create --zookeeper 192.168.0.11:24002,192.168.0.12:24002,192.168.0.13:24002/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-output --config cleanup.policy=compact

          Run example code WordCountProcessorDemo.java. For details, see Commissioning an Application in Windows and Commissioning an Application in Linux.

3.        Use the Linux client to write messages and view the statistics result.

Run the kafka-console-producer.sh command to write messages to the input topic.

# kafka-console-producer.sh --broker-list 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007 --topic streams-wordcount-processor-input --producer.config /opt/client/Kafka/kafka/config/producer.properties 
>This is Kafka Streams test 
>now Kafka Streams is running 
>test end 
>

Run the kafka-console-consumer.sh command to consume data from the output topic and view the statistics result.

# kafka-console-consumer.sh --topic streams-wordcount-processor-output --bootstrap-server 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007 --new-consumer --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true 
is      1 
kafka 1 
streams 1 
test    1 
this    1 
end     1 
is      2 
kafka 2 
now     1 
running 1 
streams 2 
test    2 
this    1

1.1.2 Case 2: Basic Kafka Streams Operation Example

1.1.2.1 Scenarios

Scenario Description

Kafka Streams is a lightweight stream processing framework provided by Apache Kafka. The input and output of Kafka Streams are stored in the Kafka cluster.

The following describes the most common example, WordCount.

1.1.2.2 Development Idea

1.        Create two topics on the Linux client to serve as the input and output topics.

2.        Develop a Kafka Streams application to implement the word count function. The application reads messages from the input topic, counts the words in each message, and writes the results to the output topic as key-value pairs; the statistics can be viewed by consuming the output topic.

1.1.2.3 Example Code Description

1.1.2.3.1 High Level KafkaStreams API Usage Sample

Function Description

The following code snippet is used in the main method of the com.huawei.bigdata.kafka.example.WordCountDemo class to implement the following function:

Collects statistics on the input records: identical words are grouped together and used as keys, the number of occurrences of each word is calculated as the value, and the results are output as key-value pairs.

Example Code

// Build a processor topology.
KStreamBuilder builder = new KStreamBuilder();
// Receive input records from the input topic.
KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);
// Processing.
KTable<String, Long> counts = source
    // Split each received record into words based on the regular expression REGEX_STRING.
    .flatMapValues(new ValueMapper<String, Iterable<String>>() {
        @Override
        public Iterable<String> apply(String value) {
            return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING));
        }
    })
    // Group the key-value pairs by word.
    .groupBy(new KeyValueMapper<String, String, String>() {
        @Override
        public String apply(String key, String value) {
            return value;
        }
    })
    // Count the occurrences and store the result.
    .count(KEY_VALUE_STATE_STORE_NAME);
// Write the key-value pairs of the result to the output topic.
counts.toStream().to(Serdes.String(), Serdes.Long(), OUTPUT_TOPIC_NAME);
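
The snippet builds the topology but does not start it. A minimal driver sketch is shown below; the application ID and broker list are placeholders, and the serde config constants shown are those of the pre-1.0 Streams API that this example uses (newer versions use DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG).

// Hypothetical driver; the application ID and broker list are placeholders.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.11:21007");
// Default serdes for reading the string records of the input topic.
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();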

1.1.2.3.2 Low Level KafkaStreams API Usage Sample

Function Description

The following code snippet is used in the com.huawei.bigdata.kafka.example.WordCountProcessorDemo class to implement the following function:

Collects statistics on the input records: identical words are grouped together and used as keys, the number of occurrences of each word is calculated as the value, and the results are output as key-value pairs.

Example Code

private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {

    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            // ProcessorContext instance, which provides access to the metadata of the record being processed.
            private ProcessorContext context;
            private KeyValueStore<String, Integer> kvStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                this.context = context;
                // Execute punctuate() once every 1000 ms.
                this.context.schedule(1000);
                // Look up the key-value state store KEY_VALUE_STATE_STORE_NAME, which keeps the counts for the received input records.
                this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME);
            }

            // Process a record received from the input topic: split it into words and count them.
            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING);
                for (String word : words) {
                    Integer oldValue = this.kvStore.get(word);
                    if (oldValue == null) {
                        this.kvStore.put(word, 1);
                    } else {
                        this.kvStore.put(word, oldValue + 1);
                    }
                }
            }

            @Override
            public void punctuate(long timestamp) {
                // The try-with-resources statement closes the iterator automatically.
                try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
                    System.out.println("----------- " + timestamp + " ----------- ");
                    while (iter.hasNext()) {
                        KeyValue<String, Integer> entry = iter.next();
                        System.out.println("[" + entry.key + ", " + entry.value + "]");
                        // Forward the new record to the downstream processor as a key-value pair.
                        context.forward(entry.key, entry.value.toString());
                    }
                }
                // Commit the current processing progress.
                context.commit();
            }

            @Override
            public void close() {
            }
        };
    }
}
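
MyProcessorSupplier must be wired into a topology together with the state store it looks up in init(). With the pre-1.0 TopologyBuilder API that this example targets, a minimal wiring sketch might look like this; the node names ("Source", "Process", "Sink") are placeholders.

// Hypothetical topology wiring; node names are placeholders.
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("Source", INPUT_TOPIC_NAME)
       // Attach the word-count processor to the source node.
       .addProcessor("Process", new MyProcessorSupplier(), "Source")
       // Register the key-value store that the processor retrieves in init().
       .addStateStore(Stores.create(KEY_VALUE_STATE_STORE_NAME).withStringKeys().withIntegerValues().inMemory().build(), "Process")
       // Forward the counts to the output topic.
       .addSink("Sink", OUTPUT_TOPIC_NAME, "Process");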

1.1.2.4 Obtaining Example Codes

Using the FusionInsight Client

Decompress the FusionInsight client installation package and obtain the kafka-examples folder, which is in the kafka sub-folder of the FusionInsight_Services_ClientConfig folder.

Maven Project Mode

Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home

Security mode: components/kafka/kafka-example-maven-security

1.1.2.5 Application Commissioning

1.1.2.5.1 Commissioning Applications on the Windows OS

Running NewProducer

Step 1     Ensure that the mappings between the host names and service IP addresses of all the hosts in the remote cluster are configured in the local hosts file.

Step 2     Use Eclipse to run NewProducer.java, as shown in Figure 1-7.

Figure 1-7 Right-clicking NewProducer.java and choosing Run As > Java Application


Step 3     Click Run. In the displayed console window, you can see that NewProducer is sending messages to the default topic (example-metric1). A log entry is recorded for every 10 messages sent.

Figure 1-8 NewProducer running window


----End

Running SimpleConsumerDemo

Step 1     Ensure that the allow.everyone.if.no.acl.found parameter is set to true on the Kafka server.

Step 2     In Eclipse, right-click and choose Run As > Run Configurations, as shown in Figure 1-9.

Figure 1-9 Choosing Run As > Run Configurations


Step 3     In the displayed configuration dialog box, set the minimum number of messages to consume, the topic name, and the partition number, separating the values with spaces, as shown in Figure 1-10.

Figure 1-10 Setting parameters


Step 4     Click Run. The messages sent by the producer are displayed on the console.

Figure 1-11 SimpleConsumerDemo running window


----End

Running NewConsumer

Step 1     Run NewConsumer.java.

Step 2     Click Run. In the displayed console window, you can see that NewProducer starts after NewConsumer has started successfully, and messages are received in real time.

Figure 1-12 NewConsumer.java running window


----End

Running Other Example Codes

The procedures for running other example codes are similar to those for running NewProducer and NewConsumer in this section.

1.1.2.5.2 Commissioning Applications on the Linux OS

Scenario

Run an example program on the Linux OS after code development is complete.

Procedure

Step 1     Right-click the project and select Export.


Step 2     Select Runnable JAR file and click Next.


Step 3     Select the producer or consumer class from the Launch configuration drop-down list as the default implementation class, and click Browse. In the displayed window, specify the JAR file name and save path, click Save, select Copy required libraries into a sub-folder next to the generated JAR, and click Finish.


A JAR file and a lib folder are generated in the specified save path.


Step 4     Copy the generated dependency library folder (example_lib) to any directory on the Linux OS, for example, /opt/example. Then copy the generated JAR file to the /opt/example/example_lib directory.

Step 5     Copy all files in the src/main/resources directory of the Eclipse project to the src/main/resources directory at the same level as the dependency library folder, that is, /opt/example/src/main/resources.

Step 6     Go to the /opt/example directory. Ensure that the current user can read all files in the src/main/resources directory and the dependency library directory. In addition, ensure that the JDK has been installed and Java environment variables have been set. Then run java -cp .:/opt/example/example_lib/*:/opt/example/src/main/resources com.huawei.bigdata.kafka.example.NewProducer to execute the example project.

----End

1.1.2.5.3 Kafka Streams Example Running Guide

1.1.2.5.3.1 High Level Streams API Example Usage Guide

1.        Open example code WordCountDemo.java in the IDE and change the following two variables to the machine-machine account name and keytab file that you apply for.

// keytab file name of the machine-machine account that a user applies for  
private static final String USER_KEYTAB_FILE = "Set to the actual keytab file name";  
// Machine-machine account name that a user applies for  
private static final String USER_PRINCIPAL = "Set to the actual username";

2.        Use the Linux client to create the input and output topics. Ensure that the topic names are the same as those in the example code, set the policy for clearing the output topic to compact, and run the example code.

// Source-topic name created by a user, that is, input topic  
private static final String INPUT_TOPIC_NAME = "streams-wordcount-input";  
  
// Sink-topic name created by a user, that is, output topic  
private static final String OUTPUT_TOPIC_NAME = "streams-wordcount-output";

          Create the input topic:

kafka-topics.sh --create --zookeeper 192.168.0.11:24002,192.168.0.12:24002,192.168.0.13:24002/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-input

          Create the output topic:

kafka-topics.sh --create --zookeeper 192.168.0.11:24002,192.168.0.12:24002,192.168.0.13:24002/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

          Run example code WordCountDemo.java. You can commission programs on Windows and Linux OSs according to the running environment.

3.        Use the Linux client to write messages and view the statistics result.

Run kafka-console-producer.sh to write messages to the input topic.

# kafka-console-producer.sh --broker-list 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007 --topic streams-wordcount-input --producer.config /opt/client/Kafka/kafka/config/producer.properties  
>This is Kafka Streams test  
>test starting  
>now Kafka Streams is running  
>test end  
>

Run kafka-console-consumer.sh to consume data from the output topic and view the statistics result.

# kafka-console-consumer.sh --topic streams-wordcount-output --bootstrap-server 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007 --new-consumer --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --formatter kafka.tools.DefaultMessageFormatter  
this    1  
is      6  
kafka 12  
streams 8  
test    8  
test    9  
starting 1  
now     1  
kafka 13  
streams 9  
is      7  
running 1  
test    10  
end     1

1.1.2.5.3.2 Low Level Streams API Example Usage Guide

1.        Open example code WordCountProcessorDemo.java in the IDE and change the following two variables to the machine-machine account name and keytab file that you apply for.

// keytab file name of the machine-machine account that a user applies for  
private static final String USER_KEYTAB_FILE = "Set to the actual keytab file name";  
// Machine-machine account name that a user applies for  
private static final String USER_PRINCIPAL = "Set to the actual username";

2.        Use the Linux client to create the input and output topics. Ensure that the topic names are the same as those in the example code, set the policy for clearing the output topic to compact, and run the example code.

// Source-topic name created by a user, that is, input topic  
private static final String INPUT_TOPIC_NAME = "streams-wordcount-processor-input";  
  
// Sink-topic name created by a user, that is, output topic  
private static final String OUTPUT_TOPIC_NAME = "streams-wordcount-processor-output";

          Create the input topic:

kafka-topics.sh --create --zookeeper 192.168.0.11:24002,192.168.0.12:24002,192.168.0.13:24002/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-input

          Create the output topic:

kafka-topics.sh --create --zookeeper 192.168.0.11:24002,192.168.0.12:24002,192.168.0.13:24002/kafka --replication-factor 1 --partitions 1 --topic streams-wordcount-processor-output --config cleanup.policy=compact

          Run example code WordCountProcessorDemo.java. You can commission programs on Windows and Linux OSs according to the running environment.

3.        Use the Linux client to write messages and view the statistics result.

Run kafka-console-producer.sh to write messages to the input topic.

# kafka-console-producer.sh --broker-list 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007 --topic streams-wordcount-processor-input --producer.config /opt/client/Kafka/kafka/config/producer.properties  
>This is Kafka Streams test  
>now Kafka Streams is running  
>test end  
>

Run kafka-console-consumer.sh to consume data from the output topic and view the statistics result.

# kafka-console-consumer.sh --topic streams-wordcount-processor-output --bootstrap-server 192.168.0.11:21007,192.168.0.12:21007,192.168.0.13:21007 --new-consumer --consumer.config /opt/client/Kafka/kafka/config/consumer.properties --from-beginning --property print.key=true --property print.value=true  
is      1  
kafka 1  
streams 1  
test    1  
this    1  
end     1  
is      2  
kafka 2  
now     1  
running 1  
streams 2  
test  2  
this    1

 

