且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

《Storm分布式实时计算模式》——1.5 理解数据流分组

更新时间:2022-10-08 15:29:29

本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第1章,第1.5节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

1.5 理解数据流分组

看了前面的例子,你会纳闷为什么没有增加ReportBolt的并发度。答案是,这样做没有任何意义。为了理解其中的原因,需要了解Storm中数据流分组的概念。
数据流分组定义了一个数据流中的tuple如何分发给topology中不同bolt的task。举例说明,在并发版本的单词计数topology中,SplitSentenceBolt类指派了四个task。数据流分组决定了指定的一个tuple会分发到哪个task上。
Storm定义了七种内置数据流分组的方式:

  • Shuffle grouping(随机分组):这种方式会随机分发tuple给bolt的各个task,每个bolt实例接收到的相同数量的tuple。
  • Fields grouping(按字段分组):根据指定字段的值进行分组。比如说,一个数据流根据“word”字段进行分组,所有具有相同“word”字段值的tuple会路由到同一个bolt的task中。
  • All grouping(全复制分组):将所有的tuple复制后分发给所有bolt task。每个订阅数据流的task都会接收到tuple的拷贝。
  • Globle grouping(全局分组):这种分组方式将所有的tuples路由到唯一一个task上。Storm按照最小的task ID来选取接收数据的task。注意,当使用全局分组方式时,设置bolt的task并发度是没有意义的,因为所有tuple都转发到同一个task上了。使用全局分组的时候需要注意,因为所有的tuple都转发到一个JVM实例上,可能会引起Storm集群中某个JVM或者服务器出现性能瓶颈或崩溃。
  • None grouping(不分组):在功能上和随机分组相同,是为将来预留的。
    Direct grouping(指向型分组):数据源会调用emitDirect()方法来判断一个tuple应该由哪个Storm组件来接收。只能在声明了是指向型的数据流上使用。
  • Local or shuffle grouping(本地或随机分组):和随机分组类似,但是,会将tuple分发给同一个worker内的bolt task(如果worker内有接收数据的bolt task)。其他情况下,采用随机分组的方式。取决于topology的并发度,本地或随机分组可以减少网络传输,从而提高topology性能。

除了预定义好的分组方式之外,还可以通过实现CustomStreamGrouping(自定义分组)接口来自定义分组方式:


《Storm分布式实时计算模式》——1.5 理解数据流分组https://yqfile.alicdn.com/f2e2e58a3937d8f22b3e66c66ca1cacba5fab5c3.png
" >

prepare()方法在运行时调用,用来初始化分组信息,分组的具体实现会使用这些信息决定如何向接收task分发tuple。WorkerTopologyContext对象提供了topology的上下文信息,GlobalStreamId提供了待分组数据流的属性。最有用的参数是targetTasks,是分组所有待选task的标识符列表。通常,会将targetTasks的引用存在变量里作为chooseTasks()的参数。
chooseTasks()方法返回一个tuple发送目标task的标识符列表。它的两个参数是发送tuple的组件的id和tuple的值。
为了说明数据流分组的重要性,我们在topology中引入一个bug。首先,修改SentenceSpout的nextTuple()方法,使每个句子只发送一次:


《Storm分布式实时计算模式》——1.5 理解数据流分组

程序的输出是这样的:


《Storm分布式实时计算模式》——1.5 理解数据流分组https://yqfile.alicdn.com/20abac2e6b8ea5fa2dc3f129f5f13ad3b09ef5ee.png
" >


《Storm分布式实时计算模式》——1.5 理解数据流分组

然后将CountBolt中按字段分组方式修改为随机分组方式:


《Storm分布式实时计算模式》——1.5 理解数据流分组

运行程序的结果是这样的:


《Storm分布式实时计算模式》——1.5 理解数据流分组https://yqfile.alicdn.com/4a768e6f173acb4230fd61b9f0b90d0317760b24.png
" >

结果是错误的,因为CountBolt的参数是和状态相关的:它会对收到的每个单词进行计数。这个例子中,在并发状况下,计算的准确度取决于是否按照tuple的内容进行适当的分组。我们引入的bug只会在CountBolt并发实例超过一个时出现。这也是我们为什么一再强调,要在不同的并发度配置下测试topology。
通常,需要避免将信息存在bolt中,因为bolt执行异常或者重新指派时,数据会丢失。一种解决方法是定期对存储的信息快照并放在持久性存储中,比如数据库。这样,如果task被重新指派就可以恢复数据。