系统设计
架构
系统包括以下几个模块:
- Client模块,业务方接入SDK,用于上报统计原始消息数据;
- RPC模块,用于接收客户端上报的统计消息数据并对外提供统计结果查询接口;
- Tasks运算模块,功能包括封装各种流式统计运算场景,执行限流规则判断,解析各统计项的配置信息,消费消息数据并按统计配置进行计算以及保存统计结果;
- Web模块,功能包括对统计组和统计项进行管理维护、查看统计结果、设置限流规则和管理统计指标访问权限。
系统默认集成的存储引擎为HBase,默认集成的RPC服务为Zeroc-ICE。系统支持自定义扩展,可根据自身情况选择相应的存储引擎和RPC服务。
系统设计
XL-LightHouse是通用型流式大数据统计系统,它将流式数据统计需求抽象分类成多种运算场景,并对各种运算场景进行高性能的封装从而让每一种运算可以达到无限制复用的效果。 XL-LightHouse使用【统计工程-统计组-统计项】的三层结构来管理所有统计需求。每一个统计需求叫做一个统计项。用户可根据需要创建若干个统计工程,每个统计工程可包含多个统计项,而基于同一份元数据的多个统计项叫做一个统计组。
多流并行处理计算模型
一个Flink任务只能并行处理一个或少数几个数据流,而XL-LightHouse一个任务可以并行处理数万个、几十万个数据流;一个Flink任务只能实现一个或少数几个数据指标,而XL-LightHouse单个任务就能支撑大批量、数十万计的数据指标。
我以Flink和Spark作为对比阐述XL-LightHouse计算模型的基本设计理念。
流式计算是一个很宽泛的概念,目前没有办法用特定的某些运算操作类型将其抽象出来。流式计算是基于事件流驱动的运算方式,常见的应用场景有:计算用户实时画像、实时推荐、监控告警、实时电信反诈骗等等。正是因为目前流式计算没有办法通过某些运算操作类型将其抽象,所以只能从“流式数据处理过程”这种宽泛的视角来解决此类问题。因此我们可以看到不管是Flink还是Spark,都有类似数据源(Source)、转化操作(Map)、执行操作(Action)、窗口(Window)、结果处理(Sink)这类概念,因为这些概念都是围绕着“流式数据处理过程”而衍生出来的。当然Flink的工程师为了扩充其适用场景、提高产品的完善度,提供了类似状态管理、水印、聚合函数等等功能,但也都不可能脱离流式数据处理过程的主线。
而反观流式统计虽然是属于流式计算的一种计算形式,但它其实是一种完全可以被抽象成几种运算操作类型的计算形式。绝大多数的流式统计无外乎Count运算、Sum运算、Bitcount运算(count distinct)、Max运算、Min运算、Avg运算、Seq运算(时序数据)、Dimens运算(维度划分)、Limit运算(topN/lastN),然后再结合时间窗口(滚动窗口、滑动窗口)的划分就可以完成一个个的流式数据统计需求。
任何一类业务需求只要可以被抽象成若干种运算操作类型,那就一定可以为此类业务需求设计出一种与其适配的通用化解决方案。这个过程就像Photoshop将所有图片处理的操作抽象出移动工具、钢笔工具、圈选工具、裁剪工具、橡皮擦工具等,关系型数据库将所有数据相关操作抽象出增加、删除、修改、查询、事务、索引等过程类似。而很显然Flink、Spark面对流式计算场景,自身都没有基于这种“功能抽象”的概念来实现。 正因为Flink、Spark在流式处理方面都是基于宽泛的流数据处理过程所设计,所以流式计算的实现并不能算是一种“通用化”解决方案。
每个Flink任务只能并行处理一个或少数几个数据流,而窗口则对应与之相应的数据流。这种设计如果从流式计算的角度来看并无问题,但如果从流式统计这个细分领域的角度来看却明显先天不足。所以我一直认为:Flink和Spark称得上是优秀的流式计算工具,但根本不能算是优秀的流式统计工具。流式统计是一种可以被抽象的计算形式,所以必然能够为其设计出一套通用型且性能远远超过Flink和Spark的技术方案。
XL-LightHouse作为适配流式统计的技术方案,它不再是围绕着“流数据处理过程”这条主线,而是围绕着“运算操作类型”来实现。它将所有流式统计需求抽象分类成多种运算场景,每个统计需求都是基于其中的一种或多种运算场景来实现。系统中统计项配置包括了其计算规则、维度信息和统计周期等元素,在系统接收到统计消息后,系统将原始消息按照统计项标识、运算函数类型、维度信息和统计周期划分成一个个算子。这些算子分属于众多的统计指标,它们之间彼此独立,但却可以同时并行运行在一个进程当中,每个数据流本身的运算过程中不再有窗口的概念。XL-LightHouse虽然也会按窗口批次来处理消息,但这个窗口跟Flink任务中的窗口已有本质区别,这里的窗口是相对于大批量的数据指标汇总的数据流整体来说的,更多的只是作为事件触发的功能而已。
这种设计更加贴合流式统计的运算特点,已然完全不同于Flink和Spark这种数据流彼此之间处于资源隔离的运算形式,打破了流式计算的束缚,大大提高了集群资源的利用效率。也正因如此,XL-LightHouse一个任务就可以同时并行处理数十万条数据流,单个任务就能够支撑庞大量级数据指标,这种优势是Flink和Spark刻板的使用流式计算的方式去解决流式统计的问题永远都无法比拟的。 很多时候无关乎技术,一个软件本身的定位就已经决定了它在某个领域所能企及的上限。软件自身或许并无优劣,而只是侧重点不同而已。在流式统计这个细分领域,面对企业繁杂的流式数据统计需求,XL-LightHouse的运算性能瞬间就可以秒杀Flink和Spark之类的解决方案。
自定义流式统计规范(XL-Formula)
SQL规范在大数据查询和统计分析方面被广泛应用,SQL在离线数据分析、OLAP、OLTP等领域都具有不可撼动的地位。而且随着FlinkSQL和SparkSQL等组件功能的日趋完善,SQL在流式统计领域也开始被越来越多的使用。但是由于SQL本身是基于数据表的概念进行数据处理,不可避免需要存储较多的原始数据和中间态数据在内存中,造成较高的内存浪费;分布式SQL在数据处理过程中会触发Shuffle,造成大量的网络传输,影响执行效率;SQL在一些分组聚合操作可能引起较为严重的数据倾斜,对程序的正常执行造成影响;SQL语法过于臃肿和复杂、不够清晰简洁、多过滤条件的组合逻辑需要依赖较长的SQL语句来实现,不便于理解,书写较长SQL语句容易出错;SQL函数定制化扩展不够方便;SQL开发相对较复杂,实现相同功能SQL可能会有多种写法,不同写法执行和解析效率也各有差异,并且很多SQL计算任务需要依据数据量和运算逻辑进行特定优化。
这些问题使得相应功能的实现需要依赖专业的数据研发人员,导致统计任务研发成本高、周期长。当企业数据指标呈现指数级增长时,基于SQL规范的技术方案的瓶颈也将凸显出来,需要耗费大量的研发成本、数据维护成本和服务器运算成本。我认为SQL规范的这些问题限制了它在流式统计这个细分场景内的快速扩张,使得SQL在这个细分领域内的应用基本局限在定制化需求开发的范围之内。
XL-LightHouse作为一套通用型流式大数据统计系统,侧重于帮助企业解决繁杂的流式数据统计问题。它并没有拘泥于现行的大数据领域的业内标准,而是寄希望通过使用更为轻巧的技术方案解决目前企业所面对的问题。它定义了一套较为完善的用于描述形式各样的流式统计需求的配置规范,通过各个属性的组合可以实现非常强大的统计功能,从而帮助企业低成本、快速的搭建起一套较为完善的、稳定可靠的数据化运营体系。
计算环节逐层递减原则
为了更加贴合流式统计运算特点,XL-LightHouse的数据消费链路是一个逐层递减的结构。
系统将整个数据消费链路分成以下基本环节:Client模块上报消息数据环节、RPC模块处理消息数据环节、运算模块执行展开和分组操作环节、统计结果存储环节。在每个环节系统使用异步处理、批量消费、对重复性计算进行聚合处理的方案。各环节接收到消息后放入消息缓冲池,系统依据各环节的预定义聚合逻辑将消息划分成不同的计算类型,对单节点单进程内相同类型的消息进行聚合处理。这种设计可以减少数据向下游传输、提升网络IO效率、又可以直接减少下游运算量以及DB的写入压力。从Client端发送消息到最终的统计结果入库中间的每个环节都对重复性消息进行聚合处理尽可能减少消息量,并且将与下游运算无关的参数都会尽早抛弃掉。系统各环节的消息聚合逻辑略有不同,以Client模块上报消息数据环节为例消息聚合主要包括以下内容:
- 消息体参数裁剪
为了提高消息的传输速度并提升后续步骤消息聚合效率,Client模块对原始消息进行裁剪操作,其目的是去掉统计无关字段。统计无关字段是系统根据各统计组下所有有效统计项计算得来,对于与所有有效统计项均不相关的字段在Client模块上报数据之前将其过滤掉。
- 篡改消息体时间戳
Client模块上报消息环节在执行聚合操作前修改消息原始时间戳为最小批次时间,其目的是为了后续步骤中在保证数据准确性的前提下能够将尽可能多的消息聚合到一起。Client模块以当前统计组下所有有效统计项的统计周期的最大公约数为时间窗口,按照该时间窗口和消息原始时间戳计算得到消息所对应的最小批次时间。Client模块将消息原来的时间戳修改为最小批次时间后再放入缓冲池。
- 聚合操作
聚合操作即为将同类型消息按预定义聚合逻辑合并到一起。不同环节的聚合逻辑略有不同,Client模块的聚合逻辑是指消息内容一致的消息,即为相同统计组、相同参数值的消息。原始消息发送到缓冲池后消费线程组定时从缓冲池中批量读取消息,并将其中符合聚合规则的消息聚合到一起。经过聚合操作后消息体的数据结构由单条消息体内容变更为消息体内容和消息体重复次数两个属性。
消息展开与分组
在XL-LightHouse中集群内的所有统计任务共用集群运算资源,运算模块接收到数据后对统计消息进行展开和分组操作。
- 消息展开操作
在大多数业务场景中针对一份元数据往往有多个数据指标,统计组下的所有统计项共用一份原始数据消息。展开操作即为查询统计组下有效统计项,提取各统计项的关联字段,为各统计项复制一份单独的消息数据并只保留其运算相关字段的过程。展开操作的目的是为了避免各统计项的后续运算逻辑相互之间产生影响。
- 消息分组操作
分组操作即系统依据统计项标识、运算函数类型、时间周期和维度标识,将统计项计算逻辑拆分成若干个算子的过程。
消息缓冲池
聚合逻辑所依赖的消息缓冲池基于有界优先阻塞队列实现。系统将消息缓冲池分成若干个Slot,每个Slot的组成结构包括一个BoundedPriorityBlockingQueue(有界优先阻塞队列)和Slot对应的最后访问时间戳。消息缓冲池的处理逻辑包括以下步骤:
- Producer按照不同环节的聚合逻辑生成消息事件的Key,Key用于区分是否为相同计算类型的消息;
- 消息缓冲池依据消息Key按照Hash取余分配对应的Slot;
- 按照预定义时间窗口划分消息到不同的处理周期;
- Slot对相同处理周期的消息按照Key进行优先排序,不同处理周期的消息按处理周期排序;
- 消费线程组定时轮询各个Slot;
- 判断Slot的使用容量是否超出阈值,阈值为batchsize * backlog_factor,其中batchsize为指定的单次消费最大消息数量,backlog_factor为指定的消息积压系数;
- 如果Slot使用容量没有超出阈值,则继续判断Slot的上次消费访问时间,如果超出时间阈值则读取消息批量消费,否则跳过本次任务。 消费Slot消息后同时更新Slot使用容量以及最后访问时间。
该消息缓冲池实现可以高效率的将尽可能多的相同计算类型的消息聚合到一起处理,减少对下游运算量和DB写入压力。
基数运算
XL-LightHouse对每一种运算类型都进行了高性能封装,以使其可以达到无限制复用的效果,以BitCount计算逻辑为例,阐述其实现方案。
bitcount基数运算是指distinct(非重复值数量统计),系统使用基数过滤装置过滤已存在的基数值,通过判定在过滤装置中不存在的基数数量然后更新DB中的统计结果从而实现基数统计。基数过滤装置包括内存基数过滤装置和分布式基数过滤装置两部分。内存基数过滤装置的作用在于初步判断基数值是否已存在,其作用在于内存判断效率更高,从而尽可能避免重复性的基数判断对整体性能的影响。内存基数过滤装置使用RoaringBitMap工具包实现。分布式基数过滤装置内含多个分片,每个分片对应一个RoaringBitMap数据存储结构,分片数可根据实际需要指定,通过提高分片数可以提高基数运算的准确度。分布式基数过滤装置的实现方案包括如下步骤:
- 将原始数值经过MurmurHash-128Bit生成原始数值对应的Long类型的Hash值。
- 设置统计任务所需的分片数,每个分片对应一个RoaringBitMap数据结构,本系统目前过滤装置采用Redis扩展Redis-Roaring插件的方式实现,原始数值对应的分片可通过Hash取余获得。
- 将Long类型的Hash值按高32bit和低32bit拆分成两个Int类型整数,如果为负数取其绝对值,两个Int值的组合对应原始值在RoaringBitMap数据结构中的Index值。
- 批量将多个基数值对应的Int值组合发送到Redis,将基数判断的多个操作使用Lua脚本合并执行。判断Int值组合是否在过滤装置中存在,如果两个Int值都在过滤装置中存在,则表示原始值已存在,否则为原始值不存在,如果原始值在过滤装置中不存在系统在判定完成后更新相应Index值。
- 统计在过滤装置中不存在的原始值的数量并更新到DB中。
该实现方案的好处在于基数运算不需要存储原始值可减少对内存的占用;使用MurmurHash-128Bit生成Index值从而不需要维护原始数值和Index的映射关系;RoaringBitMap算法本身具有压缩位图功能可以减少基数稀疏情况下的内存浪费的问题;使用Lua脚本实现基数过滤功能可以减少对Redis的访问次数提升整体性能;使用内存基数过滤装置进行初筛可以避免不必要的重复判定;通过调整分片数可以很方便的提升基数统计的准确率。
避免shuffle
在大数据任务的执行过程中shuffle是影响性能比较主要的一个因素,Shuffle除了会带来大量的网络开销还可能引起数据倾斜甚至OOM等问题。系统采用避免Shuffle这种不可控的因素从而规避Shuffle可能带来的不可预料的问题。运算模块基于Structured Streaming开发,采用完全规避Shuffle的计算方式,通过设置运算节点数量调整任务执行并行度,系统将单运算节点内的统计消息依据统计项标识、维度标识、时间批次、统计运算单元拆分成不同的计算类型。统计结果数据和中间态数据基于外部存储实现。本系统中统计结果存储在HBase中,bitcount基数运算的中间态数据存储在Redis中、limit运算的排序数据存储在Redis中。每个运算节点在运算过程中只与外部存储通信,不同运算节点之间互不影响。
统计限流
为了避免因为某个大数据量的统计需求的突然接入或某个统计项的流量暴涨而导致系统的不稳定,系统针对统计组消息量、统计项结果量、统计项运算量等维度的熔断保护机制。该限流保护机制的作用在于可以更好的保障整体服务的稳定性,目前包含以下策略:
- 统计组消息量限流
统计组消息量限流是针对单位时间内接收到的统计组消息数量的限流策略。系统内置统计组消息量计数装置用于计算单位时间内接收到的统计组消息数量。当单位时间内消息量超出阈值后触发限流,使当前统计组进入限流状态。Client模块以及Tasks模块自动抛弃非正常状态下的统计组消息。由于一个统计组可对应一个或多个统计项,所以该限流策略会影响统计组下所有统计项的正常统计。统计组进入限流状态后在指定时间内(默认20分钟)自动抛弃相应消息,当限流时间达到时间阈值后统计组自动恢复到正常状态。
- 统计项结果量限流
统计项结果量限流是针对单位时间内统计项生成的统计结果数量的限流策略。系统内置统计项结果量计数装置用于计算单位时间内生成统计结果的数量。当单位时间内结果量超出阈值后触发限流,使当前统计项进入限流状态。统计项结果量跟两个因素有关,一是统计周期的时间粒度,统计周期粒度越细的指标数据量越多,比如秒级和分钟级统计单位时间内生成的统计结果要多于小时级和天级的统计。第二个影响因素是维度,维度数量越多的统计项单位时间内生成的统计结果更多,比如以城市为维度的统计指标生成的统计结果量要高于以省份为维度的统计指标。统计项结果量限流是针对当前统计项的限流策略,所以只对当前统计项有影响,对统计组下其他统计项没有影响。当统计项进入限流状态后在指定时间内(默认20分钟)自动抛弃相应相应消息,当限流时间达到时间阈值后当前统计项自动恢复到正常状态。
时间戳压缩
系统针对流式统计场景对数据存储格式进一步优化,目的在于提高DB的数据吞吐量。系统统计结果数据存储采用时间戳压缩,根据统计周期划分成不同的时段,将每个统计项相同维度下的同一时段内的多个统计结果数值存储在不同的column内,列名采用delta压缩,同一时段内的数据使用相同的Key,减少Key值的重复。
异常熔断
熔断机制是为了保障业务方自身服务的稳定性,避免因统计服务的不稳定而对业务方自身服务产生影响。异常熔断机制是指在调用client接口时,如果单位时间内的失败次数或超时次数超出阈值,则进入熔断状态,此时client模块自动跳过统计消息发送逻辑。进入熔断状态后,client模块周期性检测统计服务状态是否恢复正常,如果统计服务恢复正常则自动重连。
系统功能边界
- 不支持原始数据明细查询;
- 暂时只涉及流式滚动窗口数据统计(滑动窗口统计将在后续版本中支持);
- 暂时不支持秒级粒度数据统计(将在后续版本支持);
- 不涉及原始数据采集细节,系统所有的计算都基于接入方上报的原始消息,原始数据消息需要接入方组装好通过SDK上报。目前只提供Java版本的SDK,针对JVM语言的服务,接入方可以在服务中直接调用。由其他语言开发的服务,可以将数据采集后存入kafka等消息队列再通过消费数据的形式接入XL-LightHouse。