当前位置:首页 » 计算机系统

海量时间序列数据的实时查询系统(Druid系统)概述

2015-09-21 11:23 本站整理 浏览(781)

Druid是为处理海量时间序列数据而设计的一款开源的OLAP系统。它是分布式处理系统,在设计之初就采纳了线性扩容和高可用的设计方案。集群的规模可以随着业务的增长而动态增加。要查询的数据集都做了分片(partition)和副本(replication),可以在集群部分机器down机之后,查询服务依然可用。Druid采用模块化设计的理念,模块之间的依赖通过Guice依赖注入框架自动进行,Druid支持用户自定义的组件,方便用户扩展。

Druid擅长按照不同的时间粒度(Time Granularity)和各种维度(Dimensions)组合去查询聚合数据,不擅长原始数据的查询。Druid的实时性体现在两个方面。一是Druid查询的响应时间非常快,大部分查询会在10s以内返回。二是能够查询的数据的时效性非常高,进入Druid采集系统的数据,就能被立即查询到,延迟在ms级别。

数据源

Druid仅支持时间序列类型的数据源,例如

时间(timestamp)列标示事件发生的时间,必须存在。该数据在进入采集系统后,会按照配置的时间粒度(半小时,小时,天,周,月,年等)进行聚合。

维度(dimensions)列标示数据里面的维度信息,一般为string类型。

度量(metrics)数据标示数据里面的各种度量信息,数据一般为数据类型

Druid系统在做数据查询时,一般会按照时间粒度和维度做分组,然后在各个分组内部聚合度量数据,聚合的方法有sum,mean,max,min等。Druid系统也支持自定义的聚合函数。

核心服务

Druid的核心服务有两种:数据采集服务和数据查询服务,为了保证数据的时效性,所有采集服务的数据均可以被查询系统所查询。Druid系统服务内部有一系列相互独立的服务进程构成,进程之间不会直接通信(比如RPC),而是通过zookeeper进行间接通信。查询的接口以restapi的方式提供给上层。

数据采集服务

时间序列数据通过采集服务进入到Druid集群。Druid不保存时间序列的原始信息,仅保存聚合之后的数据集。聚合一般按照时间和数据维度进行。为了加快查询的速度,Druid会建立采集数据的索引,以加快查找。同时为了避免查询过程中的对无关数据的扫描,采用列式存储的数据结构。Druid采集方式主要有实时采集和离线批处理采集两种。

实时采集有两种运行方式:

l Real-time Nodes采集。在实际采集工作中,Real-time Nodes以集群形式运行,进程运行在不同的机器上。Real-timeNodes一般和消息队列服务(比如Kafka)共同工作。在数据采集过程中,Real-timeNode每隔10分钟(系统默认值,用户可自由配置)会向消息队列服务记录commit读取的位置。 Real-time Node支持failover机制,在服务进程down掉之后,如果迅速重启,不会带来任何数据丢失,进程会从消息队列commit的读取位置重新读取数据。服务重新启动并开始提供查询服务一般耗时几秒钟,在此期间约有最多一个ingestingtime

granularity的数据不可查。如果由于特殊原因导致某台采集机器down机很长时间,这时该机器上的最多一个ingestingtime granularity的数据不可查。新的数据一般会通过消息队列采集端的rebalance策略重新分配给别的采集集群。Real-time采集在消费数据是采用pull的方式,很难做到数据的replication。该采集方式,官方已不建议使用。

l Index service real-time task采集。Index service real-timetask会在indexservice服务的MiddleManager进程运行Real-timeNode节点服务。数据源依然可以选择消息队列(例如Kafka),不过官方建议在实际使用中,采用事件的pushapi(Tranquility)的方式,由采集客户端push给index 服务进程。采用push方式的api,在index service内部可以容易做到事件的自动replication,增加采集程序的稳定性。

离线采集有三种运行方式

l Hadoop Index 任务,该任务采用Hadoop map reduce的方式,对历史数据批量生成Druid的数据分块文件(datasegment),数据分开文件产生后,用户需要自己把数据块的原信息导入数据库中,官方已不推荐使用。

l Index service local index task,该任务数据源一般存放在Middle Manager的本地磁盘,处理能力有限,处理时间很长,仅适合用于少量数据的导入。

l Index service Hadoop index task,该任务也是通过Hadoop map reduce 的方式,对历史数据做批量处理。在数据块文件建立之后,会自动把元数据信息导入数据库。

以上两类采集方式,在实际应用中各有千秋。实时采集能保证数据的时效性,刚刚发生的事件都可以被查询到;缺点是对事件的处理很难做到exactlyonce,很可能会出现一个事件被聚合多次的情况,不适合对数据准确性要求特别高的场合。虽然Druid官方采取了很多措施来争取做到事件的exactlyonce处理,但是现在的版本依然很难做到。在某些极限情况下,必然会导致同一事件的重复处理。离线数据采集,优点是采集数据吞吐量非常大,事件必然是exactlyonce的处理,数据的准确度较高,缺点是local的index task能处理的数据非常有限,而Hadoop的task要求客户装有Hadoop集群(这点在实际应用中一般不是问题,因为时间序列的原始数据一般会保存下来,并作进一步的分析处理),

离线的采集程序做不到数据的实时响应,延迟时间较长。

鉴于以上两类采集服务的优缺点,在实际使用中,可以采用实时采集和离线采集相结合的方式,来保证数据的准确性和实效性。事件数据首先进入消息队列,然后一份数据进入实时采集服务,提供数据的实时查询,另外一份数据存入hdfs文件系统,通过定时任务的方式驱动Hadoop的index 任务,实现对实时采集到的数据进行更新。

Druid数据采集服务有两种运行模式,

1. Real-time Node + Hadoop MR job,该模式下两种处理方式相互独立,Real-time Node以采集集群的形式运行,而HadoopMR job以Hadoopjar的形式运行,是Druid在早期版本推荐的处理方式。其中Real-timeNode方式很难做到数据的多副本采集,而HadoopMR job只能产生datasegments,用户还需要把数据导入到deepstorage和数据库。

2. Index Service,是Druid为解决统一接口的数据采集而开发设计的一套新的采集系统,它能够支持老的Real-timeNode Hadoop MR job运行方式,并且解决了以上运行方式的上述弊端。

Real-time Node+ Hadoop MR job的架构比较简单,不再详细陈述。而Indexservice采用Master/Slaver的架构,核心组件有两部分组成,一是Overload节点,一是MiddleManager节点。Overload节点接受所有的indextasks,并分配给MiddleManager运行,并监控MiddleManager的运行状态,做到MiddleManager节点运行失败时的自动切换。MiddleManager会在进程内部开启多个处理线程(peon)来处理具体的采集任务。Overload节点和MiddleManager节点不直接通信,而是通过zookeeper间接通信,其架构如图

查询服务

Druid查询服务有四个服务进程组成,Broker Node,HistoricalNode,Real-timeNode 和CoordinatorNode。

Broker Node是对外统一提供restful查询接口的节点,负责接受用户发送的查询json,生成查询计划,按照数据的时间跨度去HistoricalNode和Real-timeNode做分布式查询,之后merge所有节点产生的查询结果并返回给用户。BrokerNode会缓存DataSegments的元数据(该信息从zookeeper获取,并在zookeepr数据更新时,同步更新),所以即便zookeeper服务down掉,也不会影响已有的查询,只是不能查询新加入的数据块而已。为了加快查询的速度,BrokerNode会在内部对查询的结果按照datasegment做本地缓存,也可以配置分布式缓存(Mecache

或者Redid)在所有BrokerNode之间共享缓存数据。在生产环境,一般建议部署分布式缓存服务,以加快查询速度。所有Broker节点的功能和配置是一样的,也就是说用户的查询发送给任何一台Brokder节点,在生产环境,可以把所有的Broker接点利用LBS做负载均衡。

Historical Node,负责向Broker Node返回历史数据的查询结果。归集好的历史数据一般存储在deepstorage(s3或者hdfs中),它是不可改变的,由CoordinatorNode协调各个HistoricalNode按照某种策略从deepstorage 加载数据。大部分情况下,由于用户对时间序列数据查询的热度不一样,比如最近一个月发生的数据经常被查到,而过去几个月的数据可能很少被查询。HistoricalNode可以multi-tie部署,把热点数据部署在配置较高的机器上,而把冷数据部署在配置低的机器上。

Real-time Node,负责向Broker Node返回正在采集的数据的查询结果,由于该部分数据时刻变化,BrokerNode永远不会缓存Real-timeNode的数据。Real-timeNode查询的数据跨度较低,只能查询最近一个的时间短的数据,负载较小。Real-timeNode查询服务可以运行在单独的Real-time进程,也可以运行在IndexService的某台MiddleManager机器上。

Coordinator Node负责协调各Historical Node的数据,它不会处理任何与查询相关的操作,是查询服务的辅助组件。在CoordinatorNode配置历史数据的副本和数据加载策略,做到HistoricalNode数据的冗余备份。Coordinator会监控HistoricalNode的运行状况,并在某些HistoricalNode挂掉之后,把该节点的数据分配到别的HistoricalNode。Coordinator在一个Druid集群里面可以配置多个,不过只有一个处于活动状态,如果活跃的Coordinator挂点之后,系统会由zookeeper选举出新的leader节点,并提供服务。

Broker Node,Historical Node,和Real-timeNode查询接口都是restapi形式,支持的查询语句均为json格式。

其他服务

Druid是专为查询而设计的系统,数据块的存储和元数据的存储需要依赖意外的系统Druid所有数据均要存放在Deep Storage(s3或者hdfs)系统之上,Deepstorage只用来存放索引好的数据,不提供存储之外的其他服务,Deepstorage 上的数据只有加载到HistoricalNode才能被用户查询。

Druid依赖数据库(比如Mysql)去存储data segments的原信息。

Druid依赖zookeeper作为分布式协处理服务。

在实际运行中,Druid采集的事件数据一般存放在消息队列系统之上,所以也会依赖像kafka一样的消息队列服务。

数据流

Druid系统的数据流转如下图

实时采集的数据会首先进入到Real-time Node,并在一段时间后,加载到DeepStorage,加载过程中会更新Mysql的数据库。离线采集的数据会直接更新到Deepstorage环境,并写入Mysql数据库。HistoricalNodes会按照zookeeper的指示,从Deepstorage加载数据块,并提供查询服务。

部署

在生产环境,对于热点的Historical Nodes建议配置大内存和SSD存储。对于Broker节点建议采用配置大内存。Real-timeNodes和CoordinatorNodes的配置一般可以低一些。服务的监控,Druid集群所有节点都会定时上报一些系统监控信息,比如cpu、mem,io的使用情况;Druid每次查询结果的度量信息也会通过上述端口上报。上述端口的数据可以通过http请求统一发送给kafka集群,然后在Druid集群内部建立一个运行状况表,专门收集此信息。