IOTDB-Cluster 源码解析

本文从源码的角度介绍了IOTDB的分布式模块。

Log 相关

log

log记录了集群中发生过的操作,每一条log都有如下4个变量:

属性 含义
currLogIndex 此条log的索引
currLogTerm 此条log的term
previousLogIndex 前一条log的索引
previousLogTerm 前一条log的term

log
类图非常清晰了,无需详细解析,目前支持4种类型的log:1.AddNodeLog; 2.RemoveNodeLog; 3.CloseFileLog; 4.PhysicalPlanLog

snapshot解析

snapshot

  1. 所有的snapshot都继承自父类Snapshot,父类有两个成员变量:lastLogIndex和lastLogTerm,指的是快照中最后的一条logid和log term
  2. SimpleSnapshot: 是最简单的快照实现方式,所有的log都保存在内存中,快照是一个list的数组实现方式。
  3. FileSnapshot:继承了Snapshot的同时,也实现了TimeseriesSchemaSnapshot接口,主要保存了两个属性:
    1
    2
    private Collection<MeasurementSchema> timeseriesSchemas
    private List<RemoteTsFileResource> dataFiles

RemoteTsFileResource是tsfile的文件描述,而MeasurementSchema也是描述的各个measurement。所以可以看出来FileSnapshot之所以称之为”文件快照“,主要是由于保存了TsFileResource的原因。

  1. PullSnapshotTaskDescriptor:首先看一下官方注释:PullSnapshotTaskDescriptor describes a pull-snapshot-task with the slots to pull., 有如下两个数据结构.

    1
    2
    private PartitionGroup previousHolders;
    private List<Integer> slots;
  2. PartitionedSnapshot: 使用一个map保存了每个slot id对应的快照。核心数据结构如下

    1
    2
    3
    4
    5
    // key是slot id,value是Snapshot的泛型(具体的某一种Snapshot)
    private Map<Integer, T> slotSnapshots;
    // SnapshotFactory是一个函数式接口,用来反序列化的时候创建Snapshot的实例,
    // 可以通过下图看到函数接口的实现都是Snapshot的默认构造函数。
    private SnapshotFactory<T> factory;

函数接口的实现

  1. MetaSimpleSnapshot:继承自SimpleSnapshot:多了一个属性storageGroups,保存了所有的storageGroups。除了需要保存storageGroups之外,MetaSimpleSnapshot还需要保存所有需要的元数据,例如ttl,dataAuth等。
  2. PullSnapshotTask, 他并不是一个快照的实现类,而是封装了执行快照任务的一个类,它实现了Callable接口,可以简单的理解为实现了Runnable接口一样,关于Runnable和Callable接口的差异请google。call 函数是具体实现pullSnapshot 任务的函数,返回值是一个map,key是slot id,value是snapshot,以供调用者可以读取snapshot的数据来进行数据追赶。PullSnapshotTask持有PullSnapshotTaskDescriptor,用来描述从哪儿拉取快照。

log applier

Applier
核心函数就是apply(Log),使得log所描述的action在本机生效,这里其实跟wal有点类似了,wal的作用就是执行任何事情之前先写wal,防止崩溃的时候wal可以恢复,只不过log applier是raft状态机中的组件。

DataLogApplier目前提供的apply类型:PhysicalPlanLog和CloseFileLog
MetaLogApplier目前提供的apply类型:AddNodeLog,RemoveNodeLog和PhysicalPlanLog。

两种都提供PhysicalPlanLog,会根据具体提供的操作(数据操作、元数据操作)不一样而执行不同的方法。

log manager

LogManager
log manager也就是管理log的一些类,也就是log 存在哪里,怎么存?怎么取?等,目前的实现都是基于内存的实现方式。如果一个复制组所有的节点都crash的话,则log也就丢失了,基于文件(可持久化)的log管理目前正在开发中(https://issues.apache.org/jira/browse/IOTDB-351)

  1. LogManager接口,主要提供了一些访问Log类属性的一些方法以及存储,读取log的一些函数。其中着重指出void commitLog(long maxLogIndex)函数,此函数会调用log applier中的apply函数。主要函数如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 追加一条log,目前实现是在内存中的logbuffer中追加一条log,
    // 并且更新lastLogId以及lastLogTgiterm
    boolean appendLog(Log log);

    // 应用(apply) maxLogIndex之前的所有的日志
    void commitLog(long maxLogIndex);

    // 用于判断logIndex这条log是否还在内存中,如果不在内存中,说明已经形成快照了
    boolean logValid(long logIndex);

    Snapshot getSnapshot();

    /**
    * Take a snapshot of the committed logs instantly and discard the committed logs.
    */
    // 把commitIndex之前的日志形成快照,并且删除commitIndex之前的日志。
    void takeSnapshot() throws IOException;

    LogApplier getApplier();

    void setLastLogId(long lastLogId);

    void setLastLogTerm(long lastLogTerm);
  2. MemoryLogManager,虚类,把所有的log都保存在内存中。

  3. MetaSingleSnapshotLogManager, 继承自MemoryLogManager,其中的snapshot也是内存持有的,不过其getSnapshot函数返回的是MetaSimpleSnapshot。
  4. PartitionedSnapshotLogManager, 继承自MemoryLogManager,其中的snapshot也是内存持有的,不过其getSnapshot函数返回的是PartitionedSnapshot。实际中没有
  5. FilePartitionedSnapshotLogManager,继承自PartitionedSnapshotLogManager,与PartitionedSnapshotLogManager不同之处在于:PartitionedSnapshotLogManager在更新快照的时候会把快照保存在内存中,而FilePartitionedSnapshotLogManager想从tsfile中实时获取快照。笔者理解就是FilePartitionedSnapshotLogManager不想在内存中保存快照,而是在需要的时候实时从tsfile中获取快照,用以减少内存的使用。但是目前并未达到理想的效果,因为在LogManager append log的时候,已经把log保存在内存中了,所以iotdb作者起了一个issue来研究这个问题 https://issues.apache.org/jira/browse/IOTDB-439

总结

Log,LogManager和LogApplier以及Snapshot之间的关系几个组件的关系为:

  • LogManager管理了log,包括log的存储,读取;
  • LogManager调用了LogApplier来执行真正的操作;
  • Snapshot是一种特殊的log,为了解决log过多的问题,Snapshot主要有三种:
    1. 包含了一组log的集合(目前实现是保存在内存中);
    2. 包含了一组tsfile(文件的snapshot)的集合;
    3. 包含了一些元数据集合的MetaSimpleSnapshot。

Server

RaftServer
先看一下UML类图,一图胜千言,RaftServer是一个虚类,里面的函数主要是根据一些节点的配置(ip,port)等进行初始化,然后启动服务。其有两个子类,一个是DataClusterServer,主要负责data 操作相关的事情,然后通过rpc传到同一个raft组的其他节点上;另外一个是MetaClusterServer,主要负责meta数据的操作,然后把操作通过rpc传到同一个raft组的其他节点上。

RaftServer

主要包括两个大的功能:1. 初始化节点配置;2. 启动节点并且监听rpc端口服务(start()函数)

MetaClusterServer

首先看一下官方注释:MetaCluster manages the whole cluster’s metadata, such as what nodes are in the cluster and the data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is started-up at the same time. 意思就是MetaClusterServer管理了集群中所有的meta信息,主要信息就是数据的分布了(后面会详细讲解节点的分布策略)。每个节点上都会有和一个MetaClusterServer实例(也就是说iotdb的cluster版本每个节点是同质的,假设有1000个节点,则1000个节点上都有meta服务,1000个节点组成了meta信息的raft复制组。这在性能,结果一致性上面还是有问题的),同时也会启动一个iotdb的实例。

MetaClusterServer主要变量如下:

1
2
3
4
5
6
// 每个clusterServer会有一个MetaGroupMember对象,后面会详细介绍。
private MetaGroupMember member;
// iotdb实例
private IoTDB ioTDB;
// 保存cluster集群一些状态,在此先不做详细介绍。
private RegisterManager registerManager = new RegisterManager();

DataClusterServer

DataClusterServer的作用就是来管理data一些数据的操作的,与MetaClusterServer不同之处在于他的RaftMember是一个map,

1
2
3
4
5
// key 是raft 复制组的header node, value是DataGroupMember,也就代表了一个复制组。
private Map<Node, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>();

// 这个数据结构是关键,后面会详细讲解。
private PartitionTable partitionTable;

DataClusterServer的任何操作都是先要在这个map中根据rpc传过来的header,找到对应的DataGroupMember,然后再进行操作。

todo@戚厚亮,后面记得讲解PartitionTable

RaftMember

RaftMember
先上一个官方注释。
RaftMember process the common raft logic like leader election, log appending, catch-up and so on. 也就是说,raft算法中的一些核心逻辑:领导选举、log复制、快照追赶等逻辑都是RaftMember干的。

下面介绍一下核心属性的作用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// the name of the member, to distinguish several members from the logs
// name就是“META/DATA”+ip+port
String name;

// to choose nodes to join cluster request randomly
// 当有新的节点加入到集群中的时候,从已经在集群中的节点中随机的选择一个节点来处理加入操作。
Random random = new Random();

protected Node thisNode;
// the nodes known by this node
protected volatile List<Node> allNodes;

AtomicLong term = new AtomicLong(0);
// 默认开始都是ELECTOR的状态
volatile NodeCharacter character = NodeCharacter.ELECTOR;
volatile Node leader;
volatile long lastHeartbeatReceivedTime;

// the raft logs are all stored and maintained in the log manager
// logManager管理了所有的log操作,详细分析请看上面logManager
LogManager logManager;

// the single thread pool that runs the heartbeat thread
// 心跳线程池,其心跳服务在HeartbeatThread类中。
ExecutorService heartBeatService;

// the thread pool that runs catch-up tasks
// catch up线程池
private ExecutorService catchUpService;

// lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
// should be only one catch-up task for each node to avoid duplication, but the task may time out and in that case, the next catch up should be enabled.
// 记录节点上次追赶log的时间,用于防止一个节点建立多个追赶任务,也用于判断追赶日志超时使用。
private Map<Node, Long> lastCatchUpResponseTime = new ConcurrentHashMap<>();

// the pool that provides reusable clients to connect to other RaftMembers. It will be initialized according to the implementation of the subclasses
// client 连接池,用于复用已经创建的连接,不用每次通信都需要建立连接
private ClientPool clientPool;

// when the commit progress is updated by a heart beat, this object is notified so that we may
// know if this node is synchronized with the leader
// 两个地方用到了这个锁;1.syncLeader函数,用来同步leader的commint log index的。2.在心跳算法sendHeartbeat中,也是用来同步leader 的commit log index的。
private Object syncLock = new Object();

// when the header of the group is removed from the cluster, the members of the group should no longer accept writes, but they still can be read candidates for weak consistency reads and provide snapshots for the new holders
volatile boolean readOnly = false;

这里要强调一下上面的allNodes变量,这里的allNodes并不是所有的节点,而仅仅是这个复制组里面的节点,通常节点个数与副本数一致

RaftMember中的重点函数解析

首先明确一点,心跳是leader发送给follower的。发送的内容是leader的commitLogTerm和commitLogIndex,来让follower更新自己的commitLogTerm和commitLogIndex。

心跳算法

  1. 如果follower收到的请求的term小于本地节点的term,则拒绝这次心跳请求,并且把本地节点保存的term返回给发送者,让leader更新自己的状态。
  2. 如果校验心跳请求参数合格的话,则会把本节点log最后更新的index发送给leader,让leader知道下次给”我”这个节点发送log的时候从哪里开始发送。
  3. 执行回调HeartbeatHandler.onComplete。leader会根据返回的信息做如下几个事情:
    • 如果follower还承认我是leader,则根据follower发送过来的logIndex让follower追赶我的日志 ,注意LogCatchUpTask和SnapshotCatchUpTask都是leader主动给follower发送数据的,让follower进行追赶leader的日志(快照)。
    • 如果follower的term比我leader的term还大,则leader主动发起退休(retireFromLeader)。

备注:此函数中有一个syncLock锁,是用来同步leader的CommitLogIndex的,只所以用锁是因为在syncLeader函数中也有同步leader的CommitLogIndex的的功能。

小提示:与etcd raft中心跳算法的不同之处在于:ectd raft心跳仅仅是leader把commitLogIndex发送给其他follower,并未返回一些信息让leader发起follower.catchUp的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* Process the HeartBeatRequest from the leader. If the term of the leader is smaller than the
* local term, turn it down and tell it the newest term. ELse if the local logs catch up the
* leader's, commit them. Else help the leader find the last match log. Also update the
* leadership, heartbeat timer and term of the local node.
*
* @param request
* @param resultHandler
*/
@Override
public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) {
logger.trace("{} received a heartbeat", name);
// term可以理解为一个全局锁
synchronized (term) {
long thisTerm = term.get();
// 解析request中的数据,主要有leader的term
long leaderTerm = request.getTerm();
HeartBeatResponse response = new HeartBeatResponse();

// 如果leader的term比这个复制组的本节点的term小,则这个leader一定是过期的,忽略这个请求。并且把本节点的term告诉leader
if (leaderTerm < thisTerm) {
// a leader with term lower than this node is invalid, send it the local term to inform this
response.setTerm(thisTerm);
if (logger.isTraceEnabled()) {
logger.trace("{} received a heartbeat from a stale leader {}", name, request.getLeader());
}
} else {
// the heartbeat comes from a valid leader, process it with the sub-class logic
processValidHeartbeatReq(request, response);

response.setTerm(Response.RESPONSE_AGREE);
// tell the leader who I am in case of catch-up
response.setFollower(thisNode);
// TODO-CLuster: the log being sent should be chosen wisely instead of the last log, so that the leader would be able to find the last match log
// 注释已经说的很好了,发送给leader的log应该让leader知道下次发送给follwer的index是多少!
response.setLastLogIndex(logManager.getLastLogIndex());
response.setLastLogTerm(logManager.getLastLogTerm());

// The term of the last log needs to be the same with leader's term in order to preserve safety, otherwise it may come from an invalid leader and is not committed
if (logManager.getLastLogTerm() == leaderTerm) {
// syncLock只有两个地方有使用,除了此处之外,还在syncLeader函数中用到了,这里加这个锁就是防止本节点获取的leader信息是过时的。
synchronized (syncLock) {
logManager.commitLog(request.getCommitLogIndex());
syncLock.notifyAll();
}
}
// if the log is not consistent, the commitment will be blocked until the leader makes the node catch up

term.set(leaderTerm);
setLeader(request.getLeader());
if (character != NodeCharacter.FOLLOWER) {
setCharacter(NodeCharacter.FOLLOWER);
}
setLastHeartbeatReceivedTime(System.currentTimeMillis());
if (logger.isTraceEnabled()) {
logger.trace("{} received heartbeat from a valid leader {}", name, request.getLeader());
}
}
resultHandler.onComplete(response);
}
}

catchUp函数

在心跳算法中讲到leader发起heartBeat请求的时候,会有一个回调,即HeartbeatHandler.onComplete,在这里面会有执行catchUp方法。下面分析下这个函数。

catchUp函数也是leader主动发起的。当leader给follower发送心跳的时候,follower会把自己的lastLogIndex返回给leader,leader据此来发起catchUp操作,给follower推送lastLogIndex之后的所有的数据。

流程如下

  1. 首先检查一下follower上次catchUp的时间距离现在是否小于一个阈值(20000ms),如果是则不进行catchUp。(限制catchUp的时间间隔,心跳是1000ms),
  2. 判断内存中log信息是否有效,有效的定义如下:要请求的日志都在内存中则视为有效(即follower还未落下很远,无需通过拉取快照进行追赶)。
  3. 构造follower所需的数据,根据第2步是否有效执行如下不同的内容。
    • 若2有效,则通过logManager获取followerLastLogIndex之后的所有日志;
    • 若2无效,则证明follower所需的日志有些已经成为快照了,所以需要拉取快照和内存中日志两部分数据。
  4. 向catchUpService服务(catchUpService是一个ExecutorService的线程池)中提交一个任务,任务根据第2步是否有效分为两种:(LogCatchUpTask和SnapshotCatchUpTask都实现了Runnable接口)
    • 若2有效:则会创建一个LogCatchUpTask任务。这个任务就是与follower建立连接,并且遍历所有需要追赶的log,执行followr.appendEntry()即可。 注意:这里有一个优化点:可以调用followr.appendEntries()一次性把所有的log发送给follower,而不是每次都发送一条数据。
    • 若2无效:则会创建一个SnapshotCatchUpTask任务。SnapshotCatchUpTask继承了LogCatchUpTask,其run方法主要内容如下:
      1. 把第3步中构造的快照序列化之后调用follower.sendSnapshot发送给follower;
      2. 发送log:执行LogCatchUpTask中doLogCatchUp,即上面讲的LogCatchUpTask类中的主要内容。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
* follower. If some of the logs are not in memory, also send the snapshot.
* <br>notice that if a part of data is in the snapshot, then it is not in the logs</>
*
* @param follower
* @param followerLastLogIndex
*/
public void catchUp(Node follower, long followerLastLogIndex) {
// TODO-Cluster: use lastMatchLogIndex instead of lastLogIndex
// for one follower, there is at most one ongoing catch-up
synchronized (follower) {
// check if the last catch-up is still ongoing
// 首先检查一下follower上次catchUp的时间距离现在是否小于一个阈值(20000ms),如果是则不进行catchUp
Long lastCatchupResp = lastCatchUpResponseTime.get(follower);
if (lastCatchupResp != null
&& System.currentTimeMillis() - lastCatchupResp < RaftServer.connectionTimeoutInMS) {
logger.debug("{}: last catch up of {} is ongoing", name, follower);
return;
} else {
// record the start of the catch-up
lastCatchUpResponseTime.put(follower, System.currentTimeMillis());
}
}
if (followerLastLogIndex == -1) {
// if the follower does not have any logs, send from the first one
followerLastLogIndex = 0;
}

// 连接到远程节点
AsyncClient client = connectNode(follower);
if (client != null) {
List<Log> logs;
boolean allLogsValid;
Snapshot snapshot = null;
synchronized (logManager) {
// check if the very first log has been snapshot
// 根据follower请求的logIndex判断要请求的所有log是否都在内存中,如果都在内存中,则只需要把内存中的log发送给follower即可,否则还需要把snapshot一起发送给follower
allLogsValid = logManager.logValid(followerLastLogIndex);
logs = logManager.getLogs(followerLastLogIndex, Long.MAX_VALUE);
// 所需要的数据不都在内存中,所以需要把快照数据发送给follower。
if (!allLogsValid) {
// if the first log has been snapshot, the snapshot should also be sent to the
// follower, otherwise some data will be missing
// 获取snapshot
snapshot = logManager.getSnapshot();
}
}

// 根据是否需要给follower发送snapshot,生成LogCatchUpTask或是SnapshotCatchUpTask
if (allLogsValid) {
if (logger.isDebugEnabled()) {
logger.debug("{} makes {} catch up with {} cached logs", name, follower, logs.size());
}
catchUpService.submit(new LogCatchUpTask(logs, follower, this));
} else {
logger.debug("{}: Logs in {} are too old, catch up with snapshot", name, follower);
catchUpService.submit(new SnapshotCatchUpTask(logs, snapshot, follower, this));
}
} else {
lastCatchUpResponseTime.remove(follower);
logger.warn("{}: Catch-up failed: node {} is currently unavailable", name, follower);
}
}

选举算法

处理选举过程的算法也很简单:

  1. 首先看发送过来的请求的node(也就是要当leader的node)是否与本地节点保存的leader一致,如果一致我就同意。
  2. 如果不一致,则根据请求过来的信息与本地信息进行对比来决定是否同意发起选举的节点作为leader,其算法在processElectionRequest中。主要判断逻辑如下:
    • thatTerm <= thisTerm 拒绝,发起选举的term都比我本地的要小,怎么可能给你投票!
    • thatLastLogTerm < thisLastLogTerm 拒绝,发起选举的节点的最后一条日志的term都比我本地的要小,拒绝你。
    • (thatLastLogTerm == thisLastLogTerm && thatLastLogId < thisLastLogIndex) 拒绝,虽然最后一条日志的term一致,但是发起选举节点的最后一条日志的索引比我本地的要小,拒绝你。
    • 其他情况下就给发起选举的节点投票了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Process an ElectionRequest. If the request comes from the last leader, agree with it. Else
* decide whether to accept by examining the log status of the elector.
*
* @param electionRequest
* @param resultHandler
*/
@Override
public void startElection(ElectionRequest electionRequest, AsyncMethodCallback resultHandler) {
synchronized (term) {
if (electionRequest.getElector().equals(leader)) {
// always agree with the last leader
resultHandler.onComplete(Response.RESPONSE_AGREE);
return;
}

// check the log status of the elector
long response = processElectionRequest(electionRequest);
logger.info("{} sending response {} to the elector {}", name, response,
electionRequest.getElector());
resultHandler.onComplete(response);
}
}

syncLeader函数

在分析心跳算法的时候,在代码注释里面提到了syncLeader这个函数。这个函数主要用来同步leader的commint log index的。

  1. 首先保证所有的快照下载任务都已经完成了。否则就等待所有的快照任务执行结束。
  2. rpc与leader节点建立通信,请求CommitLogIndex。
  3. 判断本地的CommitLogIndex与leader的CommitLogIndex是否一致,即follwer是否追赶上了leader的日志。如果追赶上了,则返回成功;否则未超时的情况下(20秒),等待心跳(默认1000ms,因为心跳的时候leader会使得follwer追赶自己的日志)。当等待超过20s之后若还未追赶上,则返回失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* Request and check the leader's commitId to see whether this node has caught up. If not, wait
* until this node catches up.
*
* @return true if the node has caught up, false otherwise
*/
public boolean syncLeader() {
// make sure all snapshot pulling are done, otherwise some data will remain in the old nodes
// 首先要保证所有的的下载快照任务都已经完成了
logManager.waitRemoteSnapshots();

if (character == NodeCharacter.LEADER) {
return true;
}
if (leader == null) {
// the leader has not been elected, we must assume the node falls behind
return false;
}
logger.debug("{}: try synchronizing with the leader {}", name, leader);
long startTime = System.currentTimeMillis();
long waitedTime = 0;
AtomicReference<Long> commitIdResult = new AtomicReference<>(Long.MAX_VALUE);
// syncLeaderMaxWaitMs = 20 * 1000,也就是20s
while (waitedTime < RaftServer.syncLeaderMaxWaitMs) {
// 连接到leader节点
AsyncClient client = connectNode(leader);
if (client == null) {
// cannot connect to the leader
return false;
}
try {
synchronized (commitIdResult) {
// rpc调用,请求leader的commitIndex
client.requestCommitIndex(getHeader(), new GenericHandler<>(leader, commitIdResult));
commitIdResult.wait(RaftServer.syncLeaderMaxWaitMs);
}
long leaderCommitId = commitIdResult.get();
long localCommitId = logManager.getCommitLogIndex();
logger.debug("{}: synchronizing commitIndex {}/{}", name, localCommitId, leaderCommitId);
// 判断本地的commitIndex是否已经追赶上leader的commitIndex
if (leaderCommitId <= localCommitId) {
// before the response comes, the leader may commit new logs and the localCommitId may be updated by catching up, so it is possible that localCommitId > leaderCommitId at this time,this node has caught up
// 个人理解这种情况发生在requestCommitIndex请求还未返回的时候,leader提交了新的日志,并且也把这些日志同步给了follower。
if (logger.isDebugEnabled()) {
waitedTime = System.currentTimeMillis() - startTime;
logger.debug("{}: synchronized with the leader after {}ms", name, waitedTime);
}
return true;
}
// wait for next heartbeat to catch up
// the local node will not perform a commit here according to the leaderCommitId because
// the node may have some inconsistent logs with the leader
// 真是一个骚操作啊!!!!!等待heartbeat追赶日志?heartbeat还有这个功能,在心跳章节中解释了为什么心跳的时候还有日志追赶的功能。
waitedTime = System.currentTimeMillis() - startTime;
synchronized (syncLock) {
// heartBeatIntervalMs 1000ms
syncLock.wait(RaftServer.heartBeatIntervalMs);
}
} catch (TException | InterruptedException e) {
logger.error("{}: Cannot request commit index from {}", name, leader, e);
}
}
return false;
}

DataGroupMember

里面就是具体的功能实现了,// TODO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* When a DataGroupMember pulls data from another node, the data files will be firstly stored in the "REMOTE_FILE_TEMP_DIR", and then load file functionality of IoTDB will be used to load the files into the IoTDB instance.
*/
private static final String REMOTE_FILE_TEMP_DIR = "remote";

/**
* The MetaGroupMember that in charge of the DataGroupMember. Mainly for providing partition table and MetaLogManager.
*/
private MetaGroupMember metaGroupMember;

/**
* The thread pool that runs the pull snapshot tasks. Pool size is the # of CPU cores.
*/
private ExecutorService pullSnapshotService;

/**
* "logManager" manages the logs of this DataGroupMember. Although the logs of different data partitions (slots) are mixed together before a snapshot is taken, after the taking of snapshot, logs of different logs will be stored separately.
*/
private PartitionedSnapshotLogManager logManager;

/**
* "queryManger" records the remote nodes which have queried this node, and the readers or
* executors this member has created for those queries. When the queries end, an
* EndQueryRequest will be sent to this member and related resources will be released.
*/
private ClusterQueryManager queryManager;

MetaGroupMember

里面就是具体的功能实现了,// TODO

1
2
3
// nodes in the cluster and data partitioning
// 重点是这个PartitionTable的实现,这里面包含了tsfile的数据分布,在partition章节将详细分析
private PartitionTable partitionTable;

详细函数分析

####List getSeriesTypesByPath(List paths, List aggregations)
此函数的功能是获取paths中每个path所对应的tsDataType,函数思路:

  1. 从本地获取,如果本地有则返回,否则执行第2步骤;
  2. 从远端获取:
    • 先执行pullTimeSeriesSchemas(path) 获取这个path的schema,然后从schema中获取dataType。
    • 把这些schema cache在本地。
注意:此处有一个优化点:第1步从本地获取的时候,是批量执行的,即如果有一个path对应的schema在本地不存在,则这批操作都失败,然后都去远程去拿,其实可以优化为对于失败的那些去远程读取,这样会少远程读取的操作

数据分布partition

数据分布主要有以下几个类:

  1. PartitionGroup. 这个类继承了ArrayList,所以实际上就是一个Node的列表,这个列表中所有的节点组成了一个raft复制组的所有节点,第一个节点记为header。
  2. PartitionTable是一个interface,其实现是SlotPartitionTable,其核心就是如下几个属性
1
2
3
4
5
6
7
8
9
10
11
12
13
//The following fields are used for determining which node a data item belongs to.
// the slots held by each node
// key:node,value是这个node上面所有的slot,slot就是tsfile的逻辑单位
private Map<Node, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
// each slot is managed by whom
// key:slot id,value是这个slot id所在的node
private Map<Integer, Node> slotNodeMap = new ConcurrentHashMap<>();//TODO a List is enough
// the nodes that each slot belongs to before a new node is added, used for the new node to find the data source
private Map<Node, Map<Integer, Node>> previousNodeMap = new ConcurrentHashMap<>();

//the filed is used for determining which nodes need to be a group. the data groups which this node belongs to.
// localGroups保存了本节点上的所有的复制组
private List<PartitionGroup> localGroups;

下面详细讲解一下SlotPartitionTable中的一些函数

SlotPartitionTable初始化

1
2
3
4
5
6
7
8
9
private void init(Collection<Node> nodes) {
logger.info("Initializing a new partition table");
nodeRing.addAll(nodes);
// 根据nodeIdentifier对所有的node进行排序,nodeIdentifier的生成是在MetaGroupMember::genNodeIdentifier()函数中,会根据ip,port和当前时间进行hash生成一个数字
nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
// getPartitionGroups 这个函数算法详解在下面单独列出
localGroups = getPartitionGroups(thisNode);
assignPartitions();
}

getPartitionGroups函数详解

  1. 首先找到这个节点的索引index(所有的节点都已经按照nodeIdentifier排序了)
  2. 找到这个节点的上面的所有的复制组,算法如下。
    partition 划分图解
    • 首先把所有的节点按照索引大小排列为一个环,如下所示(假设有10个节点),假设此节点的index为1,副本数replicaNum为3,则这个节点上面就会形成3个data group(复制组)
    • 首先以节点1为起始点,顺时针找replicaNum个节点组成一个复制组,即节点1,2,3组成了一个复制组data group0,这个复制组的header为节点1;
    • 然后在以节点0(节点1的上一个节点)为起始点,顺时针找replicaNum个节点组成一个复制组,即节点0,1,2组成了一个复制组data group1,这个复制组的header为节点0;
    • 然后在以节点9(节点0的上一个节点)为起始点,顺时针找replicaNum个节点组成一个复制组,即节点9,0,1组成了和一个复制组data group2,这个复制组的header为节点9;

可以看到每个节点上面最多有replicaNum个复制组。getHeaderGroup(Node node)的作用就是以参数node为起始点,顺时针找到replicaNum个节点作为一个复制组然后返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// find replicationNum groups that a node is in
private List<PartitionGroup> getPartitionGroups(Node node) {
List<PartitionGroup> ret = new ArrayList<>();

int nodeIndex = nodeRing.indexOf(node);
for (int i = 0; i < replicationNum; i++) {
// the previous replicationNum nodes (including the node itself) are the headers of the
// groups the node is in
int startIndex = nodeIndex - i;
if (startIndex < 0) {
startIndex = startIndex + nodeRing.size();
}
ret.add(getHeaderGroup(nodeRing.get(startIndex)));
}

logger.debug("The partition groups of {} are: {}", node, ret);
return ret;
}

Slot与node的映射关系计算

可以详细看如下代码,思路如下:

  1. 计算出总的node个数: nodeRingSize;
  2. 求出配置的totalSlotNumbers,默认是10000个:
  3. 则每个node上面分配的slot数量为:slotsPerNode = totalSlotNumbers / nodeRingSize,因为这个结果并不一定是整数,则最后面的一个node上面的slot num个数可能会多一些。所以Map<Node, List> nodeSlotMap这个数据结构就初始化好了。
  4. 有了Map<Node, List> nodeSlotMap这个数据结构,则每个slot在哪个节点,即Map<Integer, Node> slotNodeMap这个数结构也就很清楚了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void assignPartitions() {
// evenly assign the slots to each node
int nodeNum = nodeRing.size();
// 计算出每个节点上面有多少个slot
int slotsPerNode = totalSlotNumbers / nodeNum;
for (Node node : nodeRing) {
nodeSlotMap.put(node, new ArrayList<>());
}

for (int i = 0; i < totalSlotNumbers; i++) {
int nodeIdx = i / slotsPerNode;
if (nodeIdx >= nodeNum) {
// the last node may receive a little more if total slots cannot de divided by node number
// 对于最后一个节点,则可能会多一些slot
nodeIdx--;
}
// nodeSlotMap key就是node,value就是slot num的list
nodeSlotMap.get(nodeRing.get(nodeIdx)).add(i);
}

// build the index to find a node by slot
for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
for (Integer slot : entry.getValue()) {
// slotNodeMap key是slot id,value是node
slotNodeMap.put(slot, entry.getKey());
}
}
}

数据路由

以insert数据为例,其数据路由如下:

  1. 根据要插入数据的storageGroupName以及timestamp 进行hash,确定slotId。算法如下:
1
2
3
4
5
6
7
8
9
public static int calculateStorageGroupSlotByTime(String storageGroupName, long timestamp,
int slotNum) {
// 获取partitionId,partitionInstance=timestamp / timePartitionInterval;
long partitionInstance = StorageEngine.getTimePartition(timestamp);
// 计算hash值
int hash = Murmur128Hash.hash(storageGroupName, partitionInstance, HASH_SALT);
// 对slotNum取模,计算出所属的slotId。
return Math.abs(hash % slotNum);
}
  1. 从slotNodeMap这个map中获取这个slot所在的node;
  2. 以此node作为header,寻找这个node作为header的PartitionGroup.
  3. 获取此PartitionGroup(其实是用了这个partitionGroup的header)所在的GroupMember,后续对数据的操作皆用此GroupMember对象。(代码在DataGroupMember::getDataMember中,即iotdb为每个header创建了一个DataGroupMember,然后保存在Map<Node, DataGroupMember> headerGroupMap这个数据结构中,所以通过header就可以找到这个header所属的DataGroupMember)。

节点管理

添加节点

增加节点这个操作是MetaClusterServer来处理的,MetaClusterServer收到请求之后,调用MetaGroupMember.joinCluster()来处理,这个函数的处理逻辑如下:

  1. 从种子节点中随机的选择一个节点(假设选择的为节点A,待加入节点为B,这个操作也是在节点B操作的,节点B执行shell脚本把自己加入到集群中),把这个添加节点的请求转发给这个节点(节点A);
    • 连接节点A,给节点A发送addNode的rpc请求(最终请求到达节点A之后,是MetaGroupMember调用的add方法);
      • 节点A首先看本机是否是复制组的leader,如果不是,则把请求转发给leader;否则自己处理;
      • 当发现自己是所在的复制组的leader的时候:
        • 检查一下节点B的参数与节点A本地的参数是否一致(PartitionInterval、HashSalt、ReplicationNum等),不一致则返回。
        • 构造AddNodeLog,并且把其发送给同一个复制组(目前的实现也就是所有的节点,因为metaDataGroup所有的节点组成了一个复制组),当大多数节点收到回复的时候,即认为成功;注意:leader发送日志给follower的时候,follower仅仅把日志保存下来,并不去commit日志,commit日志只有在leader与follower通过心跳通信的时候,才让follower去commit日志,然后在走一遍状态机
        • 当节点A收到大多数节点的回复的时候,节点A提交日志,提交日志的时候会走状态机,对于AddNodeLog,状态机会更新本地的partitionTabel;
        • 节点A序列化本地最新的partitionTabel,返回给节点B;
    • 若rpc请求返回成功:接受节点A返回的partitionTable信息,然后调用dataGroupMember.pullSnapshot 方法来追赶新复制组的数据。
    • 其他情况下都是失败的,等待重试。
  2. 当加入成功之后,设置自己的角色为FOLLOWER,并且启动心跳服务。结束;
  3. 若第一步加入失败,则等待1000ms(心跳时间)之后,再次重试,最多重试10次;

删除节点

删除节点与添加节点逻辑类似,再次不再分析