Druid调研

Druid初期是由metamarkets的技术人员开发,用于向买家、卖家、广告主进行广告展示的底层实时分析平台,为OLAP的事件查询而设计。目前已发展成一个开源的实时数据库。

时序数据库(TSDB)是一种特定类型的数据库,主要用来存储时序数据。

架构

Druid的架构比较灵活,是一个集群模式,由多个不同类型的节点组成,并且各个节点也能组成自己的小集群以增加可用性和扩展性。

Druid中每类节点都只完成一小部分事情,节点分类如下:

  • Historical节点
    Historical节点是Druid集群的骨干,用于负责存储和查询历史数据。它从Deep Storage下载segments到本地,响应从broker节点上分发到查询请求,并将结果数据返回给broker节点。Historical节点采用shared nothing的架构,所以每个节点能够独自加载segments、删除segments、以及为segments提供查询服务。

Historical节点与Zookeeper进行通信(Historical节点之间并不通信),将该节点上已加载的segments和正在提供服务的segments记录在zk上,zk会通知Historical节点进行segments的加载和删除

  • Broker节点
    Broker节点是客户端和应用程序从Druid查询数据的入口。Broker节点负责分发查询,将查询请求分发给实时节点和历史节点,以及收集和合并来自实时节点和历史节点的结果。Broker节点通过Zookeeper确定segments是在实时节点还是历史节点存活

  • Coordinator节点
    Coordinator节点通过Metadata Storage和zk管理集群中historical节点上的segments。Coordinator从Metadata Storage中获取segments的元数据信息,来决定哪些segments需要加载到历史节点集群中,利用zk判断历史节点集群中某些节点是否存在。当Historical需要加载或者删除一些segments时会在zk上创建一些entries。

Coordinator不与Historical通信,直接与zk进行通信

  • Realtime节点
    实时节点负责加载实时的数据到系统中。

  • Indexing Service节点
    索引服务节点由多个worker组成的集群,负责为加载批量的和实时的数据创建索引,并且允许对已经存在的数据进行修改。

Real-time处理目前可以通过独立的realtime节点,或者通过indexing service节点实现,这两种方式都很常见。 Real-time处理包括加载数据、创建数据索引(创建segments)、以及交接segments给historical节点。实时处理逻辑加载之后数据立马可查。数据交接的过程也是安全的,数据在整个流程中都保持可查。

Druid集群还会依赖一些外部组件:

  • Zookeeper
    Zookeeper主要作用是帮助群集服务发现和维护当前数据的拓扑结构,Druid依赖Zookeeper来保证集群内的信息一致。

  • Metadata Storage
    Druid依赖metadata storage存储segments的元数据和配置。创建segments的服务在元数据中记录信息,coordinator监听着元数据以便了解什么时候需要下载新数据或者删除旧数据。 元数据的存储不涉及查询的路径。MySQL和PostgreSQL非常有利于生产环境下元数据的存储

  • Deep Storage
    Deep storage是segments的永久备份。创建segments的服务(实时或者离线)上传segments到Deep storage,然后historical节点下载。Deep storage不涉及查询路径。 S3和HDFS是比较推荐的deep storages

Druid集群整体架构图如下:
Druid架构图
上面Druid架构图中展示了Druid集群中内部组件和外部依赖组件的整体架构,下面再来一张官方的架构图,和上面的架构图大体一样,我感觉两个图结合起来看就比较完美了。嘿嘿。。
Druid架构图-官方

特点

高可用性
由于Druid使用的是shared noting架构,所以其内部并无单点故障,不同类型的节点失败也不会影响到其他类型节点的正常服务。

可扩展性
Druid中的每类节点都可部署为集群模式,使Druid处理能力能够水平扩展。

亚秒级响应
官网说10亿量级下做到亚秒响应,能够做到实时导入,导入即可查询。

Roll-up
在数据接入时对原始数据选定一系列维度之后进行第一级聚合,减少数据量,提高查询速度和减少存储空间。

数据源

Druid数据源支持多种格式,比如json、csv或者是用特殊字符分割的数据。
数据来源主要分为批量数据和实时数据。接入方式主要有3种:

  • 从文件中批量加载,数据文件可以存放在本地、HDFS和S3等文件存储系统。
  • 实时推送数据,是使用Tranquility通过HTTP推送数据到Druid。Tranquility是一个单独的工具包,并没有和Druid做集成,需要单独下载,Tranquility启动一个server进程,通过HTTP将数据发送给Druid,而不需要启动一个JVM之类的进程。

Tranquility是用scala编写的,主要是通过Druid indexing service来实时抽取数据,弥补Druid indexing service API的不足

  • 实时拉取数据,是使用Firehose与要读取的数据进行连接,然后Realtime节点实时从中抽取数据。

Firehose是一个可插拔的组件,是Druid中的消费实时数据模型,可以有不同的实现,Druid自带了一个基于Kafka High Level API实现的对于Kafka的数据消费(druid-kafka-eight Firehose)

数据接入时需要注意一下几点:

  • 数据集应该被什么调用?由”dataSchema”的”dataSource”字段设置
  • 数据集位于什么位置?文件路径在”inputSpec”的”paths”。如果加载多个文件,将字符串以逗号分隔。
  • 什么字段作为时间戳?由”timestampSpec”的”column”字段设置。
  • 什么字段作为维度?由”dimensionsSpec”的”dimensions”字段设置。
  • 什么字段作为度量?由”metricsSpec”控制。
  • 什么时间范围(时间间隔)被加载?由”granularitySpec”的”intervals”字段设置。

数据格式

Druid的数据集有三部分组成。分别为

  • Timestamp列: 将timestamp区别开是因为Druid是一个时间序列的olap工具,其中的查询都以时间为中心。

  • Dimension列: Dimensions对应事件的维度,通常用于筛选过滤数据。

  • Metric列: Metrics是用于聚合和计算的列。在OLAP的术语中也被叫做measures。

数据源配置

接入数据不需要代码的开发,只需要进行一些配置。
下面看个从HDFS接入数据的demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
{
"type" : "index_hadoop",
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "hadoopyString",
"parseSpec" : {
"format" : "json",
"timestampSpec" : {
"column" : "timestamp",
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
}
},
"metricsSpec" : [
{
"type" : "count",
"name" : "count"
},
{
"type" : "doubleSum",
"name" : "added",
"fieldName" : "added"
},
{
"type" : "doubleSum",
"name" : "deleted",
"fieldName" : "deleted"
},
{
"type" : "doubleSum",
"name" : "delta",
"fieldName" : "delta"
}
],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "DAY",
"queryGranularity" : "NONE",
"intervals" : [ "2013-08-31/2013-09-01" ]
}
},
"ioConfig" : {
"type" : "hadoop",
"inputSpec" : {
"type" : "static",
"paths" : "/MyDirectory/example/wikipedia_data.json"
}
},
"tuningConfig" : {
"type": "hadoop"
}
},
"hadoopDependencyCoordinates": "my_hadoop_version"
}

简单解释下这些配置项:
json中最外层有3个key,分别为type、spec和hadoopDependencyCoordinates,前两个是必须的。
如果从HDFS中加载数据,type始终是index_hadoopspec的value是数据的一些信息,也是一个json。
spec value就是DataSchema,由dataSchema、ioConfig和tuningConfig,如果tuningConfig不设置将提供默认值。
dataSchema描述了数据源的具体信息,包括数据的格式、Timestamp、维度和度量。
ioConfig描述了数据从何而来,和最终去向哪里。

查询

Druid的查询语言是json,通过HTTP REST提交给可查询的节点(Broker, Historical, or Realtime)。官方目前暂不支持sql查询,有第三方的插件可以支持sql,但支持度不是太完美。

Druid支持的查询包括聚合查询、搜索查询(类似模糊匹配)和选择查询。

下面简单看几个json查询语句。

  • 聚合查询json语句
    聚合查询比较复杂,又分为3个小类,分别为timeseries、topN和groupBy。
    这里展示一个timeseries的json语句:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
"queryType": "timeseries",
"dataSource": "sample_datasource",
"granularity": "day",
"descending": "true",
"filter": {
"type": "and",
"fields": [
{ "type": "selector", "dimension": "sample_dimension1", "value": "sample_value1" },
{ "type": "or",
"fields": [
{ "type": "selector", "dimension": "sample_dimension2", "value": "sample_value2" },
{ "type": "selector", "dimension": "sample_dimension3", "value": "sample_value3" }
]
}
]
},
"aggregations": [
{ "type": "longSum", "name": "sample_name1", "fieldName": "sample_fieldName1" },
{ "type": "doubleSum", "name": "sample_name2", "fieldName": "sample_fieldName2" }
],
"postAggregations": [
{ "type": "arithmetic",
"name": "sample_divide",
"fn": "/",
"fields": [
{ "type": "fieldAccess", "name": "sample_name1", "fieldName": "sample_fieldName1" },
{ "type": "fieldAccess", "name": "sample_name2", "fieldName": "sample_fieldName2" }
]
}
],
"intervals": [ "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000" ]
}

timeseries是按照某段时间区间作为数据查询区间的,这里使用的是2012-01-01到2012-01-03的数据,分别对sample_fieldName1和sample_fieldName2进行聚合,最后两个聚合指标相除。结果如下:

1
2
3
4
5
6
7
8
9
10
[
{
"timestamp": "2012-01-01T00:00:00.000Z",
"result": { "sample_name1": "some_value", "sample_name2": "some_value", "sample_divide": "some_value" }
},
{
"timestamp": "2012-01-02T00:00:00.000Z",
"result": { "sample_name1": "some_value", "sample_name2": "some_value", "sample_divide": "some_value" }
}
]
  • 搜索查询json语句
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"queryType": "search",
"dataSource": "sample_datasource",
"granularity": "day",
"searchDimensions": [
"dim1",
"dim2"
],
"query": {
"type": "insensitive_contains",
"value": "Ke"
},
"sort" : {
"type": "lexicographic"
},
"intervals": [
"2013-01-01T00:00:00.000/2013-01-03T00:00:00.000"
]
}

searchDimensions定义了要在哪些维度中进行搜索,query定义了搜索方式(本语句中的意思是包含Ke的维度值),结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[
{
"timestamp": "2012-01-01T00:00:00.000Z",
"result": [
{
"dimension": "dim1",
"value": "Ke$ha"
},
{
"dimension": "dim2",
"value": "Ke$haForPresident"
}
]
},
{
"timestamp": "2012-01-02T00:00:00.000Z",
"result": [
{
"dimension": "dim1",
"value": "SomethingThatContainsKe"
},
{
"dimension": "dim2",
"value": "SomethingElseThatContainsKe"
}
]
}
]
  • 选择查询json语句
1
2
3
4
5
6
7
8
9
10
11
12
{
"queryType": "select",
"dataSource": "wikipedia",
"descending": "false",
"dimensions":[],
"metrics":[],
"granularity": "all",
"intervals": [
"2013-01-01/2013-01-02"
],
"pagingSpec":{"pagingIdentifiers": {}, "threshold":5}
}

select查询支持分页显示,pagingSpec定义了分页的信息,这里只显示前5个,结果显示如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
[{
"timestamp" : "2013-01-01T00:00:00.000Z",
"result" : {
"pagingIdentifiers" : {
"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9" : 4
},
"events" : [ {
"segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
"offset" : 0,
"event" : {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "1",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "11._korpus_(NOVJ)",
"language" : "sl",
"newpage" : "0",
"user" : "EmausBot",
"count" : 1.0,
"added" : 39.0,
"delta" : 39.0,
"variation" : 39.0,
"deleted" : 0.0
}
}, {
"segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
"offset" : 1,
"event" : {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "112_U.S._580",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 70.0,
"delta" : 70.0,
"variation" : 70.0,
"deleted" : 0.0
}
}, {
"segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
"offset" : 2,
"event" : {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._243",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 77.0,
"delta" : 77.0,
"variation" : 77.0,
"deleted" : 0.0
}
}, {
"segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
"offset" : 3,
"event" : {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._73",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 70.0,
"delta" : 70.0,
"variation" : 70.0,
"deleted" : 0.0
}
}, {
"segmentId" : "wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
"offset" : 4,
"event" : {
"timestamp" : "2013-01-01T00:00:00.000Z",
"robot" : "0",
"namespace" : "article",
"anonymous" : "0",
"unpatrolled" : "0",
"page" : "113_U.S._756",
"language" : "en",
"newpage" : "1",
"user" : "MZMcBride",
"count" : 1.0,
"added" : 68.0,
"delta" : 68.0,
"variation" : 68.0,
"deleted" : 0.0
}
} ]
}
} ]

数据生命周期

如果数据来源于Kafka,则Realtime节点从kafka消费数据,实时数据在Realtime节点中创建索引,存入节点内存,内存数据量超过阈值时(或者定期)时,将内存数据写到外存形成外存索引。 内存数据加上多个外存索引, Realtime节点以这种方式支持亚秒级数据可见性。
但是Realtime节点的容量和查询能力是有限的, 所以它会定期合并多个外存索引生成segment,(segment对应一段时间范围内的进入druid的数据)。segment生成之后马上被上传到deep storage,很快就会有Historical节点下载该segment,并替代Realtime节点提供查询服务。
Historcial节点从DeepStorage下载segment之后,由Coordinator节点通过Zookeeper来通知Historcial节点加载或者删除segment。

您的肯定,是我装逼的最大的动力!