non-relational
“,泛指非关系型的数据库,很多人也说它是”Not Only SQL“
Redis
、MemCache
、MongoDB
等
SQL | NoSQL | |
---|---|---|
数据结构 | 结构化 | 非结构化 |
数据关联 | 关联的 | 无关联的 |
查询方式 | SQL查询 | 非SQL |
事务特性 | ACID | BASE |
存储方式 | 磁盘 | 内存 |
扩展性 | 垂直 | 水平 |
使用场景 | 1)数据结构固定2)相关业务对数据安全性、一致性要求较高 | 1)数据结构不固定2)对一致性、安全性要求不高3)对性能要求 |
Redis诞生于2009年全称是Remote Dictionary Server,远程词典服务器,是一个基于内存的键值型NoSQL数据库。
Redis的特征:
key-value
)型,value支持多种不同数据结构,功能丰富brew install redis
# 启动
brew services start redis
==> Successfully started `redis` (label: homebrew.mxcl.redis)
# 关闭
brew services stop redis
# 连接redis
redis-cli
# 关闭连接,并退出
127.0.0.1:6379> shutdown
not connected> quit
远程连接Redis
redis-cli -h xxx.xx.xx.xx -p 6379 -a 123456
-h xxx.xx.xx.xx
:指定要连接的redis节点的IP地址,默认是127.0.0.1-p 6379
:指定要连接的redis节点的端口,默认是6379-a 123456
:指定redis的访问密码 指定密码
#方法1
#在配置文件中配置requirepass的密码(当redis重启后密码依然有效)。
requirepass foobared 修改成 : requirepass 123321
#方法2(当redis重启后密码无效)。
127.0.0.1:6379>config set requirepass 123321 #设置密码
Ok
127.0.0.1:6379>config get requirepass #查看密码
127.0.0.1:6379> auth 123321 #指定密码(登录时未指定密码可以用此命令制定密码)
Ok
心跳测试
127.0.0.1:6379> ping
PONG
本次安装Redis是基于Linux系统下安装的,因此需要一台Linux服务器或者虚拟机。如果您使用的是自己购买的服务器,请提前开放
6379
端口,避免后续出现的莫名其妙的错误!
Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖
yum install -y gcc tcl
安装成功如下图所示:
将redis-6.2.6.tar
上传至/usr/local/src
目录
在xShell中cd
到/usr/local/src
目录执行以下命令进行解压操作
tar -xzf redis-6.2.6.tar.gz
解压成功后依次执行以下命令
cd redis-6.2.6
make
make install
安装成功后打开/usr/local/bin目录(该目录为Redis默认的安装目录)
Redis的启动方式有很多种,例如:前台启动、后台启动、开机自启
这种启动属于前台启动,会阻塞整个会话窗口,窗口关闭或者按下
CTRL + C
则Redis停止。不推荐使用。
安装完成后,在任意目录输入redis-server
命令即可启动Redis
redis-server
启动成功如下图所示
如果要让Redis以后台方式启动,则必须修改Redis配置文件,配置文件所在目录就是之前我们解压的安装包下
因为我们要修改配置文件,因此我们需要先将原文件备份一份
cd /usr/local/src/redis-6.2.6
cp redis.conf redis.conf.bck
然后修改redis.conf
文件中的一些配置
# 允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0
bind 0.0.0.0
# 守护进程,修改为yes后即可后台运行
daemonize yes
# 密码,设置后访问Redis必须输入密码
requirepass 1325
Redis其他常用配置
# 监听的端口
port 6379
# 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
dir .
# 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
databases 1
# 设置redis能够使用的最大内存
maxmemory 512mb
# 日志文件,默认为空,不记录日志,可以指定日志文件名
logfile "redis.log"
启动Redis
# 进入redis安装目录
cd /usr/local/src/redis-6.2.6
# 启动
redis-server redis.conf
停止Redis服务
# 通过kill命令直接杀死进程
kill -9 redis进程id
# 利用redis-cli来执行 shutdown 命令,即可停止 Redis 服务,
# 因为之前配置了密码,因此需要通过 -a 来指定密码
redis-cli -a 132537 shutdown
我们也可以通过配置来实现开机自启
首先,新建一个系统服务文件
vi /etc/systemd/system/redis.service
将以下命令粘贴进去
[Unit]
Description=redis-server
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf
PrivateTmp=true
[Install]
WantedBy=multi-user.target
然后重载系统服务
systemctl daemon-reload
现在,我们可以用下面这组命令来操作redis了
# 启动
systemctl start redis
# 停止
systemctl stop redis
# 重启
systemctl restart redis
# 查看状态
systemctl status redis
执行下面的命令,可以让redis开机自启
systemctl enable redis
通过Redis的中文文档学习:http://www.redis.cn/commands.html
通过菜鸟教程官网来学习:https://www.runoob.com/redis/redis-keys.html
Redis是一个key-value的数据库,key一般是String类型,不过value的类型多种多样
指令 | 描述 |
---|---|
keys | 查看符合模板的所有key,不建议在生产环境设备上使用 |
del | 删除一个指定的key |
exists | 判断key是否存在 |
expire | 给一个key设置有效期,有效期到期时该key会被自动删除 |
ttl | 查看一个KEY的剩余有效期,-1表示永不过期,-2表示已过期 |
type | 查看你的key是什么类型 |
unlink | 根据value选择非阻塞删除 |
select | 切换数据库 |
dbsize | 查看当前数据库的key的数量 |
flushdb | 清空当前库 |
flushall | 通杀全部库 |
可以通过help [command]
可以查看一个命令的具体用法!
String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是512M
String的常见命令
命令 | 描述 |
---|---|
set | 添加或者修改已经存在的一个String类型的键值对 |
get | 根据key获取String类型的value |
mset | 批量添加多个String类型的键值对 |
mget | 根据多个key获取多个String类型的value |
Strlen | 获得value的长度 |
incr/decr | 让一个整型的key自增/自减1 |
incrby/decrby | 让一个整型的key自增/自减,并指定步长,例如:incrby num 2 让num值自增2 |
incrbyfloat | 让一个浮点类型的数字自增并指定步长 |
setnx | 添加一个String类型的键值对,前提是这个key不存在,才执行 |
setex | 添加一个String类型的键值对,并且指定有效期 |
getrange | 获得value的范围,例如:getrange name 0 4 |
setrange | 从指定位置覆盖key存储的value,例如:setrange name 0 jian |
getset | 以新换旧,设置了新值同时获得旧值。例如:getset name xiaojian |
数据结构:String的数据结构为简单动态字符串(Simple Dynamic String,缩写SDS)。是可以修改的字符串,内部结构实现上类似于Java的ArrayList,采用预分配冗余空间的方式来减少内存的频繁分配.
hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。类似Java里面的HashMap
每个 hash 可以存储 232 - 1 键值对(40多亿)。
Hash的常见命令
命令 | 描述 |
---|---|
hset key field value | 添加或者修改hash类型key的field的值 |
hget key field | 获取一个hash类型key的field的值 |
hmset | hmset 和 hset 效果相同 ,4.0之后hmset可以弃用了 |
hmget | 批量获取多个hash类型key的field的值 |
hgetall | 获取一个hash类型的key中的所有的field和value |
hkeys | 获取一个hash类型的key中的所有的field |
hvals | 获取一个hash类型的key中的所有的value |
hincrby | 让一个hash类型key的字段值自增并指定步长 |
hsetnx | 添加一个hash类型的key的field值,前提是这个field不存在,否则不执行 |
127.0.0.1:6379> hset userkey name "jack" age 18 birth "2004-07"
(integer) 3
127.0.0.1:6379> hgetall userkey
1) "name"
2) "jack"
3) "age"
4) "18"
5) "birth"
6) "2004-07"
数据结构
Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。当field-value长度较短且个数较少时,使用ziplist,否则使用hashtable。
Redis中的List类型与Java中的LinkedList类似,可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。
一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。
特征也与LinkedList
类似:
常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等。
List的常见命令
命令 | 描述 |
---|---|
lpush key element … | 向列表左侧插入一个或多个元素 |
lpop key | 移除并返回列表左侧的第一个元素 |
rpush key element … | 向列表右侧插入一个或多个元素 |
rpop key | 移除并返回列表右侧的第一个元素 |
lrange key star end | 返回一段角标范围内的所有元素 |
blpop和brpop | 与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil |
rpoplpush <key1><key2> | 从<key1>列表右边吐出一个值,插到<key2>列表左边。 |
lindex <key><index> | 按照索引下标获得元素(从左到右) |
llen <key> | 获得列表长度 |
lrem <key><n><value> | 从左边删除n个value |
lset <key><index><value> | 将列表key下标为index的值替换成value |
数据结构
List的数据结构为快速链表quickList和压缩链表ziplist。
当列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist。它将所有的元素紧挨着一起存储,分配的是一块连续的内存。
当数据量比较多的时候才会改成quicklist。因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prev和next。
将链表和ziplist结合起来组成了quicklist。也就是将多个ziplist使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。
思考问题
Redis的Set是string类型的无序集合。它底层其实是一个value为null的hash表,与Java中的HashSet类似,因此具备与HashSet类似的特征
Set的常见命令有
命令 | 描述 |
---|---|
sadd key member … | 向set中添加一个或多个元素 |
srem key member … | 移除set中的指定元素 |
scard key | 返回set中元素的个数 |
sismember key member | 判断一个元素是否存在于set中 |
smembers key | 获取set中的所有元素 |
sinter key1 key2 … | 求key1与key2的交集 |
sdiff key1 key2 … | 求key1与key2的差集 |
sunion key1 key2 .. | 求key1和key2的并集 |
spop key | 随机从该集合中吐出一个值 |
srandmember <key><n> | 随机从该集合中取出n个值。不会从集合中删除 |
smove <source><des>value | 把集合中一个值从一个集合移动到另一个集合 |
交集、差集、并集图示
数据结构
Set数据结构是dict字典,字典是用哈希表实现的。Java中HashSet的内部实现使用的是HashMap,只不过所有的value都指向同一个对象。Redis的set结构也是一样,它的内部也使用hash结构,所有的value都指向同一个内部值。
Redis的ZSet(SortedSet)是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表。
SortedSet具备下列特性:
因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能。
SortedSet的常见命令
命令 | 描述 |
---|---|
zadd key score member | 添加一个或多个元素到sorted set ,如果已经存在则更新其score值 |
zrem key member | 删除sorted set中的一个指定元素 |
zscore key member | 获取sorted set中的指定元素的score值 |
zrankkey member | 获取sorted set 中的指定元素的排名 |
zcard key | 获取sorted set中的元素个数 |
zcount key min max | 统计score值在给定范围内的所有元素的个数 |
zincrby key increment member | 让sorted set中的指定元素自增,步长为指定的increment值 |
zrange key min max | 按照score排序后,获取指定排名范围内的元素 |
zrangebyscore key min max | 按照score排序后,获取指定score范围内的元素 |
zdiff、zinter、zunion | 求差集、交集、并集 |
注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV
即可
数据结构
SortedSet(zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String, Double>,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。
zset底层使用了两个数据结构
(1)hash,hash的作用就是关联元素value和权重score,保障元素value的唯一性,可以通过元素value找到相应的score值。
(2)跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表。
简介
Redis 提供了 Bitmaps 可以实现对位的操作,可以把 Bitmaps 想象成一个以位为单位的数组, 数组的每个单元只能存储 0 和 1, 数组的下标在 Bitmaps 中叫做偏移量。
Bitmaps的常见命令
BITFIELD key GET encoding offset|[OVERFLOW WRAP|SAT|FAIL] SET encoding offset value|INCRBY encoding offset increment
# GET查询 SET修改 INCRBY自增
# encoding 设置符号位和操作长度,u代表无符号,i代表有符号
# offset 偏移量,从第几位开始
# 查询bit数组中从0位开始的2位,返回10进制
BITFIELD key GET u2 0
# 例如数据是11100,则返回3
# 功能跟BITFIELD的查询功能一样
BITFIELD_RO key GET encoding offset [GET encoding offset ...]
实例1
每个独立用户是否访问过网站,结果存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id。 假设现在有20个用户,userid=1, 6, 11, 15, 19的用户对网站进行了访问, 那么当前Bitmaps初始化结果如图
users:20220717代表2022-07-17这天的独立访问用户的Bitmaps
127.0.0.1:6379> setbit users:20220717 1 1
(integer) 0
127.0.0.1:6379> setbit users:20220717 6 1
(integer) 0
127.0.0.1:6379> setbit users:20220717 11 1
(integer) 0
127.0.0.1:6379> setbit users:20220717 15 1
(integer) 0
127.0.0.1:6379> setbit users:20220717 19 1
(integer) 0
获取id=6,8的用户是否在2022-07-17这天访问过, 返回0说明没有访问过,返回1说明访问过
127.0.0.1:6379> getbit users:20220717 6
(integer) 1
127.0.0.1:6379> getbit users:20220717 8
(integer) 0
计算2022-07-17这天的独立访问用户数量
127.0.0.1:6379> bitcount users:20220717
(integer) 5
start和end代表起始和结束字节数, 下面计算用户id在第1个字节到第3个字节之间的独立访问用户数, 对应的用户id是11, 15, 19。
127.0.0.1:6379> bitcount users:20220717 1 3
(integer) 3
举例: K1 【01000001 01000000 00000000 00100001】,对应【0,1,2,3】
实例2
2022-07-02 日访问网站的userid=1,2,5,9。
setbit users:20220702 1 1
setbit users:20220702 2 1
setbit users:20220702 5 1
setbit users:20220702 9 1
2022-07-03 日访问网站的userid=0,1,4,9。
setbit users:20220703 0 1
setbit users:20220703 1 1
setbit users:20220703 4 1
setbit users:20220703 9 1
计算出两天都访问过网站的用户数量
127.0.0.1:6379> bitop and users:and:20220702_03 users:20220702 users:20220703
(integer) 2
127.0.0.1:6379> bitcount users:and:20220702_03
(integer) 2
计算出任意一天都访问过网站的用户数量(例如月活跃就是类似这种) , 可以使用or求并集
127.0.0.1:6379> bitop or users:or:20220702_03 users:20220702 users:20220703
(integer) 2
127.0.0.1:6379> bitcount users:or:20220702_03
(integer) 6
Bitmaps与set对比
假设网站有1亿用户, 每天独立访问的用户有5千万, 如果每天用集合类型和Bitmaps分别存储活跃用户可以得到表
数据类型 | 每个用户id占用空间 | 需要存储的用户量 | 全部内存量 |
---|---|---|---|
集合类型 | 64位 | 5千万 | 64位*5千万 = 400MB |
Bitmaps | 1位 | 1亿 | 1位*1亿 = 12.5MB |
很明显, 这种情况下使用Bitmaps能节省很多的内存空间, 尤其是随着时间推移节省的内存是非常可观的
数据类型 | 一天 | 一个月 | 一年 |
---|---|---|---|
集合类型 | 400MB | 12GB | 144GB |
Bitmaps | 12.5MB | 375MB | 4.5GB |
但Bitmaps并不是万金油, 假如该网站每天的独立访问用户很少, 例如只有10万(大量的僵尸用户) , 那么两者的对比如下表所示, 很显然, 这时候使用Bitmaps就不太合适了, 因为基本上大部分位都是0。
数据类型 | 每个userid占用空间 | 需要存储的用户量 | 全部内存量 |
---|---|---|---|
集合类型 | 64位 | 10万 | 64位*10万 = 800KB |
Bitmaps | 1位 | 1亿 | 1位* 1亿 = 12.5MB |
简介
在工作当中,我们经常会遇到与统计相关的功能需求,比如统计网站PV(PageView页面访问量),可以使用Redis的incr、incrby轻松实现。但像UV(UniqueVisitor,独立访客)、独立IP数、搜索记录数等需要去重和计数的问题如何解决?这种求集合中不重复元素个数的问题称为基数问题。
解决基数问题有很多种方案:
以上的方案结果精确,但随着数据不断增加,导致占用空间越来越大,当数据集非常大时是不切实际的。
能否能够降低一定的精度来平衡存储空间?Redis推出了HyperLogLog
什么是基数?
比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数。
HyperLogLog的常见命令
127.0.0.1:6379> pfadd hll "redis"
(integer) 1
127.0.0.1:6379> pfadd hll "mysql"
(integer) 1
127.0.0.1:6379> pfadd hll "redis"
(integer) 0
127.0.0.1:6379> pfcount hll
(integer) 2
127.0.0.1:6379> pfadd hll2 "mongodb"
(integer) 1
127.0.0.1:6379> pfadd hll2 "redis"
(integer) 1
127.0.0.1:6379> pfcount hll hll2
(integer) 3
127.0.0.1:6379> pfmerge hllsum hll hll2
OK
127.0.0.1:6379> pfcount hllsum
(integer) 3
简介
Redis 3.2 中增加了对GEO类型的支持。GEO,Geographic,地理信息的缩写。该类型就是元素的2维坐标,在地图上就是经纬度。redis基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度Hash等常见操作。
命令
geoadd china:city 121.47 31.23 shanghai
geoadd china:city 106.50 29.53 chongqing 114.05 22.52 shenzhen 116.38 39.90 beijing
127.0.0.1:6379> geopos china:city shanghai
1) 1) "121.47000163793563843"
2) "31.22999903975783553"
127.0.0.1:6379> geodist china:city shanghai beijing km
"1068.1535"
127.0.0.1:6379> georadius china:city 110 30 1000 km
1) "chongqing"
2) "shenzhen"
<!--引入Jedis依赖-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.2.0</version>
</dependency>
<!--引入单元测试依赖-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.8.2</version>
<scope>test</scope>
</dependency>
public class JedisTest {
public static void main(String[] args) {
// 获取连接
Jedis jedis = new Jedis("127.0.0.1",6379);
// 如果 Redis 服务设置了密码,需要下面这行,没有就不需要
// jedis.auth("123456");
// 选择库(默认是下标为0的库)
jedis.select(0);
System.out.println("连接成功");
//查看服务是否运行
System.out.println("服务正在运行: "+jedis.ping());
jedis.close();
}
}
Jedis-API: Key
jedis.set("k1", "v1");
jedis.set("k2", "v2");
jedis.set("k3", "v3");
Set<String> keys = jedis.keys("*");
System.out.println(keys.size());
for (String key : keys) {
System.out.println(key);
}
System.out.println(jedis.exists("k1"));
System.out.println(jedis.ttl("k1"));
System.out.println(jedis.get("k1"));
Jedis-API: String
jedis.mset("str1","v1","str2","v2","str3","v3");
System.out.println(jedis.mget("str1","str2","str3"));
Jedis-API: List
List<String> list = jedis.lrange("mylist",0,-1);
for (String element : list) {
System.out.println(element);
}
Jedis-API: set
jedis.sadd("orders", "order01");
jedis.sadd("orders", "order02");
jedis.sadd("orders", "order03");
jedis.sadd("orders", "order04");
Set<String> smembers = jedis.smembers("orders");
for (String order : smembers) {
System.out.println(order);
}
jedis.srem("orders", "order02");
Jedis-API: hash
jedis.hset("hash1","userName","lisi");
System.out.println(jedis.hget("hash1","userName"));
Map<String,String> map = new HashMap<String,String>();
map.put("telphone","13735679666");
map.put("address","beijing");
map.put("email","abc@163.com");
jedis.hmset("hash2",map);
List<String> result = jedis.hmget("hash2", "telphone","email");
for (String element : result) {
System.out.println(element);
}
Jedis-API: zset
jedis.zadd("zset01", 100d, "z3");
jedis.zadd("zset01", 90d, "l4");
jedis.zadd("zset01", 80d, "w5");
jedis.zadd("zset01", 70d, "z6");
List<String> zrange = jedis.zrange("zset01", 0, -1);
for (String e : zrange) {
System.out.println(e);
}
Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此推荐大家使用Jedis连接池代替Jedis的直连方式
public class JedisConnectionFactory {
private static final JedisPool jedisPool;
static {
//配置连接池
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(8);
jedisPoolConfig.setMaxIdle(8);
jedisPoolConfig.setMinIdle(0);
jedisPoolConfig.setMaxWaitMillis(200);
//创建连接池对象
jedisPool = new JedisPool(jedisPoolConfig,"127.0.0.1",6379,1000,"132537");
}
public static Jedis getJedis(){
return jedisPool.getResource();
}
}
要求: 1、输入手机号,点击发送后随机生成6位数字码,2分钟有效 2、输入验证码,点击验证,返回成功或失败 3、每个手机号每天只能输入3次
思路:
生成六位的验证码:
//1.生成6位数字验证码
public static String getCode() {
Random random = new Random();
String code = "";
for(int i=0;i<6;i++) {
int rand = random.nextInt(10);
code += rand;
}
return code;
}
验证码只能发送三次:
//2 每个手机每天只能发送三次,验证码放到redis中,设置过期时间120
public static void verifyCode(String phone) {
//连接redis
Jedis jedis = new Jedis("127.0.0.1",6379);
//拼接key
//手机发送次数key
String countKey = "VerifyCode"+phone+":count";
//验证码key
String codeKey = "VerifyCode"+phone+":code";
//每个手机每天只能发送三次
String count = jedis.get(countKey);
if(count == null) {
//没有发送次数,第一次发送
//设置发送次数是1
jedis.setex(countKey,24*60*60,"1");
} else if(Integer.parseInt(count)<=2) {
//发送次数+1
jedis.incr(countKey);
} else if(Integer.parseInt(count)>2) {
//发送三次,不能再发送
System.out.println("今天发送次数已经超过三次");
jedis.close();
}
//发送验证码放到redis里面
String vcode = getCode();
jedis.setex(codeKey,120,vcode);//120秒
jedis.close();
}
判断验证码是否一致:
//3 验证码校验
public static void judgeCode(String phone,String code) {
//从redis获取验证码
Jedis jedis = new Jedis("127.0.0.1",6379);
//验证码key
String codeKey = "VerifyCode"+phone+":code";
String redisCode = jedis.get(codeKey);
//判断
if(redisCode.equals(code)) {
System.out.println("成功");
}else {
System.out.println("失败");
}
jedis.close();
}
完整功能代码展示
public class PhoneCode {
public static void main(String[] args) {
//模拟验证码发送
verifyCode("13678765435");
//模拟验证码校验
//judgeCode("13678765435","217173");
}
//3 验证码校验
public static void judgeCode(String phone,String code) {
//从redis获取验证码
Jedis jedis = new Jedis("127.0.0.1",6379);
//验证码key
String codeKey = "VerifyCode"+phone+":code";
String redisCode = jedis.get(codeKey);
//判断
if(redisCode.equals(code)) {
System.out.println("成功");
}else {
System.out.println("失败");
}
jedis.close();
}
//2 每个手机每天只能发送三次,验证码放到redis中,设置过期时间120
public static void verifyCode(String phone) {
//连接redis
Jedis jedis = new Jedis("127.0.0.1",6379);
//拼接key
//手机发送次数key
String countKey = "VerifyCode"+phone+":count";
//验证码key
String codeKey = "VerifyCode"+phone+":code";
//每个手机每天只能发送三次
String count = jedis.get(countKey);
if(count == null) {
//没有发送次数,第一次发送
//设置发送次数是1
jedis.setex(countKey,24*60*60,"1");//有效期1天
} else if(Integer.parseInt(count)<=2) {
//发送次数+1
jedis.incr(countKey);
} else if(Integer.parseInt(count)>2) {
//发送三次,不能再发送
System.out.println("今天发送次数已经超过三次");
jedis.close();
return;//超过三次之后就会自动退出不会再发送了,不添加这一行,即使显示发送次数,但还会有验证码还是会改变
}
//发送验证码放到redis里面
String vcode = getCode();//调用生成的验证码
jedis.setex(codeKey,120,vcode);//设置生成的验证码只有120秒的时间
jedis.close();
}
//1 生成6位数字验证码,code是验证码
public static String getCode() {
Random random = new Random();
String code = "";
for(int i=0;i<6;i++) {
int rand = random.nextInt(10);
code += rand;
}
return code;
}
}
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--连接池依赖-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
application.yml 版本
spring:
redis:
host: 127.0.0.1 #指定redis所在的host
port: 6379 #指定redis的端口
password: 123456 #设置redis密码
lettuce:
pool:
max-active: 8 #最大连接数
max-idle: 8 #最大空闲数
min-idle: 0 #最小空闲数
max-wait: 100ms #连接等待时间
application.properties 版本
#Redis服务器地址
spring.redis.host=127.0.0.1
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
@SpringBootTest
class SpringRedisApplicationTests {
@Resource
private RedisTemplate redisTemplate;
@Test
void testString() {
// 1.通过RedisTemplate获取操作String类型的ValueOperations对象
ValueOperations ops = redisTemplate.opsForValue();
// 2.插入一条数据
ops.set("name","jianjian");
// 3.获取数据
String name = (String) ops.get("name");
System.out.println("name = " + name);
}
}
RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object序列化为字节形式,
默认是采用JDK序列化
,得到的结果是这样的
缺点:
那么如何解决以上的问题呢?我们可以通过自定义RedisTemplate序列化的方式来解决。
编写一个配置类RedisConfig
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
// 1.创建RedisTemplate对象
RedisTemplate<String ,Object> redisTemplate = new RedisTemplate<>();
// 2.设置连接工厂
redisTemplate.setConnectionFactory(factory);
// 3.创建序列化对象
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
// 4.设置key和hashKey采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// 5.设置value和hashValue采用json的序列化方式
redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
return redisTemplate;
}
}
此时我们已经将RedisTemplate的key设置为String序列化
,value设置为Json序列化
的方式,再来执行方法测试
由于我们设置的value序列化方式是Json的,因此我们可以直接向redis中插入一个对象
@Test
void testSaveUser() {
redisTemplate.opsForValue().set("user:100", new User("Vz", 21));
User user = (User) redisTemplate.opsForValue().get("user:100");
System.out.println("User = " + user);
}
尽管Json序列化可以满足我们的需求,但是依旧存在一些问题。
如上图所示,为了在反序列化时知道对象的类型,JSON序列化器会将类的class类型写入json结果中,存入Redis,会带来额外的内存开销。
那么我们如何解决这个问题呢?我们可以通过下文的StringRedisTemplate
来解决这个问题。
为了节省内存空间,我们并不会使用JSON序列化器来处理value,而是统一使用String序列化器,要求只能存储String类型的key和value。当需要存储Java对象时,手动完成对象的序列化和反序列化。
Spring默认提供了一个StringRedisTemplate类,它的key和value的序列化方式默认就是String方式。省去了我们自定义RedisTemplate的过程
编写一个测试类使用StringRedisTemplate来执行以下方法
@SpringBootTest
class RedisStringTemplateTest {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Test
void testSaveUser() throws JsonProcessingException {
// 1.创建一个Json序列化对象
ObjectMapper objectMapper = new ObjectMapper();
// 2.将要存入的对象通过Json序列化对象转换为字符串
String userJson1 = objectMapper.writeValueAsString(new User("Vz", 21));
// 3.通过StringRedisTemplate将数据存入redis
stringRedisTemplate.opsForValue().set("user:100",userJson1);
// 4.通过key取出value
String userJson2 = stringRedisTemplate.opsForValue().get("user:100");
// 5.由于取出的值是String类型的Json字符串,因此我们需要通过Json序列化对象来转换为java对象
User user = objectMapper.readValue(userJson2, User.class);
// 6.打印结果
System.out.println("user = " + user);
}
}
执行完毕回到Redis的图形化客户端查看结果
从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入 Exec 后,Redis 会将之前的命令队列中的命令依次执行。组队的过程中可以通过discard来放弃组队。
127.0.0.1:6379[1]> multi
OK
127.0.0.1:6379[1](TX)> set k1 v1
QUEUED
127.0.0.1:6379[1](TX)> set k2 v2
QUEUED
127.0.0.1:6379[1](TX)> exec
1) OK
2) OK
组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。
127.0.0.1:6379[1]> multi
OK
127.0.0.1:6379[1](TX)> set m1 v1
QUEUED
127.0.0.1:6379[1](TX)> set m2
(error) ERR wrong number of arguments for 'set' command
127.0.0.1:6379[1](TX)> set m3 v3
QUEUED
127.0.0.1:6379[1](TX)> exec
(error) EXECABORT Transaction discarded because of previous errors.
如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。
127.0.0.1:6379[1]> multi
OK
127.0.0.1:6379[1](TX)> set m1 v1
QUEUED
127.0.0.1:6379[1](TX)> incr m1 #错误语句
QUEUED
127.0.0.1:6379[1](TX)> set m2 v2
QUEUED
127.0.0.1:6379[1](TX)> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK
想想一个场景:有很多人有你的账户,同时去参加双十一抢购
一个请求想给金额减8000
一个请求想给金额减5000
一个请求想给金额减1000
这样就会导致事务冲突,如何解决呢?
悲观锁(Pessimistic Lock), 每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会 block 直到它拿到锁。
悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等
乐观锁(Optimistic Lock),每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。
乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过。
乐观锁的典型代表:就是CAS(Compare And Swap),利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换内存值,其中do while 是为了在操作失败时,再次进行自旋操作,即把之前的逻辑再操作一次。
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
在执行 multi 之前,先执行 watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。
127.0.0.1:6379[1]> watch balence
OK
127.0.0.1:6379[1]> multi
OK
127.0.0.1:6379[1](TX)> decrby balence 10
QUEUED
127.0.0.1:6379[1](TX)> incrby debt 10
QUEUED
127.0.0.1:6379[1](TX)> exec
1) (integer) -10
2) (integer) 10
unwatch 取消 WATCH 命令对所有 key 的监视。如果在执行 WATCH 命令之后,EXEC 命令或DISCARD 命令先被执行了的话,那么就不需要再执行 UNWATCH 了。
RDB(Redis DataBase) 在指定的时间间隔内将内存中的数据集快照写入磁盘, 也就是 Snapshot 快照,它恢复时是将快照文件直接读到内存里
整个过程中,主进程是不进行任何 IO 操作的,这就确保了极高的性能,RDB的缺点是最后一次持久化后的数据可能丢失。
a>配置文件中默认的快照配置
其中 save 3600 1
的含义是:每 3600 秒时,至少有 1 个 key 变化,则触发RDB;save 300 10和save 60 10000同理。
b>通过 save
或 bgsave
命令触发RDB策略
127.0.0.1:6379>save
127.0.0.1:6379>bgsave
可以通过lastsave 命令获取最后一次成功执行快照的时间
127.0.0.1:6379>lastsave
save
bgsave
优点
缺点
AOF(Append Only File):以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof
优点
缺点
AOF和RDB同时开启,redis听谁的?
AOF和RDB同时开启,系统默认取AOF的数据(数据不会存在丢失)
AOF和RDB用哪个好?
官方推荐两个都启用。
如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那 RDB 方式要比AOF方式更加的高效。不建议单独用 AOF,因为可能会出现Bug。如果只是做纯内存缓存,可以都不用。
主机(master)数据更新后根据配置和策略, 自动同步到备/从机(slaver)的机制,Master 以写为主,Slaver 以读为主
主从复制原理
模拟三台 redis 服务器(6379、6380、6381),主机(6379)从机(6380、6381) 我的 redis 配置文件路径: /opt/homebrew/etc/redis.conf
$ pwd
/Users/jianjian
$ mkdir myredis
$ cd /myredis
$ pwd
/Users/jianjian/myredis
$ cp /opt/homebrew/etc/redis.conf ~/myredis/redis.conf
$ vi redis6379.conf
include /Users/jianjian/myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb
$ vi redis6380.conf
include /Users/jianjian/myredis/redis.conf
pidfile /var/run/redis_6380.pid
port 6380
dbfilename dump6380.rdb
$ vi redis6381.conf
include /Users/jianjian/myredis/redis.conf
pidfile /var/run/redis_6381.pid
port 6381
dbfilename dump6381.rdb
$ redis-server redis6379.conf
$ redis-server redis6380.conf
$ redis-server redis6381.conf
$ ps -ef | grep redis
501 85018 80912 0 3:54PM ttys000 0:00.60 redis-server 127.0.0.1:6380
501 86115 85019 0 3:56PM ttys001 0:00.49 redis-server 127.0.0.1:6379
501 86631 86116 0 3:57PM ttys002 0:00.35 redis-server 127.0.0.1:6381
$ redis-cli -p 6379
127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:0
......
$ redis-cli -p 6380
127.0.0.1:6380> info replication
# Replication
role:master
connected_slaves:0
......
$ redis-cli -p 6381
127.0.0.1:6381> info replication
# Replication
role:master
connected_slaves:0
......
slaveof <ip><port>
成为某个实例的从服务器# 在6380和6381上执行: slaveof 127.0.0.1 6379
#变为从机
$ redis-cli -p 6380
127.0.0.1:6380>slaveof 127.0.0.1 6379
127.0.0.1:6380> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
......
#变为从机
$ redis-cli -p 6381
127.0.0.1:6381>slaveof 127.0.0.1 6379
127.0.0.1:6381> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
# 主机写入数据
$ redis-cli -p 6379
127.0.0.1:6379> set k1 v1
127.0.0.1:6379> keys *
1) "k1"
# 从机可以读取数据
$ redis-cli -p 6380
127.0.0.1:6380> keys *
1) "k1"
# 从机可以读取数据
$ redis-cli -p 6381
127.0.0.1:6381> get k1
1) "v1"
注:如果主机挂掉,重启就行,一切如初。从机重启需重设:slaveof 127.0.0.1 6379
可以将配置增加到文件中。永久生效。
Q&A
slave1、slave2是从头开始复制还是从切入点开始复制?比如从k4进来,那之前的k1,k2,k3是否也可以复制?
从头开始复制;可以
从机是否可以写?set可否?
不可以
主机shutdown后情况如何?从机是上位还是原地待命?
主机挂掉,从机原地待命
主机又回来了后,主机新增记录,从机还能否顺利复制?
可以
其中一台从机down后情况如何?依照原有它能跟上大部队吗?
从机挂掉重启后需要重设:slaveof 127.0.0.1 6379
上一个 Slave 可以是下一个 slave 的Master,slave 同样可以接收其他 slave 的连接和同步请求,那么该 slave 作为了链条中下一个的 master,可以有效减轻 master 的写压力,去中心化降低风险。
$ redis-cli -p 6380
127.0.0.1:6380>slaveof 127.0.0.1 6379
$ redis-cli -p 6381
127.0.0.1:6381>slaveof 127.0.0.1 6380
即 主机挂掉之后,备机上位成为主机,保证服务正常进行
当一个 master 宕机后,后面的 slave 可以立刻升为 master,其后面的 slave 不用做任何修改。用 slaveof no one
将从机变为主机。
# master 宕机
$ redis-cli -p 6379
127.0.0.1:6379> shutdown
# slave1升为master
$ redis-cli -p 6380
127.0.0.1:6380> slave no one
127.0.0.1:6380> set k2 v2
$ redis-cli -p 6381
127.0.0.1:6381> get k2
"v2"
反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从机转换为主机
配置哨兵
$ vi sentinel.conf
sentinel monitor mymaster 127.0.0.1 6379 1
其中 mymaster 为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量。
启动哨兵
$ redis-sentinel sentinel.conf
当主机挂掉,从机选举中产生新的主机
$ 127.0.0.1:6379> shutdown
这里显示6380变成了主机,原主机6379重启后会变为从机。
哪个从机会被选举为主机呢?根据优先级别:replica-priority
Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。
Redis 集群提供了以下好处
Redis 集群的不足
模拟6台 redis 服务器,6379、6380、6381、6389、6390、6391 主机(6379、6380、6381);从机(6389、6390、6391),将他们加入一个集群中
$ vi redis6379.conf
include /Users/jianjian/myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb
## cluster配置修改
dir "/Users/jianjian/myredis/redis_cluster"
logfile "/Users/jianjian/myredis/redis_cluster/redis_err_6379.log"
# 打开集群模式
cluster-enabled yes
# 设定节点配置文件名
cluster-config-file nodes-6379.conf
# 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换
cluster-node-timeout 15000
$ cp redis6379.conf redis6380.conf
$ cp redis6379.conf redis6381.conf
$ cp redis6379.conf redis6389.conf
$ cp redis6379.conf redis6390.conf
$ cp redis6379.conf redis6391.conf
#将redis6380.conf中的6379替换为6380
$ vi redis6380.conf
:%s/6379/6380
合成之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。
$ redis-cli --cluster create --cluster-replicas 1 192.168.11.101:6379 192.168.11.101:6380 192.168.11.101:6381 192.168.11.101:6389 192.168.11.101:6390 192.168.11.101:6391
此处不要用127.0.0.1, 请用真实IP地址;--cluster-replicas 1
表示采用最简单的方式配置集群,一台主机,一台从机,正好三组。
$ redis-cli -c -p 6379
127.0.0.1:6379> cluster nodes
什么是slots
一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个, 集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。
集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:
节点 A 负责处理 0 号至 5460 号插槽。
节点 B 负责处理 5461 号至 10922 号插槽。
节点 C 负责处理 10923 号至 16383 号插槽。
在集群中录入值
在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽 slot ,如果不是该客
户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。
redis-cli客户端提供了 –c 参数实现自动重定向。如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。
CLUSTER GETKEYSINSLOT <slot><count>
返回 count 个 slot 槽中的键
如果主节点下线?从节点能否自动升为主节点?注意:15秒超时
…
主节点恢复后,主从关系会如何?
主节点回来变成从机。
如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?
即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。
无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。
public class JedisClusterTest {
public static void main(String[] args) {
Set<HostAndPort>set =new HashSet<HostAndPort>();
set.add(new HostAndPort("192.168.31.211",6379));
JedisCluster jedisCluster=new JedisCluster(set);
jedisCluster.set("k1", "v1");
System.out.println(jedisCluster.get("k1"));
}
}
问题描述
key 对应的数据在数据源并不存在,每次针对此 key 的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户 id 获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。
一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。
解决方案
问题描述
key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。
key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。
解决方案
问题描述
key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。缓存雪崩与缓存击穿的区别在于这里针对很多 key 缓存,前者则是某一个 key。
缓存失效时的雪崩效应对底层系统的冲击非常可怕!
解决方案
问题描述
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
解决方案
分布式锁主流的实现方案:
每一种分布式锁解决方案都有各自的优缺点:
这里,我们就基于redis实现分布式锁。
$ 127.0.0.1:6379> set key value EX second
$ 127.0.0.1:6379> set sku:1:info “OK” NX PX 10000
@GetMapping("testLock")
public void testLock(){
//1获取锁,setne
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
//2获取锁成功、查询num的值
if(lock){
Object value = redisTemplate.opsForValue().get("num");
//2.1判断num为空return
if(StringUtils.isEmpty(value)){
return;
}
//2.2有值就转成成int
int num = Integer.parseInt(value+"");
//2.3把redis的num加1
redisTemplate.opsForValue().set("num", ++num);
//2.4释放锁,del
redisTemplate.delete("lock");
}else{
//3获取锁失败、每隔0.1秒再获取
try {
Thread.sleep(100);
testLock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
优化之设置锁的过期时间
问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放
解决:设置过期时间,自动释放锁。
设置过期时间有两种方式:
优化之UUID防误删
场景:如果业务逻辑的执行时间是7s。执行流程如下
解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁
优化之LUA脚本保证删除的原子性
问题:删除操作缺乏原子性。
场景:
uuid=v1
set(lock,uuid);
在redis中没有了lock,没有了锁。
index2线程获取到了cpu的资源,开始执行方法
uuid=v2
set(lock,uuid);
index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行
删除的index2的锁!
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
总结
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
同一时间只有一个人有锁,而且开锁解锁都是同一个人,不会死锁
Redis ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。
在Redis 5版本之前,Redis 安全规则只有密码控制 还有通过 rename 来调整高危命令比如 flushdb , KEYS* , shutdown 等。Redis 6 则提供ACL的功能对用户进行更细粒度的权限控制 :
(1)接入权限:用户名和密码
(2)可以执行的命令
(3)可以操作的 KEY
acl list
展现用户权限列表acl cat
查看添加权限指令类别,加参数类型名可以查看类型下具体命令acl whoami
查看当前用户acl setuser
创建和编辑用户ACL下面是有效ACL规则的列表。某些规则只是用于激活或删除标志,或对用户ACL执行给定更改的单个单词。其他规则是字符前缀,它们与命令或类别名称、键模式等连接在一起。
acl setuser 用户名
acl setuser user2 on >password ~cached:* +get
auth user2 password
Redis 6终于支持多线程了,告别单线程了吗?
IO 多线程其实指客户端交互部分的网络IO交互处理模块多线程,而非执行命令多线程。Redis6执行命令依然是单线程。
原理架构
Redis 6 加入多线程,但跟 Memcached 这种从 IO处理到数据访问多线程的实现模式有些差异。Redis 的多线程部分只是用来处理网络数据的读写和协议解析,执行命令仍然是单线程。之所以这么设计是不想因为多线程而变得复杂,需要去控制 key、lua、事务,LPUSH/LPOP 等等的并发问题。整体的设计大体如下:
另外,多线程IO默认也是不开启的,需要在配置文件中配置
io-threads-do-reads yes
io-threads 4
之前老版 Redis 想要搭集群需要单独安装 ruby 环境,Redis 5 将 redis-trib.rb 的功能集成到 redis-cli 。另外官方 redis-benchmark 工具开始支持 cluster 模式了,通过多线程的方式对多个分片进行压测。
1、RESP3新的 Redis 通信协议:优化服务端与客户端之间通信
2、Client side caching客户端缓存:基于 RESP3 协议实现的客户端缓存功能。为了进一步提升缓存的性能,将客户端经常访问的数据cache到客户端。减少TCP网络交互。
3、Proxy集群代理模式:Proxy 功能,让 Cluster 拥有像单实例一样的接入方式,降低大家使用cluster的门槛。不过需要注意的是代理不改变 Cluster 的功能限制,不支持的命令还是不会支持,比如跨 slot 的多Key操作。
4、Modules API:Redis 6中模块API开发进展非常大,因为Redis Labs为了开发复杂的功能,从一开始就用上Redis模块。Redis可以变成一个框架,利用Modules来构建不同系统,而不需要从头开始写然后还要BSD许可。Redis一开始就是一个向编写各种系统开放的平台。