本文基于InfluxDB之前开源的cluster版本(v0.11)来分析一下其cluster方案,主要分析如下问题:
- InfluxDB cluster方案提供了哪些模块?
- InfluxDB cluster方案中meta node元数据都有哪些?
- InfluxDB cluster方案提供的与集群交互的client客户端是什么方式?
- data node之间是如何交互的呢?数据格式什么样?
- InfluxDB cluster方案中 meta node 一致性怎么保证的?
- InfluxDB cluster方案提供的hintedoff具体是怎么工作的?
InfluxDB cluster方案提供了哪些模块?
关于这个问题,这篇文章讲解的非常好,主要包括Meta Node和Data Node两个模块。各个模块的工作模式和原理已经在这篇文章中讲解的很清楚了,这里就不在赘述。
InfluxDB cluster方案中meta node元数据都有哪些?
cluster meta data 都在influxdb/services/meta/data.go中,其结构体如下所示,之前我们介绍过单机版的InfluxDB中的meta,相比单机版的,cluster版本的元数据主要多了MetaNodes和DataNodes两个属性,其对应的数据结构也很简单,就是描述了一个node的信息。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20type Data struct {
Term uint64 // associated raft term
Index uint64 // associated raft index
ClusterID uint64
MetaNodes []NodeInfo
DataNodes []NodeInfo
Databases []DatabaseInfo
Users []UserInfo
MaxNodeID uint64
MaxShardGroupID uint64
MaxShardID uint64
}
// NodeInfo represents information about a single node in the cluster.
type NodeInfo struct {
ID uint64
Host string
TCPHost string
}
由此可见,meta信息中除了汇总了单机版的meta信息之外,还多了一些meta node和data node的信息。
InfluxDB cluster方案提供的与集群交互的client客户端是什么方式?
InfluxDB提供了两种与集群交互的方式:(1)influxDB CLI/Shell (2) influx client。首先声明一下,client是直接与data node交互的。
下面我们简单介绍一下这两种方式是怎么与cluster交互的。
influxDB CLI/Shell
在其官方文档中描述了具体的用法,但是这里我们主要想知道其实怎么发现cluster集群的,通过阅读文档,我们可以知道,有如下两个参数,可以指定连接的node(这里的node如果是meta node则就可以对meta 信息操作;如果是data node,则就可以对data node的数据进行操作)的ip和port
1 | -host 'host name' // The host to which influx connects. By default, InfluxDB runs on localhost. |
所以,可以说influxDB CLI是通过指定ip和port来连接到cluster集群中的。
HTTP API Client Libraries
在其官方文档中列出了一些可以使用的能够连接到influxdb cluster的客户端,我们以其官方提供的influxdb1-client(备注:这里面的代码与influxdb/client包下面的类似)为例来简单介绍下;其官方提供了如下的一个例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14func ExampleClient_query() {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086", // 这个localhost可以换成meta/data node中的任何一台机器。(这里的node如果是meta node则就可以对meta 信息操作;如果是data node,则就可以对data node的数据进行操作)
})
if err != nil {
fmt.Println("Error creating InfluxDB Client: ", err.Error())
}
defer c.Close()
q := client.NewQuery("SELECT count(value) FROM cpu_load", "mydb", "")
if response, err := c.Query(q); err == nil && response.Error() == nil {
fmt.Println(response.Results)
}
}
小结
可以看到用户与Influxdb cluster交互的方式本质就是rest api;Influxdb cluster提供的与cluster交互的cli比较特殊,如果ip指定了是meta node,则就可以跟meta node交互,如果指定了是data node,则就可以读取/写入数据(这个也可以在Cluster Node Configuration文章中得到证明)。这个还是真的第一次见。。。为什么不在封装一下,提供一个统一的操纵meta/data数据的入口呢?
client是怎么与集群cluster交互的(以数据写入为例)
关于influxdb cluster中的数据写入,此文章进行了深入的分析,我们这里主要补充前半部分,即代码是如何走到WritePoints函数的:
1 | services/httpd/handler.go::NewHandler()102行->services/httpd/handler.go::serveQuery()305行->cluster/query_executor.go::ExecuteQuery()73行->cluster/query_executor.go::executeQuery()120行->cluster/query_executor.go::executeSelectStatement()472行->cluster/query_executor.go::writeInto()849行->cluster/points_writer.go::WritePointsInto()230行->cluster/points_writer.go::WritePoints()234行 |
综上,还是通过rest api访问的!points_writer.go中的PointsWriter struct中,有MetaClient接口,MetaClient的实现就是在service/meta/client.go里面的,这个client拥有meta server相关的元信息以及cluster meta data(InfluxDB cluster方案中meta node元数据都有哪些?章节提供的代码),所以就能够知道往哪个节点上写数据。以写数据为例,可能要写的数据的shard group并不一定存在,所以就会在此client中调用CreateShardGroup函数调用retryUntilExec去meta server中创建shard group,然后client中还有一个pollForUpdates函数,一直去meta server上面拉取meta data 元信息,当有变化的时候就去更新本地的cache。
具体详情请参考Influxdb Cluster版本中的Meta。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21func (c *Client) pollForUpdates() {
for {
data := c.retryUntilSnapshot(c.index())
if data == nil {
// this will only be nil if the client has been closed,
// so we can exit out
return
}
// update the data and notify of the change
c.mu.Lock()
idx := c.cacheData.Index
c.cacheData = data
c.updateAuthCache()
if idx < data.Index {
close(c.changed)
c.changed = make(chan struct{})
}
c.mu.Unlock()
}
}
Metadata Client概述
- 定义在 services/meta/client.go中;
- Cluster 版本中的Meta是本地的一个内存缓存,数据来源MetaServer;
- 对Meta的所有写操作,也将通过http+pb的方式发送到MetaServer, 然后阻塞等待从MetaServer返回的新的Metadata通知;
- MetaClient通过http long polling来及时获取Metadata的变化;
data node之间是如何交互的呢?数据格式什么样?
各个data node之间实际上是通过建立tcp,然后发送数据交互的。建立连接的代码是在cluster/shard_writer.go 中的connFactory::dial()函数。
其request,response数据格式如下:1
2
3
4
5
6
7
8
9
10
11
12cluster/internal/data.proto
message WriteShardRequest {
required uint64 ShardID = 1;
repeated bytes Points = 2;
optional string Database = 3;
optional string RetentionPolicy = 4;
}
message WriteShardResponse {
required int32 Code = 1;
optional string Message = 2;
}
InfluxDB cluster方案中 meta node 一致性怎么保证的?
raft实现:https://github.com/hashicorp/raft
存储: https://github.com/hashicorp/raft-boltdb
InfluxDB cluster方案提供的hintedoff具体是怎么工作的?
请参考Influxdb Cluster下的数据写入 数据按一致性要求写入 章节
参考文献
Influxdb · 源码分析 · Influxdb cluster实现探究
InfluxDB CLI/Shell
HTTP API Client Libraries
Influxdb Cluster下的数据写入
Cluster Node Configuration
Influxdb Cluster版本中的Met