Redis进阶-分布式存储 Sequential partitioning & Hash partitioning
再来所以说为啥需要集群?
两个方面:
Redis Cluster 将所有数据划分为 16384 个 slots(槽位),每个节点负责其中一部分槽位。槽位的信息存储于每个节点中。
为啥要用16384个slot , 请移步 Redis进阶-分布式存储 Sequential partitioning & Hash partitioning
当 Redis Cluster 的客户端来连接集群时,它也会得到一份集群的槽位配置信息并将其缓存在客户端本地。这样当客户端要查找某个 key 时,可以直接定位到目标节点。
我们来看下jedis(Version:2.9.0)的实现
jedisCluster = new JedisCluster(jedisClusterNode, 6000, 5000, 10, "artisan", config);
跟下源码
public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
initializeSlotsCache(nodes, poolConfig, password);
}
重点看下 initializeSlotsCache
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
for (HostAndPort hostAndPort : startNodes) {
Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
if (password != null) {
jedis.auth(password);
}
try {
cache.discoverClusterNodesAndSlots(jedis);
break;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
继续 cache.discoverClusterNodesAndSlots(jedis);
cache为 JedisClusterInfoCache 对象。
public void discoverClusterNodesAndSlots(Jedis jedis) {
w.lock();
try {
reset();
List<Object> slots = jedis.clusterSlots();
for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
if (slotInfo.size() <= MASTER_NODE_INDEX) {
continue;
}
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
// hostInfos
int size = slotInfo.size();
for (int i = MASTER_NODE_INDEX; i < size; i++) {
List<Object> hostInfos = (List<Object>) slotInfo.get(i);
if (hostInfos.size() <= 0) {
continue;
}
HostAndPort targetNode = generateHostAndPort(hostInfos);
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
}
}
}
} finally {
w.unlock();
}
}
同时因为槽位的信息可能会存在客户端与服务器不一致的情况,还需要纠正机制来实现槽位信息的校验调整。
jedisCluster.set("clusterArtisan", "artisanValue")
跟下源码
@Override
public String set(final String key, final String value) {
return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
@Override
public String execute(Jedis connection) {
return connection.set(key, value);
}
}.run(key);
}
重点是这个connetion 对象是从哪里来的,继续看下 run
public T run(String key) {
..........
return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
}
继续
private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
Jedis connection = null;
try {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
return execute(connection);
......
......
}
重点来了
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
是不是就是CRC16 ?
public static int getSlot(byte[] key) {
int s = -1;
int e = -1;
boolean sFound = false;
for (int i = 0; i < key.length; i++) {
if (key[i] == '{' && !sFound) {
s = i;
sFound = true;
}
if (key[i] == '}' && sFound) {
e = i;
break;
}
}
if (s > -1 && e > -1 && e != s + 1) {
return getCRC16(key, s + 1, e) & (16384 - 1);
}
return getCRC16(key) & (16384 - 1);
}
Cluster 默认会对 key 值使用 crc16 算法进行 hash 得到一个整数值,然后用这个整数值对 16384 进行取模来得到具体 槽位。 HASH_SLOT = CRC16(key) mod 16384
当客户端向一个错误的节点发出了指令,该节点会发现指令的 key 所在的槽位并不归自己管理,这时它会向客户端发送一个特殊的跳转指令携带目标操作的节点地址,告诉客户端去连这个节点去获取数据。
客户端收到指令后除了跳转到正确的节点上去操作,还会同步更新纠正本地的槽位映射表缓存,后续所有 key 将使用新的槽位映射表
redis cluster节点间采取gossip协议进行通信
集中式和gossip
Gossip protocol 也叫 Epidemic Protocol (流行病协议),别名很多比如:“流言算法”、“疫情传播算法”等。
Gossip protocol 最早是在 1987 年发表在 ACM 上的论文 《Epidemic Algorithms for Replicated Database Maintenance》中被提出。
主要用在分布式数据库系统中各个副本节点同步数据之用,这种场景的一个最大特点就是组成的网络的节点都是对等节点,是非结构化网络
我们先做一些前提设定
1、Gossip 是周期性的散播消息,把周期限定为 1 秒
2、被感染节点随机选择 k 个邻接节点(fan-out)散播消息,这里把 fan-out 设置为 3,每次最多往 3 个节点散播。
3、每次散播消息都选择尚未发送过的节点进行散播
4、收到消息的节点不再往发送节点散播,比如 A -> B,那么 B 进行散播的时候,不再发给 A。
这里一共有 16 个节点,节点 1 为初始被感染节点,通过 Gossip 过程,最终所有节点都被感染:
gossip协议包含多种消息,包括ping,pong,meet,fail等等。
每个节点都有一个专门用于节点间通信的端口,就是自己提供服务的端口号+10000,比如7001,那么用于节点间通信的就是17001端口。
每个节点每隔一段时间都会往另外几个节点发送ping消息,同时其他几点接收到ping消息之后返回pong消息。
真实世界的机房网络往往并不是风平浪静的,它们经常会发生各种各样的小问题。比如网络抖动就是非常常见的一种现象,突然之间部分连接变得不可访问,然后很快又恢复正常。
为解决这种问题,Redis Cluster 提供了一种选项cluster-node-timeout,表示当某个节点持续 timeout 的时间失联时,才可以认定该节点出现故障,需要进行主从切换。如果没有这个选项,网络抖动会导致主从频繁切换 (数据的重新复制)。
redis.conf配置文件中
当slave发现自己的master变为FAIL状态时,便尝试进行Failover,以期成为新的master。由于挂掉的master可能会有多个slave,从而存在多个slave竞争成为master节点的过程, 其过程如下:
从节点并不是在主节点一进入 FAIL 状态就马上尝试发起选举,而是有一定延迟,一定的延迟确保我们等待FAIL状态在集群中传播,slave如果立即尝试选举,其它masters或许尚未意识到FAIL状态,可能会拒绝投票。
延迟计算公式: DELAY = 500ms + random(0 ~ 500ms) + SLAVE_RANK * 1000ms
SLAVE_RANK表示此slave已经从master复制数据的总量的rank。Rank越小代表已复制的数据越新。这种方式下,持有最新数据的slave将会首先发起选举(理论上)。
当redis.conf的配置cluster-require-full-coverage为no时,表示当负责一个插槽的主库下线且没有相应的从库进行故障恢复时,集群仍然可用,如果为yes则集群不可用。
因为新master的选举需要大于半数的集群master节点同意才能选举成功,如果只有两个master节点,当其中一个挂了,是达不到选举新master的条件的。
奇数个master节点可以在满足选举该条件的基础上节省一个节点,比如三个master节点和四个master节点的集群相比,大家如果都挂了一个master节点都能选举新master节点,如果都挂了两个master节点都没法选举新master节点了,所以奇数的master节点更多的是从节省机器资源角度出发说的。