公司有一个订单备注系统,业务模型中符合订单备注的条件其中之一是必须在发货前。合作的一些大的淘宝商家比较多,因此需要处理的订单量也很大。之前的老系统是首先订单数据推送到库后发送一个MQ小时,此时系统从数据库中拉取订单分析后备注响应的内容。单一次推送的订单量大,而且也有延时,因此出现楼单以及备注延迟的情况也很多。

在决定重新设计该系统后,经过调研选择使用流式处理框架,至于为什么这么设计这需要另一篇文章说说。第一次接触Storm,其实安装也是比较简单。下面简单的记录一下安装的步骤。

步骤

  1. storm依赖zookeeper,需要先安装zookeeper;
  2. 安装storm的依赖,包括java和python;
  3. 修改storm的配置文件storm.yaml;
  4. 使用storm脚本或者选择的监控方式运行storm;

安装Zookeeper集群

这里有官方的文档,可以安装一个单机版或一个集群。Kafka集群也是以来与Zookeeper,因此可以直接使用。

安装依赖

  1. Java 8
  2. Python 2.6.6

安装storm

下载storm的安装包,解压放在需要的位置,需要配置的有:

  1. Storm配置文件storm.yaml位于conf/storm.yaml下,是一个yaml格式的文件,storm.zookeeper.servers:表示Zookeeper的集群地址。若不是使用Zookeeper默认的端口,需要使用storm.zookeeper.port来配置端口号;否则直接像下面一样配置:

    storm.zookeeper.servers:
    - "111.222.333.444"
    - "555.666.777.888"
  2. storm.local.dir:用来保存storm需要存储的少量状态数据,比如jars文件等,目录需要注意权限:

    storm.local.dir: "/mnt/storm"
  3. nimbus.host:工作节点要要直到下载jars文件和confs文件的master机器,这个是配置master机器:

    nimbus.host: "111.222.333.44"
  4. supervisor.slots.ports:每一个worker需要监听一个段端口来接收消息,这个配置用来定义监听的端口号,若配置5个端口,则storm运行5个workers,配置3个运行3个,默认是4个,分别是 6700, 6701, 6702, 6703.配置如:

    supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

在Config中有一个方法setNumWorkers可以设置worker的数量,它和supervisor.slots.ports:的区别在于supervisor.slots.ports:设置了worker数量的最大可用量。例如supervisor.slots.ports:设置为5个,setNumWorkers设置为3个,实际使用的worker只有3个,有2个worker是闲置的;若supervisor.slots.ports:设置为3个,setNumWorkers设置为5个,则实际只有3个workers存在并且运行;另外实际使用的woker还好Executors与Task有关,如果只有2个Executors和Task,那么即使启动3个woker实际也只有2个woker使用;

启动

  1. 在master机器supervision下运行bin/storm nimbus来启动nimbus;
  2. 在每一个worker机器supervision下运行bin/storm supervisor来启动supervisor,supervisor负责启动和停止
    工作进程;
  3. 在supervision下使用bin/storm ui可以启动监控页面;使用ui.port(value格式是数字)可以配置这个监控地址的端口号;

提交Topology

  1. 使用下面命令提交topology到storm:
    storm jar jar-file.jar fullname.MainClass args

注意:

Storm的woker之间通信是通过查找主机名,然后通过netty建立连接通信的,因此要配置好主机名与ip的映射关系,否则会发生topology能成功提交,任务调度失败的情况。