Hello, everyone!
Today I'm going to introduce the LogHub Storm spout. It helps beginners get started with consuming Log Service (LogHub) data in Storm faster.
The following spout usage example builds a Storm topology.

public class LogHubStormSample { // illustrative class name; the original sample elides the class declaration and logger
    private static final Logger logger = Logger.getLogger(LogHubStormSample.class);

    public static void main(String[] args) {
        String mode = "Local"; // Use the local test mode.
        String consumer_group_name = ""; // Each topology needs a unique consumer group name.
        String project = ""; // The Log Service project.
        String logstore = ""; // The Log Service Logstore.
        String endpoint = ""; // The Log Service endpoint.
        String access_id = ""; // The user's AccessKey ID.
        String access_key = ""; // The user's AccessKey secret.
        // Build the configuration needed by the LogHub Storm spout.
        LogHubSpoutConfig config = new LogHubSpoutConfig(consumer_group_name,
                endpoint, project, logstore, access_id,
                access_key, LogHubCursorPosition.END_CURSOR);
        TopologyBuilder builder = new TopologyBuilder();
        // Build the LogHub Storm spout.
        LogHubSpout spout = new LogHubSpout(config);
        // In a real scenario, the number of spouts can equal the number of Logstore shards.
        builder.setSpout("spout", spout, 1);
        builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(false);
        conf.setMaxSpoutPending(1);
        // If Kryo is used to serialize and deserialize data, the serialization method of
        // LogGroupData (LogGroupDataSerializSerializer) must be set explicitly.
        Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);

        if (mode.equals("Local")) {
            logger.info("Local mode...");
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());
            try {
                Thread.sleep(6000 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cluster.killTopology("test-jstorm-spout");
            cluster.shutdown();
        } else if (mode.equals("Remote")) {
            logger.info("Remote mode...");
            conf.setNumWorkers(2);
            try {
                StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            logger.error("invalid mode: " + mode);
        }
    }
}

The following bolt example consumes the data and prints the content of each log.
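The core of what the bolt does is format each log as one line: "LogTime:" plus the timestamp, followed by tab-separated key:value pairs. That formatting step can be sketched with plain Java types (LogLineSketch and its Map-based contents are illustrative stand-ins, not the LogHub SDK classes):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LogLineSketch {
    // Formats one log the way the bolt does: "LogTime:<time>" followed by
    // tab-separated "key:value" pairs, in insertion order.
    static String format(int logTime, Map<String, String> contents) {
        StringBuilder sb = new StringBuilder();
        sb.append("LogTime:").append(logTime);
        for (Map.Entry<String, String> e : contents.entrySet()) {
            sb.append("\t").append(e.getKey()).append(":").append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> contents = new LinkedHashMap<>();
        contents.put("level", "INFO");
        contents.put("message", "hello");
        // Prints "LogTime:1700000000", then tab-separated "level:INFO" and "message:hello".
        System.out.println(format(1700000000, contents));
    }
}
```

In the real bolt, the timestamp and key-value pairs come from the SDK's Log and Content objects instead of a Map.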
public class SampleBolt extends BaseRichBolt {
    private static final long serialVersionUID = 4752656887774402264L;
    private static final Logger logger = Logger.getLogger(SampleBolt.class);
    private OutputCollector mCollector;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
            OutputCollector collector) {
        mCollector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String shardId = (String) tuple
                .getValueByField(LogHubSpout.FIELD_SHARD_ID);
        @SuppressWarnings("unchecked")
        List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);
        for (LogGroupData groupData : logGroupDatas) {
            // Each logGroup consists of one or more logs.
            LogGroup logGroup = groupData.GetLogGroup();
            for (Log log : logGroup.getLogsList()) {
                StringBuilder sb = new StringBuilder();
                // Each log has a time field and multiple key-value pairs.
                int log_time = log.getTime();
                sb.append("LogTime:").append(log_time);
                for (Content content : log.getContentsList()) {
                    sb.append("\t").append(content.getKey()).append(":")
                            .append(content.getValue());
                }
                logger.info(sb.toString());
            }
        }
        // The LogHub spout relies on Storm's ACK mechanism to confirm that the spout
        // delivered messages to the bolt correctly, so the bolt must call ack.
        mCollector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // do nothing
    }
}

Adding a Maven Dependency
For Storm versions earlier than 1.0 (for example, 0.9.6), add the following dependency:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-storm-spout</artifactId>
    <version>0.6.6</version>
</dependency>
For Storm 1.0 and later versions, add the following dependency:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-storm-1.0-spout</artifactId>
    <version>0.1.3</version>
</dependency>
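When the topology is packaged with Maven and submitted to a cluster with storm jar, the Storm runtime itself is usually declared with provided scope so that it is not bundled into the jar. A sketch (the version shown is illustrative; match the Storm version running on your cluster):

```
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <!-- illustrative version; use the one deployed on your cluster -->
    <version>1.2.3</version>
    <scope>provided</scope>
</dependency>
```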