且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

更新时间:2022-09-11 21:52:11

 StormTopologyMoreTask.java

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现
package zhouls.bigdata.stormDemo;

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;


public class StormTopologyMoreTask {
    
    public static class MySpout extends BaseRichSpout{
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.collector = collector;
            this.context = context;
        }

        int num = 0; 
        public void nextTuple() {
            num++;
            System.out.println("spout:"+num);
            this.collector.emit(new Values(num));
            Utils.sleep(1000);
        }

    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }
        
    }
    
    
    
    public static class MyBolt extends BaseRichBolt{
        
        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }
        
        
        public void execute(Tuple input) {
            Integer num = input.getIntegerByField("num");
            System.out.println("线程id:"+Thread.currentThread().getId()+",接收的值为:"+num);
        }

        
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            
        }
        
    }
    
    
    
    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        String spout_id = MySpout.class.getSimpleName();
        String bolt_id = MyBolt.class.getSimpleName();
        
        topologyBuilder.setSpout(spout_id, new MySpout());
        topologyBuilder.setBolt(bolt_id, new MyBolt()).setNumTasks(3).shuffleGrouping(spout_id);
        
        
        Config config = new Config();
        String topology_name = StormTopologyMoreTask.class.getSimpleName();
        if(args.length==0){
            //在本地运行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
        }else{
            //在集群运行
            try {
                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
        
    }

}
Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

 

 打jar包

 Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现
[hadoop@master jar]$ pwd
/home/hadoop/app/apache-storm-1.0.2/jar
[hadoop@master jar]$ ll
total 24
-rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar
-rw-r--r-- 1 hadoop hadoop 5091 Jul 27 23:00 StormTopologyMoreExecutor.jar
-rw-r--r-- 1 hadoop hadoop 4992 Jul 27 22:39 StormTopologyMoreWorker.jar
[hadoop@master jar]$ rz

[hadoop@master jar]$ ll
total 32
-rw-r--r-- 1 hadoop hadoop 4869 Jul 27 22:17 StormTopology.jar
-rw-r--r-- 1 hadoop hadoop 5091 Jul 27 23:00 StormTopologyMoreExecutor.jar
-rw-r--r-- 1 hadoop hadoop 5105 Jul 27 23:20 StormTopologyMoreTask.jar
-rw-r--r-- 1 hadoop hadoop 4992 Jul 27 22:39 StormTopologyMoreWorker.jar
[hadoop@master jar]$ 
Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

提交作业之前

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

 

 

  

 

为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

   因为,我之前运行的StormTopologyMoreExecutor没有停掉

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

 

 

 

  

为什么,会是如上的数字呢?大家要学,就要深入去学和理解。

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现

 

 

 

 

 

Storm编程入门API系列之Storm的Topology多个tasks数目控制实现



本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/7247940.html,如需转载请自行联系原作者