1.1.1 Case 1: Basic Storm Operations
1.1.1.1 Scenarios
Scenario Description
The following describes the workflow of a dynamic word counting system. The data source is a logical unit that continuously produces random text.
• The data source continuously sends random text, such as "apple orange apple", to the word splitting logic.
• The word splitting logic splits each text entry from the data source by spaces, for example into "apple", "orange", and "apple", and then sends each word to the word counting logic.
• Each time the word counting logic receives a word, it increments that word's count by one and prints the real-time result, for example:
− apple: 1
− orange: 1
− apple: 2
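To make the flow concrete, the following plain-Java sketch simulates both stages in a single process without any Storm classes. The class and method names are hypothetical; the splitting rules mirror the SplitSentenceBolt example later in this section (split by space, trim, skip empty tokens, lowercase), and Map.merge is used as a compact equivalent of the get-then-put increment in WordCountBolt. It illustrates the logic only and is not a distributed implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process simulation of the word counting flow (no Storm classes).
final class WordCountFlowSketch {
    // Running totals, analogous to the state held by the word counting logic
    private final Map<String, Integer> counts = new HashMap<>();

    // Word splitting logic: split by space, trim, drop empty tokens, lowercase
    static List<String> split(String text) {
        List<String> words = new ArrayList<>();
        for (String w : text.split(" ")) {
            w = w.trim();
            if (!w.isEmpty()) {
                words.add(w.toLowerCase());
            }
        }
        return words;
    }

    // Word counting logic: increment the word's count and return the result line
    String count(String word) {
        int c = counts.merge(word, 1, Integer::sum);
        return word + ": " + c;
    }

    // Feeds one text entry through both stages and returns the printed lines
    List<String> process(String text) {
        List<String> out = new ArrayList<>();
        for (String word : split(text)) {
            out.add(count(word));
        }
        return out;
    }

    public static void main(String[] args) {
        WordCountFlowSketch flow = new WordCountFlowSketch();
        flow.process("apple orange apple").forEach(System.out::println);
    }
}
```

Running main with the sample text "apple orange apple" prints apple: 1, orange: 1, and apple: 2, one per line.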
1.1.1.2 Development Idea
Function Type
Table 1-1 describes the procedure for developing an application that counts the number of times each word appears in random text.
Table 1-1 Functions to be developed
| SN | Step | Example |
| --- | --- | --- |
| 1 | Create a Spout to generate random text. | For details, see section "Creating a Spout" of the FusionInsight HD Application Development Guide. |
| 2 | Create a Bolt to split the random text into words. | For details, see section "Creating a Bolt" of the FusionInsight HD Application Development Guide. |
| 3 | Create a Bolt to count the number of times each word appears. | For details, see section "Creating a Bolt" of the FusionInsight HD Application Development Guide. |
| 4 | Create a topology. | For details, see section "Creating a Topology" of the FusionInsight HD Application Development Guide. |
For key code snippets, see section 1.1.1.3 Example Code Description. For the complete code, see the storm-examples example project.
1.1.1.3 Example Code Description
Creating a Spout
• Function description
A Spout is the message source of a Storm topology and the producer of its messages. Typically, a Spout reads data from an external source and emits messages (tuples) into the topology.
A single Spout can emit multiple message streams: use OutputFieldsDeclarer.declareStream to declare the streams, and then use SpoutOutputCollector.emit to emit tuples to a specific stream.
• Example code
The following code snippet is from the nextTuple method of the RandomSentenceSpout class in the com.huawei.storm.example.common package. It randomly selects one of several predefined sentences and emits it as a tuple.
/**
 * {@inheritDoc}
 */
@Override
public void nextTuple()
{
    // Throttle emission to roughly one sentence every 100 ms
    Utils.sleep(100);
    String[] sentences = new String[] {"the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"};
    // Pick a sentence at random and emit it as a single-field tuple
    String sentence = sentences[random.nextInt(sentences.length)];
    collector.emit(new Values(sentence));
}
Creating a Bolt
• Function description
Bolts encapsulate all message processing logic and can implement functions such as filtering and aggregation.
If a Bolt emits multiple streams, use OutputFieldsDeclarer.declareStream to declare the streams, and use OutputCollector.emit to select the stream to which each tuple is emitted.
• Example code
The following code snippet is from the execute method of the SplitSentenceBolt class in the com.huawei.storm.example.common package. It splits a sentence into words and emits each word.
/**
 * {@inheritDoc}
 */
@Override
public void execute(Tuple input, BasicOutputCollector collector)
{
    String sentence = input.getString(0);
    String[] words = sentence.split(" ");
    for (String word : words)
    {
        word = word.trim();
        if (!word.isEmpty())
        {
            word = word.toLowerCase();
            collector.emit(new Values(word));
        }
    }
}
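SplitSentenceBolt splits on a single space, so consecutive or leading spaces produce empty tokens; the trim and isEmpty checks discard them. The following JDK-only sketch (the class name is hypothetical) contrasts that behavior with splitting on runs of whitespace, which is one possible alternative and not what the example project uses.

```java
import java.util.Arrays;

// Contrasts split(" ") with split("\\s+"); JDK only, no Storm classes.
final class SplitBehaviorSketch {
    // Same call as SplitSentenceBolt: empty strings appear between adjacent
    // spaces and before a leading space; trailing empty strings are dropped.
    static String[] splitOnSpace(String sentence) {
        return sentence.split(" ");
    }

    // Alternative: trim first, then split on runs of any whitespace (spaces, tabs).
    static String[] splitOnWhitespace(String sentence) {
        String trimmed = sentence.trim();
        return trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
    }

    public static void main(String[] args) {
        String sentence = " an  apple ";
        System.out.println(Arrays.toString(splitOnSpace(sentence)));
        System.out.println(Arrays.toString(splitOnWhitespace(sentence)));
    }
}
```

For " an  apple ", the first call yields the tokens "", "an", "", and "apple", while the second yields only "an" and "apple".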
The following code snippet is from the execute method of the WordCountBolt class in the com.huawei.storm.example.wordcount package. It counts the occurrences of each received word and prints the running total.
@Override
public void execute(Tuple tuple, BasicOutputCollector collector)
{
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null)
    {
        count = 0;
    }
    count++;
    counts.put(word, count);
    System.out.println("word: " + word + ", count: " + count);
}
Creating a Topology
• Function description
A topology is a directed acyclic graph (DAG) consisting of Spouts and Bolts.
Applications are submitted by running the storm jar command. Therefore, the code that creates and submits the topology must be called from the main method, and the class containing the main method must be specified as a parameter of storm jar.
• Example code
The following code snippets are from the WordCountTopology class in the com.huawei.storm.example.wordcount package. The main method builds the topology and submits it.
public static void main(String[] args) throws Exception
{
    TopologyBuilder builder = buildTopology();
    /*
     * A task can be submitted in one of the following three modes:
     * 1. CLI submitting: copy the application JAR file to a client and run
     *    the related commands on the client.
     * 2. Remote submitting: package the application JAR file and run the
     *    main method in Eclipse.
     * 3. Local submitting: run the application on a local computer for testing.
     * The CLI and remote submitting modes support both security mode and
     * normal mode. The local submitting mode supports normal mode only.
     *
     * Only one mode can be used for each submission. The CLI submitting mode
     * is used by default. To use another mode, change the SubmitType argument
     * passed to submitTopology.
     */
    submitTopology(builder, SubmitType.CMD);
}
private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception
{
    switch (type)
    {
        case CMD:
        {
            cmdSubmit(builder, null);
            break;
        }
        case REMOTE:
        {
            remoteSubmit(builder);
            break;
        }
        case LOCAL:
        {
            localSubmit(builder);
            break;
        }
    }
}
/**
 * CLI submitting mode
 * The procedure is as follows:
 * 1. Package the application JAR file and any externally dependent JAR files
 *    into one JAR file. The externally dependent JAR files are those required
 *    by the user program; they are not provided by the example project.
 * 2. Run storm jar on the Storm client to submit the task.
 *
 * In security mode, run kinit to log in before submitting the task from the
 * client CLI.
 *
 * Example command:
 * ./storm jar ../example/example.jar com.huawei.storm.example.wordcount.WordCountTopology
 */
private static void cmdSubmit(TopologyBuilder builder, Config conf)
    throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException
{
    if (conf == null)
    {
        conf = new Config();
    }
    conf.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology());
}
private static void localSubmit(TopologyBuilder builder) throws InterruptedException
{
    Config conf = new Config();
    conf.setDebug(true);
    conf.setMaxTaskParallelism(3);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
    Thread.sleep(10000);
    cluster.shutdown();
}
private static void remoteSubmit(TopologyBuilder builder)
    throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException,
    IOException
{
    Config config = createConf();
    String userJarFilePath = "User JAR file address";
    System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath);
    // Preparations to be made in security mode
    if (isSecurityModel())
    {
        securityPrepare(config);
    }
    config.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology());
}
private static TopologyBuilder buildTopology()
{
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
    return builder;
}
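In buildTopology, the split bolt receives sentences through shuffleGrouping, which distributes tuples randomly and evenly across its 8 executors, while the count bolt receives words through fieldsGrouping on the "word" field. Fields grouping routes every tuple with the same field value to the same task, which is what allows each WordCountBolt task to keep a correct count in its local counts map. The following plain-Java sketch models only that property; the class name is hypothetical and Storm's actual hash function may differ.

```java
import java.util.Arrays;

// Simplified model of fields grouping: the target task index is derived from a
// hash of the grouping field's value, so identical words reach the same task.
// Storm's internal hashing may differ; this only illustrates the property.
final class FieldsGroupingSketch {
    static int chooseTask(String word, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(Arrays.hashCode(new Object[] {word}), numTasks);
    }

    public static void main(String[] args) {
        // With default settings, the count bolt's 12 executors run 12 tasks
        int countTasks = 12;
        for (String word : new String[] {"apple", "orange", "apple"}) {
            System.out.println(word + " -> count task " + chooseTask(word, countTasks));
        }
    }
}
```

Both "apple" tuples print the same task index, which is why the count for a word is never split across tasks.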
1.1.1.4 Obtaining Example Code
Using the FusionInsight Client
Decompress the FusionInsight client installation package. The storm-examples project is in the Storm subfolder of the FusionInsight_Services_ClientConfig folder.
Using the Maven Project
Download the example code from GitHub to your local PC. The example code for security mode is available at: http://xxx.xxx.xxx/xxxxxxxx/components/storm/storm-examples
1.1.1.5 Running an Application
1.1.1.5.1 Packaging Eclipse Code
Scenarios
Use Eclipse to export the project as a JAR file and specify its name, for example, example.jar.
Procedure
Step 1 Right-click the storm_examples project and choose Export from the shortcut menu, as shown in Figure 1-1.
Figure 1-1 Choosing export from the Eclipse shortcut menu
Step 2 On the Export panel, select JAR file and click Next, as shown in Figure 1-2.
Figure 1-2 Exporting an example project
Step 3 Select the src directory, specify the directory where the exported JAR file is to be saved, and click Finish, as shown in Figure 1-3.
Figure 1-3 Selecting files to be exported
----End
1.1.1.5.2 Packaging Services
Service packaging produces source.jar, a single JAR file that can be submitted to the cluster. Packaging requires the storm-jartool tool, which runs on both Windows and Linux.
1.1.1.5.2.1 Packaging Services on a Linux OS
Scenarios
Storm supports packaging in a Linux environment. You can upload the JAR file exported from Eclipse, together with its dependent JAR files, to a Linux host and package them there.
Prerequisites
• You have installed the Storm client.
• The procedure in 1.1.1.5.1 Packaging Eclipse Code has been completed.
• If the Linux host is not a node of the cluster, set the mapping between the host name and IP address in the hosts file of the Linux host. The host name must be correctly mapped to the IP address.
Procedure
Step 1 Use WinSCP to copy the JAR file exported from Eclipse to a specified directory on the Linux client (for example, /opt/jarsource).
Step 2 Copy the obtained configuration files to the specified directory. For details about the dependent configuration files, see the related development guide.
Step 3 Copy the obtained dependent JAR files to the specified directory. For details about the packages that the topology depends on, see the related development guide.
Step 4 Go to the Storm/storm-1.0.2/bin directory in the Storm client installation directory and run the packaging command to package the preceding JAR files into a complete service JAR file in the /opt/jartarget directory (any empty directory can be used). For example, after the following command is executed, source.jar is created in the /opt/jartarget directory:
storm-jartool.sh /opt/jarsource/ /opt/jartarget
----End
1.1.1.5.2.2 Packaging Services on a Windows OS
Scenarios
Storm supports packaging in a Windows environment.
Prerequisites
The procedure in 1.1.1.5.1 Packaging Eclipse Code has been completed.
Procedure
Step 1 Place the JAR file exported from Eclipse in a specified directory (for example, D:\source).
Step 2 Copy the obtained configuration files to the specified directory. For details about the dependent configuration files, see the related development guide.
Step 3 Copy the obtained dependent JAR files to the specified directory. For details about the packages that the topology depends on, see the related development guide.
Step 4 Find storm-jartool.bat in the FusionInsight_Storm_ClientConfig\Storm\storm-examples\tools directory.
Step 5 Double-click the packaging tool. When prompted, enter the directory containing the JAR files to be packaged (D:\source) and press Enter, and then enter the directory where the package is to be created (D:\target). The source.jar file is created in the D:\target directory.
----End
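Before submitting, you can check that source.jar actually contains the topology class that will be passed to storm jar. The following sketch uses java.util.jar.JarFile from the JDK; the class name JarContentCheck and the default path /opt/jartarget/source.jar are illustrative assumptions, not part of the example project.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical pre-submission check: confirm that a packaged JAR contains a class.
final class JarContentCheck {
    static boolean containsClass(String jarPath, String className) throws IOException {
        // Classes are stored as entries such as com/example/Foo.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String jarPath = args.length > 0 ? args[0] : "/opt/jartarget/source.jar";
        String mainClass = "com.huawei.storm.example.wordcount.WordCountTopology";
        System.out.println(mainClass + " present: " + containsClass(jarPath, mainClass));
    }
}
```

On Windows, pass the package path as the first argument, for example D:\target\source.jar.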
1.1.1.5.3 Submitting a Topology
1.1.1.5.3.1 Submitting a Topology When a Client Is Installed on a Linux OS
Scenarios
You can use storm commands to submit topologies in a Linux environment.
Prerequisites
• You have installed the Storm client.
• If the host where the client is installed is not a node in the cluster, set the mapping between host names and IP addresses in the hosts file of that host. Host names and IP addresses must map one-to-one.
• The procedure in 1.1.1.5.2 Packaging Services has been completed and source.jar has been created.
Procedure
Step 1 In security mode, perform security authentication first.
1. Initialize client environment variables.
Go to the installation directory /opt/Storm_Client, and run the following command to import environment variables:
source bigdata_env
2. Log in securely with the developer account created in section "Preparing the Developer Account".
Run the kinit command to log in to the client in security mode as a human-machine user.
kinit Username
For example, run the following command:
kinit developuser
Enter a password as prompted. If no error message appears, the Kerberos authentication is completed for the user.
Step 2 Submit a topology. WordCount is used as an example; for other topologies, see the related development guide. If source.jar was created on Windows, first use WinSCP to upload it to a directory on the Linux server, such as /opt/jartarget. Then go to the Storm/storm-1.0.2/bin directory on the Storm client and run the following command to submit source.jar:
storm jar /opt/jartarget/source.jar com.huawei.storm.example.wordcount.WordCountTopology
Step 3 Run the storm list command to view the submitted applications. If the word-count application is listed, the task has been submitted successfully.
Note: If a topology is set to local mode and submitted by using commands, ensure that the submission environment is in normal mode. Topologies in local mode cannot be submitted by using commands in a security-mode environment.
----End
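In the example, main hardcodes SubmitType.CMD, so switching modes means editing and rebuilding the code. As a hypothetical variation that is not part of the example project, main could read the mode from its first argument, because storm jar passes any arguments that follow the class name to main. The enum below only mirrors the three modes named in the example; it is not the project's own SubmitType class.

```java
import java.util.Locale;

// Hypothetical helper: select the submission mode from the first program argument.
final class SubmitModeSketch {
    // Mirrors the three modes used by the example (CLI, remote, local)
    enum SubmitType { CMD, REMOTE, LOCAL }

    static SubmitType parse(String[] args) {
        if (args == null || args.length == 0) {
            return SubmitType.CMD; // default used by the example's main method
        }
        // valueOf throws IllegalArgumentException for an unknown mode name
        return SubmitType.valueOf(args[0].trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println("Selected mode: " + parse(args));
    }
}
```

With such a change, a command like storm jar /opt/jartarget/source.jar com.huawei.storm.example.wordcount.WordCountTopology cmd would select the CLI mode explicitly; an unrecognized mode name fails fast instead of silently falling back.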
1.1.1.5.3.2 Submitting a Topology When No Client Is Installed on a Linux OS
Scenarios
Storm supports submitting topologies from a Linux environment in which no Storm client is installed.
Prerequisites
• Ensure that the time difference between the Linux host and the FusionInsight HD cluster is less than 5 minutes.
• If the Linux host is not a node in the cluster, set the mapping between host names and IP addresses in the hosts file of that host. Host names and IP addresses must map one-to-one.
Procedure
Step 1 Prepare the dependency JAR files and configuration files.
In the Linux environment, create a directory, such as /opt/test, and create sub-directories lib and src/main/resources/. Upload the JAR files in the lib folder of the example project to the lib directory in the Linux environment. Upload the configuration files in the src/main/resources folder of the example project to the src/main/resources directory in the Linux environment.
Step 2 Modify the WordCountTopology.java class in the Eclipse project and change the application submission mode to the remoteSubmit mode. Replace the user keytab file name, user principal name, and the JAR file address.
• Submit the application in remoteSubmit mode.
public static void main(String[] args) throws Exception
{
    TopologyBuilder builder = buildTopology();
    /*
     * A task can be submitted in one of the following three modes:
     * 1. CLI submitting: copy the application JAR file to a client and run
     *    the related commands on the client.
     * 2. Remote submitting: package the application JAR file and run the
     *    main method in Eclipse.
     * 3. Local submitting: run the application on a local computer for testing.
     * The CLI and remote submitting modes support both security mode and
     * normal mode. The local submitting mode supports normal mode only.
     *
     * Only one mode can be used for each submission. The CLI submitting mode
     * is used by default. To use another mode, change the SubmitType argument
     * passed to submitTopology.
     */
    submitTopology(builder, SubmitType.REMOTE);
}
• Change the value of userJarFilePath to the specified directory /opt/test/lib/example.jar in the Linux environment.
private static void remoteSubmit(TopologyBuilder builder)
    throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException,
    IOException
{
    Config config = createConf();
    // Path of the topology JAR file on the Linux host (no trailing spaces)
    String userJarFilePath = "/opt/test/lib/example.jar";
    System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath);
    // Preparations in security mode
    if (isSecurityModel())
    {
        securityPrepare(config);
    }
    config.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology());
}
• In security mode, make the security authentication preparations: change the values of userKeyTablePath and userPrincipal to the keytab file name and principal of your developer account, based on your actual environment.
private static void securityPrepare(Config config) throws IOException
{
    String userKeyTablePath = System.getProperty("user.dir") + File.separator + "src"
        + File.separator + "main" + File.separator + "resources"
        + File.separator + "user.keytab";
    String userPrincipal = "StreamingDeveloper";
    String krbFilePath = System.getProperty("user.dir") + File.separator + "src"
        + File.separator + "main" + File.separator + "resources"
        + File.separator + "krb5.conf";
    // Separators need to be replaced in Windows paths.
    userKeyTablePath = userKeyTablePath.replace("\\", "\\\\");
    krbFilePath = krbFilePath.replace("\\", "\\\\");
    String principalInstance = String.valueOf(config.get(Config.STORM_SECURITY_PRINCIPAL_INSTANCE));
    LoginUtil.setKrb5Config(krbFilePath);
    LoginUtil.setZookeeperServerPrincipal("zookeeper/" + principalInstance);
    LoginUtil.setJaasFile(userPrincipal, userKeyTablePath);
}
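securityPrepare builds the keytab and krb5.conf paths from the user.dir system property and then doubles every backslash. Presumably the doubling is needed because the keytab path ends up in a JAAS-style configuration file, where a backslash inside a quoted value acts as an escape character; on Linux the replace has no effect. The following JDK-only sketch isolates those two steps with hypothetical names and uses java.nio.file.Paths instead of manual File.separator concatenation.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical illustration of the path handling in securityPrepare().
final class KeytabPathSketch {
    // Equivalent to joining user.dir, src, main, resources, and the file name
    static Path resourcePath(String baseDir, String fileName) {
        return Paths.get(baseDir, "src", "main", "resources", fileName);
    }

    // Doubles each backslash so a Windows path survives escape processing
    static String escapeBackslashes(String path) {
        return path.replace("\\", "\\\\");
    }

    public static void main(String[] args) {
        Path keytab = resourcePath(System.getProperty("user.dir"), "user.keytab");
        System.out.println(escapeBackslashes(keytab.toString()));
    }
}
```

Because the paths are resolved against user.dir, the JVM must be started from the directory that contains src/main/resources.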
Step 3 Export the JAR file to the Linux environment.
• Right-click the example project and choose Export from the shortcut menu.
• Select JAR file and click Next.
• Select the src/main/java directory, specify the export destination, and name the JAR file example.jar.
• Click Finish.
• Copy the exported JAR file to the /opt/test/lib directory in the Linux environment.
Step 4 Go to the /opt/test directory and run the following command to submit the topology:
java -classpath /opt/test/lib/*:/opt/test/src/main/resources com.huawei.storm.example.wordcount.WordCountTopology
----End
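Because securityPrepare resolves user.keytab and krb5.conf against user.dir, running the command from a directory other than /opt/test makes the login fail. A hypothetical preflight check, not part of the example project, can report missing files before submission; the relative paths below follow the directory layout created in Step 1 and Step 3.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical preflight check for the files the remote submission relies on.
final class PreflightCheck {
    // Returns the paths (resolved against baseDir) that are not regular files
    static List<String> missingFiles(String baseDir, String... relativePaths) {
        List<String> missing = new ArrayList<>();
        for (String rel : relativePaths) {
            Path p = Paths.get(baseDir, rel);
            if (!Files.isRegularFile(p)) {
                missing.add(p.toString());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingFiles(System.getProperty("user.dir"),
            "src/main/resources/user.keytab", "src/main/resources/krb5.conf", "lib/example.jar");
        System.out.println(missing.isEmpty() ? "All required files found." : "Missing: " + missing);
    }
}
```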
1.1.1.5.3.3 Submitting a Topology in Eclipse Remotely
Scenario
Storm supports remote topology submission from Eclipse. In the current example code, only WordCountTopology can be submitted remotely. To submit other topologies remotely, implement the remote submission function first, using WordCountTopology as a reference.
Prerequisites
• The procedure in 1.1.1.5.1 Packaging Eclipse Code has been completed.
• Ensure that the time difference between the client and the Storm cluster is less than 5 minutes.
• If the host running Eclipse is not a node of the cluster, set the mapping between the host name and IP address in the hosts file of that host. The host name must be correctly mapped to the IP address.
Procedure
Step 1 Modify the WordCountTopology.java class and change the application submission mode to the remoteSubmit mode. Replace the user keytab file name, user principal name, and the JAR file address.
• Submit the application in remoteSubmit mode.
public static void main(String[] args) throws Exception
{
    TopologyBuilder builder = buildTopology();
    /*
     * A task can be submitted in one of the following three modes:
     * 1. CLI submitting: copy the application JAR file to a client and run
     *    the related commands on the client.
     * 2. Remote submitting: package the application JAR file and run the
     *    main method in Eclipse.
     * 3. Local submitting: run the application on a local computer for testing.
     * The CLI and remote submitting modes support both security mode and
     * normal mode. The local submitting mode supports normal mode only.
     *
     * Only one mode can be used for each submission. The CLI submitting mode
     * is used by default. To use another mode, change the SubmitType argument
     * passed to submitTopology.
     */
    submitTopology(builder, SubmitType.REMOTE);
}
• Change the value of userJarFilePath to the actual topology JAR file address based on your environment.
private static void remoteSubmit(TopologyBuilder builder)
    throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException,
    IOException
{
    Config config = createConf();
    String userJarFilePath = "D:\\example.jar";
    System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath);
    // Preparations in security mode
    if (isSecurityModel())
    {
        securityPrepare(config);
    }
    config.setNumWorkers(1);
    StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology());
}
• In security mode, make the security authentication preparations: change the values of userKeyTablePath and userPrincipal to the keytab file path and principal of your developer account, based on your actual environment.
private static void securityPrepare(Config config) throws IOException
{
    String userKeyTablePath = System.getProperty("user.dir") + File.separator + "src"
        + File.separator + "main" + File.separator + "resources"
        + File.separator + "user.keytab";
    String userPrincipal = "StreamingDeveloper";
    String krbFilePath = System.getProperty("user.dir") + File.separator + "src"
        + File.separator + "main" + File.separator + "resources"
        + File.separator + "krb5.conf";
    // Separators need to be replaced in Windows paths.
    userKeyTablePath = userKeyTablePath.replace("\\", "\\\\");
    krbFilePath = krbFilePath.replace("\\", "\\\\");
    String principalInstance = String.valueOf(config.get(Config.STORM_SECURITY_PRINCIPAL_INSTANCE));
    LoginUtil.setKrb5Config(krbFilePath);
    LoginUtil.setZookeeperServerPrincipal("zookeeper/" + principalInstance);
    LoginUtil.setJaasFile(userPrincipal, userKeyTablePath);
}
Step 2 Run the main method of the WordCountTopology class to submit the application.
----End
1.1.1.5.4 Viewing Results
Scenario
After a Storm application is run, you can log in to the Storm WebUI to check the running result.
Procedure
Step 1 Log in to FusionInsight Manager.
Enter the FusionInsight Manager address in the address box of your browser, in the format http://<FusionInsight Manager WebService floating IP address>:8080/web.
For example, enter http://10.0.0.1:8080/web.
Step 2 Choose Services > Storm to go to the Storm WebUI, as shown in Figure 1-4.
Figure 1-4 Storm service management page
Step 3 On the Storm UI, click the word-count application to view its running status, as shown in Figure 1-5.
Figure 1-5 Storm application execution page
In Topology stats, the total volume of data transferred between operators in different time periods is displayed.
In Spouts, the total number of messages sent by the spout operator since it started is displayed. In Bolts, the total numbers of messages sent by the count and split operators are displayed, as shown in Figure 1-6.
Figure 1-6 Total volume of data sent by the Storm application operators
----End
This post was last edited by chz at 2018-08-03 07:55.
