Got it

storm example

269 0 0 0 0

Hello, everyone!


Today I'm going to introduce you FI storm. It helps beginners learn about FI faster.

Spout usage example, which is used to build a Storm Topology.

     public static void main( String[] args )
    {     
        String mode = "Local";  //使用本地测试模式。
           String consumer_group_name = "";   //每个Topology需要设定唯一的消费组名称。
        String project = "";    // 日志服务的Project。 
        String logstore = "";   // 日志服务的Logstore。
        String endpoint = "";   // 日志服务访问域名。
        String access_id = "";  // 用户AK信息。
        String access_key = "";        // 构建一个Loghub Storm Spout需要使用的配置。
        LogHubSpoutConfig config = new LogHubSpoutConfig(consumer_group_name,
                endpoint, project, logstore, access_id,
                access_key, LogHubCursorPosition.END_CURSOR);
        TopologyBuilder builder = new TopologyBuilder();        // 构建Loghub Storm Spout。
        LogHubSpout spout = new LogHubSpout(config);        // 在实际场景中,Spout的个数可以和Logstore Shard个数相同。
        builder.setSpout("spout", spout, 1);
        builder.setBolt("exclaim", new SampleBolt()).shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(false);
        conf.setMaxSpoutPending(1); 
        // 如果使用Kryo进行数据的序列化和反序列化,则需要显示设置LogGroupData的序列化方法LogGroupDataSerializSerializer。
        Config.registerSerialization(conf, LogGroupData.class, LogGroupDataSerializSerializer.class);        if (mode.equals("Local")) {
            logger.info("Local mode...");
            LocalCluster cluster  = new LocalCluster();
            cluster.submitTopology("test-jstorm-spout", conf, builder.createTopology());            try {
                Thread.sleep(6000 * 1000);   
            } catch (InterruptedException e) {
                e.printStackTrace();
            }  
            cluster.killTopology("test-jstorm-spout");
            cluster.shutdown();  
        } else if (mode.equals("Remote")) {
            logger.info("Remote mode...");
            conf.setNumWorkers(2);            try {
                StormSubmitter.submitTopology("stt-jstorm-spout-4", conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            logger.error("invalid mode: " + mode);
        }
    }
}

Example of the bolt code of consumption data. The content of each log is printed.

public class SampleBolt extends BaseRichBolt {    private static final long serialVersionUID = 4752656887774402264L;    private static final Logger logger = Logger.getLogger(BaseBasicBolt.class);    private OutputCollector mCollector;    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
            OutputCollector collector) {
        mCollector = collector;
    }    @Override
    public void execute(Tuple tuple) {
        String shardId = (String) tuple
                .getValueByField(LogHubSpout.FIELD_SHARD_ID);        @SuppressWarnings("unchecked")
        List<LogGroupData> logGroupDatas = (ArrayList<LogGroupData>) tuple.getValueByField(LogHubSpout.FIELD_LOGGROUPS);        for (LogGroupData groupData : logGroupDatas) {            // 每个logGroup由一条或多条日志组成。
            LogGroup logGroup = groupData.GetLogGroup();            for (Log log : logGroup.getLogsList()) {
                StringBuilder sb = new StringBuilder();                // 每条日志,有一个时间字段,以及多个Key,Value对。
                int log_time = log.getTime();
                sb.append("LogTime:").append(log_time);                for (Content content : log.getContentsList()) {
                    sb.append("\t").append(content.getKey()).append(":")
                            .append(content.getValue());
                }
                logger.info(sb.toString());
            }
        }        // 在LogHub Spout中,强制依赖Storm的ACK机制,用于确认Spout将消息正确发送至bolt,所以在bolt中一定要调用ACK。
        mCollector.ack(tuple);
    }    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {        //do nothing
    }
}

Adding a Maven Dependency

For Storm 1.0 and earlier versions (for example, 0.9.6), use the following commands:

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>loghub-storm-spout</artifactId>
  <version>0.6.6</version></dependency>

For Storm 1.0 and later versions, use the following commands:

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>loghub-storm-1.0-spout</artifactId>
  <version>0.1.3</version></dependency>


Comment

You need to log in to comment to the post Login | Register
Comment

Notice: To protect the legitimate rights and interests of you, the community, and third parties, do not release content that may bring legal risks to all parties, including but are not limited to the following:
  • Politically sensitive content
  • Content concerning pornography, gambling, and drug abuse
  • Content that may disclose or infringe upon others ' commercial secrets, intellectual properties, including trade marks, copyrights, and patents, and personal privacy
Do not share your account and password with others. All operations performed using your account will be regarded as your own actions and all consequences arising therefrom will be borne by you. For details, see " User Agreement."

My Followers

Login and enjoy all the member benefits

Login

Block
Are you sure to block this user?
Users on your blacklist cannot comment on your post,cannot mention you, cannot send you private messages.
Reminder
Please bind your phone number to obtain invitation bonus.