
Storm: Basic Storm Operations

Latest reply: Jul 19, 2018 06:01:05

1.1.1 Case 1: Basic Storm Operations

1.1.1.1 Scenarios

Scenario Description

The following describes the service process of a dynamic word counting system. The data source is a logical unit that produces random text continuously.

•  The data source continuously sends random text, such as "apple orange apple", to the word splitting logic.

•  The word splitting logic splits each text entry sent by the data source on spaces into words, such as "apple", "orange", and "apple", and then sends each word to the word counting logic.

•  The word counting logic increments the count of a word by one each time it receives that word and prints the running results, for example:

           apple: 1

           orange: 1

           apple: 2
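Before looking at the Storm classes, the three logical units can be sketched in plain Java to make the data flow concrete. This is a minimal single-threaded simulation, assuming each unit is simply a method call; the class WordCountSimulation and its methods are hypothetical and are not part of the storm-examples project. In the real topology, each unit runs as a separate Spout or Bolt task.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded simulation of the word counting service process.
// In Storm, each stage would run as its own Spout or Bolt task.
class WordCountSimulation {

    // Word splitting logic: split a text entry on spaces, drop empty tokens,
    // and normalize to lower case.
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    // Word counting logic: increment the count of each received word by one
    // and record the running result as "word: count".
    static List<String> count(List<String> words, Map<String, Integer> counts) {
        List<String> output = new ArrayList<>();
        for (String word : words) {
            int updated = counts.getOrDefault(word, 0) + 1;
            counts.put(word, updated);
            output.add(word + ": " + updated);
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        // The data source would emit text continuously; one fixed entry is used here.
        for (String line : count(split("apple orange apple"), counts)) {
            System.out.println(line);
        }
    }
}
```

Running main prints apple: 1, orange: 1, and apple: 2 on separate lines.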

1.1.1.2 Development Approach

Functions

Table 1-1 describes the procedure for developing an application that counts how many times each word appears in random text.

Table 1-1 Functions to be developed

SN | Step | Reference
1 | Create a Spout to generate random text. | See section "Creating a Spout" of the FusionInsight HD Application Development Guide.
2 | Create a Bolt to split the random text into words. | See section "Creating a Bolt" of the FusionInsight HD Application Development Guide.
3 | Create a Bolt to count how many times each word appears. | See section "Creating a Bolt" of the FusionInsight HD Application Development Guide.
4 | Create a topology. | See section "Creating a Topology" of the FusionInsight HD Application Development Guide.

For the key code snippets, see section 1.1.1.3 Example Code Description. For the complete code, see the storm-examples example project.

1.1.1.3 Example Code Description

Creating a Spout

•  Function description

A Spout is the message source of a Storm topology and produces the messages that flow through it. Typically, a Spout reads data from an external source and emits it into the topology as tuples.

A single Spout can emit multiple message streams. Use OutputFieldsDeclarer.declareStream to define the streams, and then use SpoutOutputCollector.emit to emit tuples to a specific stream.
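The multiple-stream idea can be illustrated without a Storm cluster. The class below is a hypothetical in-memory model, not Storm code: declareStream registers a named stream, and emit routes a message to exactly one declared stream. In a real topology, the corresponding calls are OutputFieldsDeclarer.declareStream in declareOutputFields and SpoutOutputCollector.emit(streamId, tuple), and a downstream Bolt subscribes to one stream by passing its ID to a grouping method such as shuffleGrouping(componentId, streamId). The routing rule used in main (sentences of up to six words go to "short") is an arbitrary example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of named streams (not the Storm API).
class NamedStreams {
    private final Map<String, List<String>> streams = new HashMap<>();

    // Declare a stream before anything is emitted to it.
    void declareStream(String streamId) {
        streams.putIfAbsent(streamId, new ArrayList<>());
    }

    // Emit a message to one specific stream; undeclared streams are rejected.
    void emit(String streamId, String message) {
        List<String> stream = streams.get(streamId);
        if (stream == null) {
            throw new IllegalArgumentException("Undeclared stream: " + streamId);
        }
        stream.add(message);
    }

    // Messages emitted so far to the given stream (empty if none).
    List<String> read(String streamId) {
        return streams.getOrDefault(streamId, List.of());
    }

    public static void main(String[] args) {
        NamedStreams source = new NamedStreams();
        source.declareStream("short");
        source.declareStream("long");
        String[] sentences = {"i am at two with nature", "an apple a day keeps the doctor away"};
        for (String s : sentences) {
            // Route each sentence to a stream based on its word count.
            source.emit(s.split(" ").length <= 6 ? "short" : "long", s);
        }
        System.out.println("short: " + source.read("short"));
        System.out.println("long: " + source.read("long"));
    }
}
```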

•  Example code

The following code snippet is from the nextTuple method of the RandomSentenceSpout class in the com.huawei.storm.example.common package. Each call emits one randomly selected sentence.

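/**
 * {@inheritDoc}
 */
@Override
public void nextTuple()
{
    // Throttle the Spout: emit at most one sentence every 100 ms.
    Utils.sleep(100);
    String[] sentences =
        new String[] {"the cow jumped over the moon",
                      "an apple a day keeps the doctor away",
                      "four score and seven years ago",
                      "snow white and the seven dwarfs",
                      "i am at two with nature"};
    // "random" (java.util.Random) and "collector" (SpoutOutputCollector) are
    // fields of RandomSentenceSpout that are not shown in this snippet.
    String sentence = sentences[random.nextInt(sentences.length)];
    // Emit the sentence as a one-field tuple.
    collector.emit(new Values(sentence));
}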

Creating a Bolt

•  Function description

Bolts encapsulate all message processing logic, such as filtering and aggregation.

When a Bolt sends messages on to other topology operators, it can define multiple streams with OutputFieldsDeclarer.declareStream and select the stream to emit to with OutputCollector.emit.

•  Example code

The following code snippet is from the execute method of the SplitSentenceBolt class in the com.huawei.storm.example.common package. It splits a sentence into words and emits each word.

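/**
 * {@inheritDoc}
 */
@Override
public void execute(Tuple input, BasicOutputCollector collector)
{
    // The incoming tuple carries one sentence in its first field.
    String sentence = input.getString(0);
    String[] words = sentence.split(" ");
    for (String word : words)
    {
        // Skip empty tokens produced by consecutive spaces.
        word = word.trim();
        if (!word.isEmpty())
        {
            // Normalize case so that "Apple" and "apple" are counted together.
            word = word.toLowerCase();
            // Emit each word as a one-field tuple. WordCountTopology groups on
            // new Fields("word"), so this Bolt's output field must be declared as "word".
            collector.emit(new Values(word));
        }
    }
}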

The following code snippet is from the execute method of the WordCountBolt class in the com.huawei.storm.example.wordcount package. It counts how many times each received word has occurred.

@Override
public void execute(Tuple tuple, BasicOutputCollector collector)
{
    // The first (and only) field of the incoming tuple is the word.
    String word = tuple.getString(0);
    // counts is a field of WordCountBolt (a map from word to count) that keeps
    // the running count of each word seen by this task.
    Integer count = counts.get(word);
    if (count == null)
    {
        count = 0;
    }
    count++;
    counts.put(word, count);
    System.out.println("word: " + word + ", count: " + count);
}
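The null check in execute can be written more compactly with Map.merge from the Java standard library. The sketch below isolates only the counting step in a hypothetical WordCounter class; in WordCountBolt itself, counts would remain the Bolt's field.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper isolating the counting step of WordCountBolt.execute.
class WordCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    // Map.merge stores 1 for a new word, otherwise adds 1 to the existing count,
    // and returns the updated value.
    int increment(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        WordCounter counter = new WordCounter();
        for (String word : new String[] {"apple", "orange", "apple"}) {
            System.out.println("word: " + word + ", count: " + counter.increment(word));
        }
    }
}
```

Note that each WordCountBolt task keeps its own counts map, so a word's count is only complete if all occurrences of that word reach the same task; the fieldsGrouping call in the topology code is what guarantees this.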

Creating a Topology

•  Function description

A topology is a directed acyclic graph (DAG) consisting of Spouts and Bolts.

Applications are submitted with the storm jar command. Therefore, the main method must build and submit the topology, and the class that contains the main method must be specified as a parameter of storm jar.

•  Example code

The following code snippets are from the WordCountTopology class in the com.huawei.storm.example.wordcount package. The main method builds the topology and submits it.

public static void main(String[] args)
    throws Exception
{
    TopologyBuilder builder = buildTopology();

    /*
     * Tasks can be submitted in the following three modes:
     * 1. CLI submitting: copy the application JAR file to a client and run the submission command on the client.
     * 2. Remote submitting: package the application JAR file and run the main method in Eclipse.
     * 3. Local submitting: run the application on a local computer, usually for testing.
     * CLI submitting and remote submitting support both security mode and normal mode.
     * Local submitting supports normal mode only.
     *
     * Only one mode can be used per submission. CLI submitting is the default.
     * To use another mode, change the SubmitType argument passed to submitTopology.
     */
    submitTopology(builder, SubmitType.CMD);
}

private static void submitTopology(TopologyBuilder builder, SubmitType type)
    throws Exception
{
    switch (type)
    {
        case CMD:
        {
            cmdSubmit(builder, null);
            break;
        }
        case REMOTE:
        {
            remoteSubmit(builder);
            break;
        }
        case LOCAL:
        {
            localSubmit(builder);
            break;
        }
    }
}

/**
 * CLI submitting mode.
 * The procedure is as follows:
 * 1. Package the application JAR file and any external JAR files that it depends on into one JAR file.
 *    (External dependencies are those required by your own code; they are not provided by the example project.)
 * 2. In a security environment, run kinit to log in before submitting the task.
 * 3. Run storm jar on the Storm client to submit the task, for example:
 *    ./storm jar ../example/example.jar com.huawei.storm.example.wordcount.WordCountTopology
 */
private static void cmdSubmit(TopologyBuilder builder, Config conf)
    throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException
{
    if (conf == null)
    {
        conf = new Config();
    }
    // Run the whole topology in a single worker process.
    conf.setNumWorkers(1);

    StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology());
}

/**
 * Local submitting mode (normal mode only): run the topology in an in-process
 * LocalCluster for 10 seconds, then shut the cluster down.
 */
private static void localSubmit(TopologyBuilder builder)
    throws InterruptedException
{
    Config conf = new Config();
    conf.setDebug(true);
    conf.setMaxTaskParallelism(3);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
    Thread.sleep(10000);
    cluster.shutdown();
}

/**
 * Remote submitting mode. createConf() and isSecurityModel() are helper
 * methods of WordCountTopology that are not shown here.
 */
private static void remoteSubmit(TopologyBuilder builder)
    throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException,
    IOException
{
    Config config = createConf();

    // Replace the placeholder with the actual path of the topology JAR file.
    String userJarFilePath = "User JAR file address";
    System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath);

    // Preparations required in security mode
    if (isSecurityModel())
    {
        securityPrepare(config);
    }
    config.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology());
}

private static TopologyBuilder buildTopology()
{
    TopologyBuilder builder = new TopologyBuilder();
    // Spout "spout" with 5 executors emits random sentences.
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    // Bolt "split" with 8 executors; shuffle grouping spreads sentences across its tasks.
    builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout");
    // Bolt "count" with 12 executors; fields grouping sends the same word to the same task.
    builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
    return builder;
}
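buildTopology starts 12 tasks for the count Bolt, and each task has its own counts map. fieldsGrouping("split", new Fields("word")) makes sure every tuple with the same word value goes to the same count task. The sketch below illustrates that idea by hashing the grouping field and taking the result modulo the number of tasks. The class FieldsGroupingSketch is hypothetical and Storm's internal hash function may differ; only the property matters: equal words always map to the same task index.

```java
import java.util.List;

// Hypothetical illustration of fields grouping, not Storm's implementation:
// the target task is derived from the hash of the grouping field, so equal
// field values always go to the same task.
class FieldsGroupingSketch {

    // Map a word to one of numTasks task indexes (0 .. numTasks - 1).
    static int chooseTask(String word, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int countTasks = 12; // parallelism of the "count" Bolt in buildTopology
        for (String word : List.of("apple", "orange", "apple")) {
            System.out.println(word + " -> count task " + chooseTask(word, countTasks));
        }
    }
}
```

Both "apple" tuples print the same task index, which is why their counts accumulate in one map.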

1.1.1.4 Obtaining Example Code

Using the FusionInsight Client

Decompress the FusionInsight client installation package and obtain the storm-examples folder from the Storm sub-folder of the FusionInsight_Services_ClientConfig folder.

Using the Maven Project

Download the code from GitHub to your local PC. The GitHub URL of the example code for security mode is: http://xxx.xxx.xxx/xxxxxxxx/components/storm/storm-examples

1.1.1.5 Running an Application

1.1.1.5.1 Packaging Eclipse Code

Scenarios

Export a JAR file using Eclipse and specify the name of the JAR file, for example, example.jar.

Procedure

Step 1  Right-click the storm_examples project and choose Export from the shortcut menu, as shown in Figure 1-1.

Figure 1-1 Choosing export from the Eclipse shortcut menu


Step 2  On the Export panel, select JAR file and click Next, as shown in Figure 1-2.

Figure 1-2 Exporting an example project


Step 3  Select the src directory, specify the directory in which to save the exported file, and click Finish, as shown in Figure 1-3.

Figure 1-3 Selecting files to be exported


----End

1.1.1.5.2 Packaging Services

Service packaging combines the JAR file exported from Eclipse, its dependent JAR files, and the required configuration files into a single source.jar file that can be submitted. It requires the storm-jartool tool, which runs on both Windows and Linux.

1.1.1.5.2.1 Packaging Services on a Linux OS

Scenarios

Storm supports packaging in a Linux environment. Upload the JAR file exported from Eclipse and its related JAR files to the Linux host and package them there.

Prerequisites

•  You have installed the Storm client.

•  You have completed 1.1.1.5.1 Packaging Eclipse Code.

•  If the Linux host is not a node of the cluster, set the mapping between the host name and IP address in the hosts file of the Linux host, and make sure each host name maps to the correct IP address.

Procedure

Step 1  Use WinSCP to copy the JAR file exported from Eclipse to a specified directory on the Linux client (for example, /opt/jarsource).

Step 2  Copy the required configuration files to the same directory. For details about the configuration files that the topology depends on, see the related development guide.

Step 3  Copy the dependent JAR files to the same directory. For details about the packages that the topology depends on, see the related development guide.

Step 4  In the Storm client installation directory, go to Storm/storm-1.0.2/bin and run storm-jartool.sh to package the preceding files into a complete service JAR file in an output directory such as /opt/jartarget (any empty directory can be used). For example, after the storm-jartool.sh /opt/jarsource/ /opt/jartarget command is executed, source.jar is created in /opt/jartarget.

----End

1.1.1.5.2.2 Packaging Services on a Windows OS

Scenarios

Storm supports packaging in a Windows environment.

Prerequisites

You have completed 1.1.1.5.1 Packaging Eclipse Code.

Procedure

Step 1  Place the JAR file exported from Eclipse in a specified directory (for example, D:\source).

Step 2  Copy the required configuration files to the same directory. For details about the configuration files that the topology depends on, see the related development guide.

Step 3  Copy the dependent JAR files to the same directory. For details about the packages that the topology depends on, see the related development guide.

Step 4  Find storm-jartool.bat in the FusionInsight_Storm_ClientConfig\Storm\storm-examples\tools directory.

Step 5  Double-click storm-jartool.bat. When prompted, enter the directory containing the files to be packaged (D:\source) and press Enter, and then enter the directory in which to create the package (D:\target). The source.jar file is created in D:\target.

----End

1.1.1.5.3 Submitting a Topology

1.1.1.5.3.1 Submitting a Topology When a Client Is Installed on a Linux OS

Scenarios

You can use storm commands to submit topologies in a Linux environment.

Prerequisites

•  You have installed the Storm client.

•  If the host where the client is installed is not a node in the cluster, set the mapping between the host name and the IP address in the hosts file on that host. The host name and IP address must be in one-to-one mapping.

•  You have completed 1.1.1.5.2 Packaging Services, and source.jar has been created.

Procedure

Step 1  In security mode, perform security authentication first.

1. Initialize client environment variables.

Go to the installation directory /opt/Storm_Client, and run the following command to import environment variables:

source bigdata_env

2. Log in securely with the developer account created in section "Preparing the Developer Account".

Run the kinit command to log in to the client in security mode as a human-machine user.

kinit Username

For example, run the following command:

kinit developuser

Enter a password as prompted. If no error message appears, the Kerberos authentication is completed for the user.

Step 2  Submit a topology. WordCount is used as an example; for other topologies, see the related development guide. Go to the Storm/storm-1.0.2/bin directory on the Storm client and run the following command to submit the created source.jar file:

storm jar /opt/jartarget/source.jar com.huawei.storm.example.wordcount.WordCountTopology

If the package was created on Windows, first use WinSCP to upload source.jar to a directory on the Linux server, such as /opt/jartarget.

Step 3  Run the storm list command to view the submitted applications. If the word-count application appears in the list, the topology has been submitted successfully.

Note

If a topology configured for local mode is submitted from the command line, the submission environment must be in normal mode. Currently, local-mode topologies cannot be submitted from the command line in a security environment.

----End

1.1.1.5.3.2 Submitting a Topology When No Client Is Installed on a Linux OS

Scenarios

Storm topologies can also be submitted from a Linux environment where no Storm client is installed.

Prerequisites

•  Ensure that the time difference between the client and the FusionInsight HD cluster is less than 5 minutes.

•  If the host where the client is installed is not a node in the cluster, set the mapping between the host name and the IP address in the hosts file on that host. The host name and IP address must be in one-to-one mapping.
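The 5-minute limit most likely stems from Kerberos authentication, which rejects requests from clients whose clocks differ too much from the cluster's clock. A minimal sketch of the comparison follows, assuming you have already obtained both timestamps (how to read the cluster time is outside the scope of this document); the ClockSkewCheck class is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical check for the "less than 5 minutes" time difference prerequisite.
class ClockSkewCheck {
    static final Duration MAX_SKEW = Duration.ofMinutes(5);

    // True if the two clocks differ by strictly less than MAX_SKEW, in either direction.
    static boolean withinLimit(Instant clientTime, Instant clusterTime) {
        Duration skew = Duration.between(clientTime, clusterTime).abs();
        return skew.compareTo(MAX_SKEW) < 0;
    }

    public static void main(String[] args) {
        Instant client = Instant.parse("2018-07-19T06:00:00Z");
        System.out.println(withinLimit(client, client.plusSeconds(120)));  // 2 minutes apart
        System.out.println(withinLimit(client, client.minusSeconds(360))); // 6 minutes apart
    }
}
```

Running main prints true for the 2-minute difference and false for the 6-minute difference.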

Procedure

Step 1  Prepare the dependency JAR files and configuration files.

In the Linux environment, create a directory, such as /opt/test, and create sub-directories lib and src/main/resources/. Upload the JAR files in the lib folder of the example project to the lib directory in the Linux environment. Upload the configuration files in the src/main/resources folder of the example project to the src/main/resources directory in the Linux environment.

Step 2  In the Eclipse project, modify the WordCountTopology.java class to change the application submission mode to remoteSubmit, and replace the user keytab file name, user principal name, and JAR file address with your actual values.

•  Submit the application in remoteSubmit mode.

public static void main(String[] args)
    throws Exception
{
    TopologyBuilder builder = buildTopology();

    /*
     * The task submitting modes are as follows:
     * 1. Command submitting mode: copy the application JAR package to the client machine and run the related commands to submit it.
     * 2. Remote submitting mode: package the related content into an application JAR package and run the main method in Eclipse to submit it.
     * 3. Local submitting mode: run the application locally. This mode is usually used for tests.
     * The command submitting mode and remote submitting mode are supported in both security mode and normal mode.
     * The local submitting mode is supported only in normal mode.
     *
     * Only one mode can be used per submission. The command submitting mode is the default.
     * To use another mode, change the SubmitType argument passed to submitTopology.
     */

    // Remote submitting mode is selected here.
    submitTopology(builder, SubmitType.REMOTE);
}

•  Change the value of userJarFilePath to /opt/test/lib/example.jar, the location of the JAR file in the Linux environment.

private static void remoteSubmit(TopologyBuilder builder)
    throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException,
    IOException
{
    Config config = createConf();

    // Location of the topology JAR file in the Linux environment (no trailing space).
    String userJarFilePath = "/opt/test/lib/example.jar";
    System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath);

    // Preparations in security mode
    if (isSecurityModel())
    {
        securityPrepare(config);
    }
    config.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology());
}

•  In security mode, prepare for security authentication: change the values of userKeyTablePath and userPrincipal to the actual keytab file name and principal of your user.

private static void securityPrepare(Config config)
    throws IOException
{
    // Keytab file of the submitting user, in src/main/resources under the working directory.
    String userKeyTablePath =
        System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "user.keytab";
    // Replace with the principal of your own user.
    String userPrincipal = "StreamingDeveloper";
    String krbFilePath =
        System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "krb5.conf";

    // In Windows paths, each backslash separator must be escaped (\ becomes \\).
    userKeyTablePath = userKeyTablePath.replace("\\", "\\\\");
    krbFilePath = krbFilePath.replace("\\", "\\\\");

    String principalInstance = String.valueOf(config.get(Config.STORM_SECURITY_PRINCIPAL_INSTANCE));
    LoginUtil.setKrb5Config(krbFilePath);
    LoginUtil.setZookeeperServerPrincipal("zookeeper/" + principalInstance);
    LoginUtil.setJaasFile(userPrincipal, userKeyTablePath);
}

Step 3  Export the JAR file to the Linux environment.

•  Right-click the example project and choose Export from the shortcut menu.

•  Select JAR file and click Next.

•  Select the src/main/java directory, specify the export destination, and name the JAR file example.jar.

•  Click Finish.

•  Copy the exported JAR file to the /opt/test/lib directory in the Linux environment.

Step 4  Go to the /opt/test directory and run the following command to start the WordCountTopology main class, which submits the topology:

java -classpath /opt/test/lib/*:/opt/test/src/main/resources com.huawei.storm.example.wordcount.WordCountTopology

----End
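In the securityPrepare method used in this procedure, the keytab and krb5.conf paths are built by concatenating File.separator. As an alternative sketch, java.nio.file.Paths can build the same paths; the ResourcePaths class and resourcePath method are hypothetical. Whether the backslash escaping in securityPrepare is still required depends on how LoginUtil consumes the path, which this document does not show, so keep that step if you adopt this approach.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: resolve a file under <working dir>/src/main/resources
// with java.nio instead of manual File.separator concatenation.
class ResourcePaths {

    static Path resourcePath(String fileName) {
        return Paths.get(System.getProperty("user.dir"), "src", "main", "resources", fileName);
    }

    public static void main(String[] args) {
        System.out.println(resourcePath("user.keytab"));
        System.out.println(resourcePath("krb5.conf"));
    }
}
```

Paths.get inserts the platform separator itself, so the same code produces a valid path on both Windows and Linux.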

1.1.1.5.3.3 Submitting a Topology in Eclipse Remotely

Scenario

Storm supports remote topology submission from Eclipse. In the current example code, only WordCountTopology can be submitted remotely. To submit other topologies remotely, first implement the remote submission function for them, using WordCountTopology as a reference.

Prerequisites

•  You have completed 1.1.1.5.1 Packaging Eclipse Code.

•  Ensure that the time difference between the client and the Storm cluster is less than 5 minutes.

•  If the host running Eclipse is not a node of the cluster, set the mapping between the host name and IP address in the hosts file of that host, and make sure each host name maps to the correct IP address.

Procedure

Step 1  Modify the WordCountTopology.java class to change the application submission mode to remoteSubmit, and replace the user keytab file name, user principal name, and JAR file address with your actual values.

•  Submit the application in remoteSubmit mode.

public static void main(String[] args)
    throws Exception
{
    TopologyBuilder builder = buildTopology();

    /*
     * The task submitting modes are as follows:
     * 1. Command submitting mode: copy the application JAR package to the client machine and run the related commands to submit it.
     * 2. Remote submitting mode: package the related content into an application JAR package and run the main method in Eclipse to submit it.
     * 3. Local submitting mode: run the application locally. This mode is usually used for tests.
     * The command submitting mode and remote submitting mode are supported in both security mode and normal mode.
     * The local submitting mode is supported only in normal mode.
     *
     * Only one mode can be used per submission. The command submitting mode is the default.
     * To use another mode, change the SubmitType argument passed to submitTopology.
     */

    // Remote submitting mode is selected here.
    submitTopology(builder, SubmitType.REMOTE);
}

•  Change the value of userJarFilePath to the actual path of the topology JAR file.

private static void remoteSubmit(TopologyBuilder builder)
    throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException,
    IOException
{
    Config config = createConf();

    // Replace with the actual path of the topology JAR file.
    String userJarFilePath = "D:\\example.jar";
    System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath);

    // Preparations in security mode
    if (isSecurityModel())
    {
        securityPrepare(config);
    }
    config.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology());
}

•  In security mode, prepare for security authentication: change the values of userKeyTablePath and userPrincipal to the actual keytab file path and principal of your user.

private static void securityPrepare(Config config)
    throws IOException
{
    // Keytab file of the submitting user, in src/main/resources under the working directory.
    String userKeyTablePath =
        System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "user.keytab";
    // Replace with the principal of your own user.
    String userPrincipal = "StreamingDeveloper";
    String krbFilePath =
        System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "krb5.conf";

    // In Windows paths, each backslash separator must be escaped (\ becomes \\).
    userKeyTablePath = userKeyTablePath.replace("\\", "\\\\");
    krbFilePath = krbFilePath.replace("\\", "\\\\");

    String principalInstance = String.valueOf(config.get(Config.STORM_SECURITY_PRINCIPAL_INSTANCE));
    LoginUtil.setKrb5Config(krbFilePath);
    LoginUtil.setZookeeperServerPrincipal("zookeeper/" + principalInstance);
    LoginUtil.setJaasFile(userPrincipal, userKeyTablePath);
}

Step 2  Run the main method of the WordCountTopology class to submit the application.

----End

1.1.1.5.4 Viewing Results

Scenario

After a Storm application is run, you can log in to the Storm WebUI to check the running result.

Procedure

Step 1  Log in to FusionInsight Manager.

In the address box of your browser, enter an address in the format http://<FusionInsight Manager WebService floating IP address>:8080/web.

For example, enter http://10.0.0.1:8080/web.

Step 2  Choose Services > Storm to go to the Storm WebUI, as shown in Figure 1-4.

Figure 1-4 Storm service management page


Step 3  On the Storm UI, click the word-count application to view its running status, as shown in Figure 1-5.

Figure 1-5 Storm application execution page


The Topology stats area shows the total volume of data transferred between operators over different time windows.

The Spouts area shows the total number of messages the spout operator has emitted since it started. The Bolts area shows the total number of messages emitted by the count operator and the split operator, as shown in Figure 1-6.

Figure 1-6 Total volume of data sent by the Storm application operators


----End

 


This post was last edited by chz at 2018-08-03 07:55.
