马上加入IBC程序猿 各种源码随意下,各种教程随便看! 注册 每日签到 加入编程讨论群

C#教程 ASP.NET教程 C#视频教程程序源码享受不尽 C#技术求助 ASP.NET技术求助

【源码下载】 社群合作 申请版主 程序开发 【远程协助】 每天乐一乐 每日签到 【承接外包项目】 面试-葵花宝典下载

官方一群:

官方二群:

Redis实战篇

[复制链接]
查看2958 | 回复2 | 2019-10-24 09:50:36 | 显示全部楼层 |阅读模式

Redis实战篇

1 Redis 客户端

1.1 客户端通讯 原理

客户端和服务器通过 TCP 毗连来举行数据交互, 服务器默认的端标语为 6379 。
客户端和服务器发送的下令或数据一律以 \r\n (CRLF 回车+换行)末端。

假如使用 wireshark 对 jedis 抓包:
情况:Jedis 毗连到虚拟机 202,运行 main,对 VMnet8 抓包。
过滤条件:ip.dst==192.168.8.202 and tcp.port in {6379}
set qingshan 抓包:

095036ahwrv6h386hlldrn.png

可以看到现实发出的数据包是:

  1. <code>*3\r\n$3\r\nSET\r\n$8\r\nqingshan\r\n$4\r\n2673\r\n</code>
复制代码

get qingshan 抓包:

095037tdguzuz62gygdggg.png

  1. <code>*2\r\n$3\r\nGET\r\n$8\r\nqingshan\r\n</code>
复制代码

客户端跟 Redis 之间 使用一种特殊的编码格式(在 AOF 文件内里我们看到了),叫做 Redis Serialization Protocol (Redis 序列化协议)。特点:轻易实现、分析快、可读性强。客户端发给服务端的消息必要经过编码,服务端收到之后会按约定举行解码,反之亦然。

基于此,我们可以自己实现一个 Redis 客户端。
参考:myclient.MyClient.java
1、创建 Socket 毗连
2、OutputStream 写入数据(发送到服务端)
3、InputStream 读取数据(从服务端接口)
基于这种协议,我们可以用 Java 实现所有的 Redis 操作下令。当然,我们不必要这么做,因为已经有很多比较成熟的 Java 客户端,实现了完备的功能和高级特性,而且提供了良好的性能。

https://redis.io/clients#java
官网保举的 Java 客户端有 3 个 Jedis,Redisson 和 Luttuce。

客户端 描述
Jedis A blazingly small and sane redis java client
lettuce Advanced Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel,Pipelining, and codecs
Redisson distributed and scalable Java data structures on top of Redis server

Spring 毗连 Redis 用的是什么?RedisConnectionFactory 接口支持多种实现,比方 : JedisConnectionFactory 、 JredisConnectionFactory 、LettuceConnectionFactory、SrpConnectionFactory。

1.2 Jedis

https://github.com/xetorthio/jedis

1.2.1 特点

Jedis 是我们最认识和最常用的客户端。轻量,简便,便于集成和改造。

  1. <code>public static void main(String[] args) {
  2. Jedis jedis = new Jedis("127.0.0.1", 6379);
  3. jedis.set("qingshan", "2673");
  4. System.out.println(jedis.get("qingshan"));
  5. jedis.close();
  6. }</code>
复制代码

Jedis 多个线程使用一个毗连的时间线程不安全。可以使用毗连池,为每个请求创建差别的毗连,基于 Apache common pool 实现。跟数据库一样,可以设置最大毗连数等参数。Jedis 中有多种毗连池的子类。

095037ccic4bu15i6fmicn.png

比方:

  1. <code>public class ShardingTest {
  2. public static void main(String[] args) {
  3. JedisPoolConfig poolConfig = new JedisPoolConfig();
  4. // Redis服务器
  5. JedisShardInfo shardInfo1 = new JedisShardInfo("127.0.0.1", 6379);
  6. JedisShardInfo shardInfo2 = new JedisShardInfo("192.168.8.205", 6379);
  7. // 毗连池
  8. List<JedisShardInfo> infoList = Arrays.asList(shardInfo1, shardInfo2);
  9. ShardedJedisPool jedisPool = new ShardedJedisPool(poolConfig, infoList);
  10. ShardedJedis jedis = null;
  11. try{
  12. jedis = jedisPool.getResource();
  13. for(int i=0; i<100; i++){
  14. jedis.set("k"+i, ""+i);
  15. }
  16. for(int i=0; i<100; i++){
  17. Client client = jedis.getShard("k"+i).getClient();
  18. System.out.println("取到值:"+jedis.get("k"+i)+","+"当前key位于:" + client.getHost() + ":" + client.getPort());
  19. }
  20. }finally{
  21. if(jedis!=null) {
  22. jedis.close();
  23. }
  24. }
  25. }
  26. }</code>
复制代码

Jedis 有 4 种工作模式:单节点、分片、哨兵、集群。
3 种请求模式:Client、Pipeline、事务。Client 模式就是客户端发送一个下令,壅闭等待服务端实行,然后读取 返回结果。Pipeline 模式是一次性发送多个下令,末了一次取回所有的返回结果,这种模式通过减少网络的往返时间和 io 读写次数,大幅度进步通讯性能。第三种是事务模式。Transaction 模式即开启 Redis 的事务管理,事务模式开启后,所有的下令(除了 exec,discard,multi 和 watch)到达服务端以后不会立即实行,会进入一个等待队列。

1.2.2 Sentinel 获取 毗连原理

题目:Jedis 毗连 Sentinel 的时间,我们配置的是全部哨兵的地点。Sentinel 是如何返回可用的 master 地点的呢?

在构造方法中:

  1. <code>pool = new JedisSentinelPool(masterName, sentinels);</code>
复制代码

调用了:

  1. <code>HostAndPort master = initSentinels(sentinels, masterName);</code>
复制代码

查看:

  1. <code>private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
  2. HostAndPort master = null;
  3. boolean sentinelAvailable = false;
  4. log.info("Trying to find master from available Sentinels...");
  5. // 有多个 sentinels,遍历这些个 sentinels
  6. for (String sentinel : sentinels) {
  7. // host:port 表现的 sentinel 地点转化为一个 HostAndPort 对象。
  8. final HostAndPort hap = HostAndPort.parseString(sentinel);
  9. log.fine("Connecting to Sentinel " + hap);
  10. Jedis jedis = null;
  11. try {
  12. // 毗连到 sentinel
  13. jedis = new Jedis(hap.getHost(), hap.getPort());
  14. // 根据 masterName 得到 master 的地点,返回一个 list,host= list[0], port =// list[1]
  15. List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
  16. // connected to sentinel...
  17. sentinelAvailable = true;
  18. if (masterAddr == null || masterAddr.size() != 2) {
  19. log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + ".");
  20. continue;
  21. }
  22. // 假如在任何一个 sentinel 中找到了 master,不再遍历 sentinels
  23. master = toHostAndPort(masterAddr);
  24. log.fine("Found Redis master at " + master);
  25. break;
  26. } catch (JedisException e) {
  27. // resolves #1036, it should handle JedisException there&#39;s another chance
  28. // of raising JedisDataException
  29. log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
  30. + ". Trying next one.");
  31. } finally {
  32. if (jedis != null) {
  33. jedis.close();
  34. }
  35. }
  36. }
  37. // 到这里,假如 master 为 null,则说明有两种情况,一种是所有的 sentinels 节点都 down 掉了,一种是 master节点没有被存活的 sentinels 监控到
  38. if (master == null) {
  39. if (sentinelAvailable) {
  40. // can connect to sentinel, but master name seems to not
  41. // monitored
  42. throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored...");
  43. } else {
  44. throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running...");
  45. }
  46. }
  47. // 假如走到这里,说明找到了 master 的地点
  48. log.info("Redis master running at " + master + ", starting Sentinel listeners...");
  49. // 启动对每个 sentinels 的监听为每个 sentinel 都启动了一个监听者 MasterListener。MasterListener 自己是一个线程,它会去订阅 sentinel 上关于 master 节点地点改变的消息。
  50. for (String sentinel : sentinels) {
  51. final HostAndPort hap = HostAndPort.parseString(sentinel);
  52. MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
  53. // whether MasterListener threads are alive or not, process can be stopped
  54. masterListener.setDaemon(true);
  55. masterListeners.add(masterListener);
  56. masterListener.start();
  57. }
  58. return master;
  59. }</code>
复制代码

1.2.3 Cluster 获取 毗连原理

题目:使用 Jedis 毗连 Cluster 的时间,我们只必要毗连到任意一个或者多个 redisgroup 中的实例地点,那我们是怎么获取到必要操作的 Redis Master 实例的?
关键题目:在于如何存储 slot 和 Redis 毗连池的关系。
1、步调启动初始化集群情况,读取配置文件中的节点配置,无论是主从,无论多少个,只拿第一个,获取 redis 毗连实例(背面有个 break)。

  1. <code>// redis.clients.jedis.JedisClusterConnectionHandler#initializeSlotsCache
  2. private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password)
  3. {
  4. for (HostAndPort hostAndPort : startNodes) {
  5. // 获取一个 Jedis 实例
  6. Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
  7. if (password != null) {
  8. jedis.auth(password);
  9. }
  10. try {
  11. // 获取 Redis 节点和 Slot 虚拟槽
  12. cache.discoverClusterNodesAndSlots(jedis);
  13. // 直接跳出循环
  14. break;
  15. } catch (JedisConnectionException e) {
  16. // try next nodes
  17. } finally {
  18. if (jedis != null) {
  19. jedis.close();
  20. }
  21. }
  22. }</code>
复制代码

2、用获取的 redis 毗连实例实行 clusterSlots ()方法,现实实行 redis 服务端 clusterslots 下令,获取虚拟槽信息。
该集合的基本信息为[long, long, List, List], 第一,二个元素是该节点负责槽点的起始位置,第三个元素是主节点信息,第四个元素为主节点对应的从节点信息。该 list 的基本信息为[string,int,string],第一个为 host 信息,第二个为 port 信息,第三个为唯一id。

095038n7txs5xhib3jnn6z.png

3、获取有关节点的槽点信息后,调用 getAssignedSlotArray(slotinfo)来获取所有的槽点值。
4、再获取主节点的地点信息,调用 generateHostAndPort(hostInfo)方法,天生一个 ostAndPort 对象。
5、再根据节点地点信息来设置节点对应的 JedisPool,即设置 Map nodes 的值。
接下来判定若此时节点信息为主节点信息时,则调用 assignSlotsToNodes 方法,设置每个槽点值对应的毗连池,即设置 Map slots 的值。

  1. <code>public void discoverClusterNodesAndSlots(Jedis jedis) {
  2. w.lock();
  3. try {
  4. reset();
  5. // 获取节点集合
  6. List<Object> slots = jedis.clusterSlots();
  7. // 遍历 3 个 master 节点
  8. for (Object slotInfoObj : slots) {
  9. // slotInfo 槽开始,槽结束,主,从
  10. // {[0,5460,7291,7294],[5461,10922,7292,7295],[10923,16383,7293,7296]}
  11. List<Object> slotInfo = (List<Object>) slotInfoObj;
  12. // 假如<=2,代表没有分配 slot
  13. if (slotInfo.size() <= MASTER_NODE_INDEX) {
  14. continue;
  15. }
  16. // 获取分配到当前 master 节点的数据槽,比方 7291 节点的{0,1,2,3……5460}
  17. List<Integer> slotNums = getAssignedSlotArray(slotInfo);
  18. // hostInfos
  19. int size = slotInfo.size(); // size 是 4,槽最小最大,主,从
  20. // 第 3 位和第 4 位是主从端口的信息
  21. for (int i = MASTER_NODE_INDEX; i < size; i++) {
  22. List<Object> hostInfos = (List<Object>) slotInfo.get(i);
  23. if (hostInfos.size() <= 0) {
  24. continue;
  25. }
  26. // 根据 IP 端口天生 HostAndPort 实例
  27. HostAndPort targetNode = generateHostAndPort(hostInfos);
  28. // 据HostAndPort分析出ip:port的key值,再根据key从缓存中查询对应的jedisPool实例。假如没有jedisPool实例,就创建 JedisPool 实例,末了放入缓存中。nodeKey 和 nodePool 的关系
  29. setupNodeIfNotExist(targetNode);
  30. // 把 slot 和 jedisPool 缓存起来(16384 个),key 是 slot 下标,value 是毗连池
  31. if (i == MASTER_NODE_INDEX) {
  32. assignSlotsToNode(slotNums, targetNode);
  33. }
  34. }
  35. } finally {
  36. w.unlock();
  37. }
  38. }
  39. </code>
复制代码

从集群情况存取值:
1、把 key 作为参数,实行 CRC16 算法,获取 key 对应的 slot 值。
2、通过该 slot 值,去 slots 的 map 集合中获取 jedisPool 实例。
3、通过 jedisPool 实例获取 jedis 实例,终极完成 redis 数据存取工作。

1.2.4 pipeline

我们看到 set 2 万个 key 用了好几分钟,这个速率太慢了,完全没有把 Redis 10万的 QPS 使用起来。但是单个下令的实行到底慢在哪里?

1.2.4.1 慢在 哪里?

Redis 使用的是客户端/服务器(C/S)模子和请求/响应协议的 TCP 服务器。这意味着通常情况下一个请求会遵照以下步骤:

  • 客户端向服务端发送一个查询请求,并监听 Socket 返回,通常是以壅闭模式,等待服务端响应。

  • 服务端处置惩罚下令,并将结果返回给客户端。

Redis 客户端与 Redis 服务器之间使用 TCP 协议举行毗连,一个客户端可以通过一个 socket 毗连发起多个请求下令。每个请求下令发出后 client 通常会壅闭并等待 redis服务器处置惩罚,redis 处置惩罚完请求下令后会将结果通过响应报文返回给 client,因此当实行多条下令的时间都必要等待上一条下令实行完毕才气实行。实行过程如图:

095038wa1ajajsvo8lzf77.png

Redis 自己提供了一些批量操作下令,比如 mget,mset,可以减少通讯的时间,但是大部分下令是不支持 multi 操作的,比方 hash 就没有.

由于通讯会有网络延迟,假如 client 和 server 之间的包传输时间必要 10 毫秒,一次交互就是 20 毫秒(RTT:Round Trip Time)。如许的话,client 1 秒钟也只能也只能发送 50 个下令。这显然没有充实使用 Redis 的处置惩罚能力。另外一个,Redis 服务端实行 I/O 的次数过多。

1.2.4.2 Pipeline 管道

https://redis.io/topics/pipelining
那我们能不能像数据库的 batch 操作一样,把一组下令组装在一起发送给 Redis 服务端实行,然后一次性得到返回结果呢?这个就是 Pipeline 的作用。Pipeline 通过一个队列把所有的下令缓存起来,然后把多个下令在一次毗连中发送给服务器。

095038ykdrhx4qztth9va9.png

先来看一下结果(先 flushall):
PipelineSet.java,PipelineGet.java
要实现 Pipeline,既要服务端的支持,也要客户端的支持。对于服务端来说,必要能够处置惩罚客户端通过一个 TCP 毗连发来的多个下令,而且逐个地实行下令一起返回 。

对于客户端来说,要把多个下令缓存起来,达到肯定的条件就发送出去,末了才处置惩罚 Redis 的应答(这里也要注意对客户端内存的斲丧)。

jedis-pipeline 的 client-buffer 限定:8192bytes,客户端堆积的下令高出 8192bytes 时,会发送给服务端。

源码:redis.clients.util.RedisOutputStream.java

  1. <code>public RedisOutputStream(final OutputStream out) {
  2. this(out, 8192);
  3. }</code>
复制代码

pipeline 对于下令条数没有限定,但是下令大概会受限于 TCP 包巨细。
假如 Jedis 发送了一组下令,而发送请求还没有结束,Redis 响应的结果会放在接缓冲区。假如吸收缓冲区满了,jedis 会通知 redis win=0,此时 redis 不会再发送结果给 jedis 端,转而把响应结果生存在 Redis 服务端的输出缓冲区中。

输出缓冲区的配置:redis.conf

client-output-buffer-limit

  1. <code>client-output-buffer-limit normal 0 0 0
  2. client-output-buffer-limit replica 256mb 64mb 60
  3. client-output-buffer-limit pubsub 32mb 8mb 60</code>
复制代码
配置 作用
class 客户端范例,分为三种。a)normal:平凡客户端;b)slave:slave 客户端,用于复制;c)pubsub:发布订阅客户端
hard limit 假如客户端使用的输出缓冲区大于,客户端会被立即关闭,0 代表不限定
soft limit
soft seconds
假如客户端使用的输出缓冲区高出了而且一连了秒,客户端会被立即关闭

每个客户端使用的输出缓冲区的巨细可以用 client list 下令查看

  1. <code>redis> client list</code>
复制代码
  1. <code>id=5 addr=192.168.8.1:10859 fd=8 name= age=5 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=5 qbuf-free=32763
  2. obl=16380 oll=227 omem=4654408 events=rw cmd=set</code>
复制代码
  • obl : 输出缓冲区的长度(字节为单元, 0 表现没有分配输出缓冲区)
  • oll : 输出列表包含的对象数量(当输出缓冲区没有剩余空间时,下令回复会以字符串对象的情势被入队到这个队列里)
  • omem : 输出缓冲区和输出列表占用的内存总量
1.2.4.3 使用 场景

Pipeline 实用于什么场景呢?
假如某些操作必要立刻得到 Redis 操作是否乐成的结果,这种场景就不得当。
有些场景,比方批量写入数据,对于结果的及时性和乐成性要求不高,就可以用Pipeline

1.2.5 Jedis 实现分布式 锁

原文地点:https://redis.io/topics/distlock
中文地点:http://redis.cn/topics/distlock.html

分布式锁的基本特性或者要求:
1、互斥性:只有一个客户端能够持有锁。
2、不会产生死锁:纵然持有锁的客户端瓦解,也能包管后续其他客户端可以获取锁。
3、只有持有这把锁的客户端才气解锁。

distlock.DistLock.java

  1. <code>/**
  2. * 实验获取分布式锁
  3. * @param jedis Redis客户端
  4. * @param lockKey 锁
  5. * @param requestId 请求标识
  6. * @param expireTime 超期时间
  7. * @return 是否获取乐成
  8. */
  9. public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
  10. // set支持多个参数 NX(not exist) XX(exist) EX(seconds) PX(million seconds)
  11. String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
  12. if (LOCK_SUCCESS.equals(result)) {
  13. return true;
  14. }
  15. return false;
  16. }</code>
复制代码

参数解读:
1、lockKey 是 Redis key 的名称,也就是谁添加乐成这个 key 代表谁获取锁乐成。
2、requestId 是客户端的 ID(设置成 value),假如我们要包管只有加锁的客户端才气释放锁,就必须得到客户端的 ID(包管第 3 点)。
3、SET_IF_NOT_EXIST 是我们的下令内里加上 NX(包管第 1 点)。
4、SET_WITH_EXPIRE_TIME,PX 代表以毫秒为单元设置 key 的逾期时间(包管第 2 点)。expireTime 是主动释放锁的时间,比如 5000 代表 5 秒。

释放锁,直接删除 key 来释放锁可以吗?就像如许:

  1. <code>public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
  2. jedis.del(lockKey);
  3. }</code>
复制代码

没有对客户端 requestId 举行判定,大概会释放其他客户端持有的锁。
先判定后删除呢?

  1. <code>public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
  2. // 判定加锁与解锁是不是同一个客户端
  3. if (requestId.equals(jedis.get(lockKey))) {
  4. // 若在此时,这把锁突然不是这个客户端的,则会误解锁
  5. jedis.del(lockKey);
  6. }
  7. }</code>
复制代码

假如在释放锁的时间,这把锁已经不属于这个客户端(比方已经逾期,而且被别的客户端获取锁乐成了),那就会出现释放了其他客户端的锁的情况。

以是我们把判定客户端是否相称和删除 key 的操作放在 Lua 脚本内里实行。

  1. <code>/**
  2. * 释放分布式锁
  3. * @param jedis Redis客户端
  4. * @param lockKey 锁
  5. * @param requestId 请求标识
  6. * @return 是否释放乐成
  7. */
  8. public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
  9. String script = "if redis.call(&#39;get&#39;, KEYS[1]) == ARGV[1] then return redis.call(&#39;del&#39;, KEYS[1]) else return 0 end";
  10. Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
  11. if (RELEASE_SUCCESS.equals(result)) {
  12. return true;
  13. }
  14. return false;
  15. } </code>
复制代码

这个是 Jedis 内里分布式锁的实现。

1.3 Luttece

https://lettuce.io/

1.3.1 特点

与 Jedis 相比,Lettuce 则完全降服了其线程不安全的缺点:Lettuce 是一个可伸缩的线程安全的 Redis 客户端,支持同步、异步和响应式模式(Reactive)。多个线程可以共享一个毗连实例,而不必担心多线程并发题目。
同步调用:

  1. <code>public class LettuceSyncTest {
  2. public static void main(String[] args) {
  3. // 创建客户端
  4. RedisClient client = RedisClient.create("redis://127.0.0.1:6379");
  5. // 线程安全的长毗连,毗连丢失时会主动重连
  6. StatefulRedisConnection<String, String> connection = client.connect();
  7. // 获取同步实行下令,默认超时时间为 60s
  8. RedisCommands<String, String> sync = connection.sync();
  9. // 发送get请求,获取值
  10. sync.set("gupao:sync","lettuce-sync-666" );
  11. String value = sync.get("gupao:sync");
  12. System.out.println("------"+value);
  13. //关闭毗连
  14. connection.close();
  15. //关掉客户端
  16. client.shutdown();
  17. }
  18. }
  19. </code>
复制代码

异步的结果使用 RedisFuture 包装,提供了大量回调的方法。
异步调用:

  1. <code>
  2. import io.lettuce.core.RedisClient;
  3. import io.lettuce.core.RedisFuture;
  4. import io.lettuce.core.api.StatefulRedisConnection;
  5. import io.lettuce.core.api.async.RedisAsyncCommands;
  6. import java.util.concurrent.ExecutionException;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.concurrent.TimeoutException;
  9. public class LettuceASyncTest {
  10. public static void main(String[] args) {
  11. RedisClient client = RedisClient.create("redis://127.0.0.1:6379");
  12. // 线程安全的长毗连,毗连丢失时会主动重连
  13. StatefulRedisConnection<String, String> connection = client.connect();
  14. // 获取异步实行下令api
  15. RedisAsyncCommands<String, String> commands = connection.async();
  16. // 获取RedisFuture<T>
  17. commands.set("gupao:async","lettuce-async-666");
  18. RedisFuture<String> future = commands.get("gupao:async");
  19. try {
  20. String value = future.get(60, TimeUnit.SECONDS);
  21. System.out.println("------"+value);
  22. } catch (InterruptedException | ExecutionException | TimeoutException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }</code>
复制代码

它基于 Netty 框架构建,支持 Redis 的高级功能,如 Pipeline、发布订阅,事务、Sentinel,集群,支持毗连池。

Lettuce 是 Spring Boot 2.x 默认的客户端,替换了 Jedis。集成之后我们不必要单独使用它,直接调用 Spring 的 RedisTemplate 操作,毗连和创建和关闭也不必要我们操心。

  1. <code><dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency></code>
复制代码

1.4 Redisson

https://redisson.org/
https://github.com/redisson/redisson/wiki/目次

1.4.1 本质

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory
Data Grid),提供了分布式和可扩展的 Java 数据结构。

1.4.2 特点

基于 Netty 实现,采用非壅闭 IO,性能高
支持异步请求
支持毗连池、pipeline、LUA Scripting、Redis Sentinel、Redis Cluster
不支持事务,官方发起以 LUA Scripting 代替事务
主从、哨兵、集群都支持。Spring 也可以配置和注入 RedissonClient。

1.4.3 实现分布式锁

在 Redisson 内里提供了更加简单的分布式锁的实现。

095039iagkzkx9kbbyp94g.png

  1. <code>public static void main(String[] args) throws InterruptedException {
  2. RLock rLock=redissonClient.getLock("updateAccount");
  3. // 最多等待 100 秒、上锁 10s 以后主动解锁
  4. if(rLock.tryLock(100,10, TimeUnit.SECONDS)){
  5. System.out.println("获取锁乐成");
  6. }
  7. // do something
  8. rLock.unlock();
  9. }</code>
复制代码

在得到 RLock 之后,只必要一个 tryLock 方法,内里有 3 个参数:
1、watiTime:获取锁的最大等待时间,高出这个时间不再实验获取锁
2、leaseTime:假如没有调用 unlock,高出了这个时间会主动释放锁
3、TimeUnit:释放时间的单元

Redisson 的分布式锁是怎么实现的呢?
在加锁的时间,在 Redis 写入了一个 HASH,key 是锁名称,field 是线程名称,value是 1(表现锁的重入次数)

095039vycpntmfj5mferho.png

源码:
tryLock()——tryAcquire()——tryAcquireAsync()——tryLockInnerAsync()

终极也是调用了一段 Lua 脚本。内里有一个参数,两个参数的值

占位 添补 寄义 现实值
KEYS[1] getName() 锁的名称(key) updateAccount
ARGV[1] internalLockLeaseTime 锁释放时间(毫秒) 10000
ARGV[2] getLockName(threadId) 线程名称 b60a9c8c-92f8-4bfe-b0e7-308967346336:1
  1. <code>// KEYS[1] 锁名称 updateAccount
  2. // ARGV[1] key 逾期时间 10000ms
  3. // ARGV[2] 线程名称
  4. // 锁名称不存在
  5. if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then
  6. // 创建一个 hash,key=锁名称,field=线程名,value=1
  7. redis.call(&#39;hset&#39;, KEYS[1], ARGV[2], 1);
  8. // 设置 hash 的逾期时间
  9. redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]);
  10. return nil;
  11. end;
  12. // 锁名称存在,判定是否当火线程持有的锁
  13. if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[2]) == 1) then
  14. // 假如是,value+1,代表重入次数+1
  15. redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[2], 1);
  16. // 重新得到锁,必要重新设置 Key 的逾期时间
  17. redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[1]);
  18. return nil;
  19. end;
  20. // 锁存在,但是不是当火线程持有,返回逾期时间(毫秒)
  21. return redis.call(&#39;pttl&#39;, KEYS[1])</code>
复制代码

释放锁,源码:

unlock——unlockInnerAsync

占位 添补 寄义 现实值
KEYS[1] getName() 锁名称 updateAccount
KEYS[2] getChannelName() 频道名称 redisson_lock__channel:{updateAccount}
KEYS[3] LockPubSub.unlockMessage 解锁时的消息 0
KEYS[4] internalLockLeaseTime 释放锁的时间 10000
KEYS[5] getLockName(threadId) 线程名称 b60a9c8c-92f8-4bfe-b0e7-308967346336:1
  1. <code>// KEYS[1] 锁的名称 updateAccount
  2. // KEYS[2] 频道名称 redisson_lock__channel:{updateAccount}
  3. // ARGV[1] 释放锁的消息 0
  4. // ARGV[2] 锁释放时间 10000
  5. // ARGV[3] 线程名称
  6. // 锁不存在(逾期或者已经释放了)
  7. if (redis.call(&#39;exists&#39;, KEYS[1]) == 0) then
  8. // 发布锁已经释放的消息
  9. redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]);
  10. return 1;
  11. end;
  12. // 锁存在,但是不是当火线程加的锁
  13. if (redis.call(&#39;hexists&#39;, KEYS[1], ARGV[3]) == 0) then
  14. return nil;
  15. end;
  16. // 锁存在,是当火线程加的锁
  17. // 重入次数-1
  18. local counter = redis.call(&#39;hincrby&#39;, KEYS[1], ARGV[3], -1);
  19. // -1 后大于 0,说明这个线程持有这把锁另有其他的使命必要实行
  20. if (counter > 0) then
  21. // 重新设置锁的逾期时间
  22. redis.call(&#39;pexpire&#39;, KEYS[1], ARGV[2]);
  23. return 0;
  24. else
  25. // -1 之后即是 0,如今可以删除锁了
  26. redis.call(&#39;del&#39;, KEYS[1]);
  27. // 删除之后发布释放锁的消息
  28. redis.call(&#39;publish&#39;, KEYS[2], ARGV[1]);
  29. return 1;
  30. end;
  31. // 其他情况返回 nil
  32. return nil;</code>
复制代码

这个是 Redisson 内里分布式锁的实现,我们在调用的时间非常简单。
Redisson 跟 Jedis 定位差别,它不是一个单纯的 Redis 客户端,而是基于 Redis 实现的分布式的服务,假如有必要用到一些分布式的数据结构,比如我们还可以基于Redisson 的分布式队列实现分布式事务,就可以引入 Redisson 的依赖实现。

2 数据 一致性

2.1 缓存 使用场景

针对读多写少的高并发场景,我们可以使用缓存来提升查询速率。
当我们使用 Redis 作为缓存的时间,一般流程是如许的:
1、假如数据在 Redis 存在,应用就可以直接从 Redis 拿到数据,不消访问数据库。

095039he9de7ft993939nk.png

2、假如 Redis 内里没有,先到数据库查询,然后写入到 Redis,再返回给应用

095040z8w0z10u0sg1umqz.png

2.2 一致性 题目的界说

因为这些数据是很少修改的,以是在绝大部分的情况下可以掷中缓存。但是,一旦被缓存的数据发生变革的时间,我们既要操作数据库的数据,也要操作 Redis 的数据,以是题目来了。如今我们有两种选择:

1、先操作 Redis 的数据再操作数据库的数据
2、先操作数据库的数据再操作 Redis 的数据

到底选哪一种?
首先必要明确的是,不管选择哪一种方案, 我们肯定是希望两个操作要么都乐成,要么都一个都不乐成。否则就会发生 Redis 跟数据库的数据不一致的题目。

但是,Redis 的数据和数据库的数据是不大概通过事务达到同一的,我们只能根据相应的场景和所必要付出的代价来采取一些步伐低沉数据不一致的题目出现的概率,在数据一致性和性能之间取得一个衡量。
对于数据库的及时性一致性要求不是特殊高的场合,比如 T+1 的报表,可以采用定时使命查询数据库数据同步到 Redis 的方案。
由于我们是以数据库的数据为准的,以是给缓存设置一个逾期时间,是包管终极一致性的办理方案。

2.3 方案 选择

2.3.1 Redis :删除还是 更新?

这里我们先要补充一点,当存储的数据发生变革,Redis 的数据也要更新的时间,我们有两种方案,一种就是直接更新,调用 set;另有一种是直接删除缓存,让应用在下次查询的时间重新写入。
这两种方案怎么选择呢?这里我们重要考虑更新缓存的代价。
更新缓存之前,是不是要经过其他表的查询、接口调用、盘算才气得到最新的数据,而不是直接从数据库拿到的值。假如是的话,发起直接删除缓存,这种方案更加简单,而且避免了数据库的数据和缓存不一致的情况。在一般情况下,我们也保举使用删除的方案。

这一点明确之后,如今我们就剩一个题目:

1、到底是先更新数据库,再删除缓存

2、还是先删除缓存,再更新数据库

我们先看第一种方案。

2.3.2 先更新 数据库,再删除缓存

正常情况:
更新数据库,乐成。
删除缓存,乐成。

非常情况:
1、更新数据库失败,步调捕获非常,不会走到下一步,以是数据不会出现不一致。
2、更新数据库乐成,删除缓存失败。数据库是新数据,缓存是旧数据,发生了不一致的情况。
这种题目怎么办理呢?我们可以提供一个重试的机制。
比如:假如删除缓存失败,我们捕获这个非常,把必要删除的 key 发送到消息队列。让后自己创建一个消耗者消耗,实验再次删除这个 key。
这种方式有个缺点,会对业务代码造成入侵。

以是我们又有了第二种方案(异步更新缓存):
因为更新数据库时会往 binlog 写入日志,以是我们可以通过一个服务来监听 binlog的变革(比如阿里的 canal),然后在客户端完成删除 key 的操作。假如删除失败的话,再发送到消息队列。
总之,对于后删除缓存失败的情况,我们的做法是不停地重试删除,直到乐成。
无论是重试还是异步删除,都是终极一致性的头脑。

2.3.3 先 删除缓存,再更新数据库

正常情况:
删除缓存,乐成。
更新数据库,乐成。

非常情况:
1、删除缓存,步调捕获非常,不会走到下一步,以是数据不会出现不一致。
2、删除缓存乐成,更新数据库失败。 因为以数据库的数据为准,以是不存在数据不一致的情况。

看起来好像没题目,但是假如有步调并发操作的情况下:
1)线程 A 必要更新数据,首先删除了 Redis 缓存
2)线程 B 查询数据,发现缓存不存在,到数据库查询旧值,写入 Redis,返回
3)线程 A 更新了数据库

这个时间,Redis 是旧的值,数据库是新的值,发生了数据不一致的情况。

那题目就变成了:能不能让对同一条数据的访问串行化呢?代码肯定包管不了,因为有多个线程,纵然做了使命队列也大概有多个服务实例。数据库也包管不了,因为会有多个数据库的毗连。只有一个数据库只提供一个毗连的情况下,才气包管读写的操作是串行的,或者我们把所有的读写请求放到同一个内存队列当中,但是这种情况吞吐量太低了。

以是我们有一种延时双删的策略,在写入数据之后,再删除一次缓存。

A 线程:
1)删除缓存
2)更新数据库
3)休眠 500ms(这个时间,依据读取数据的耗时而定)
4)再次删除缓存

伪代码:

  1. <code>public void write(String key,Object data){
  2. redis.delKey(key);
  3. db.updateData(data);
  4. Thread.sleep(500);
  5. redis.delKey(key);
  6. }</code>
复制代码

3 高并发 题目

在 Redis 存储的所有数据中,有一部分是被频仍访问的。有两种情况大概会导致热点题目的产生,一个是用户会合访问的数据,比如抢购的商品,明星结婚和明星出轨的微博。另有一种就是在数据举行分片的情况下,负载不平衡,高出了单个服务器的蒙受能力。热点题目大概引起缓存服务的不可用,终极造成压力堆积到数据库。

出于存储和流量优化的角度,我们必须要找到这些热点数据。

3.1 热点数据 发现

除了主动的缓存淘汰机制之外,怎么找出那些访问频率高的 key 呢?或者说,我们可以在哪里记录 key 被访问的情况呢?

3.1.1 客户端

第一个当然是在客户端了,比如我们可不可以在所有调用了 get、set 方法的地方,加上 key 的计数。但是如许的话,每一个地方都要修改,重复的代码也多。假如我们用的是 Jedis 的客户端,我们可以在 Jedis 的 Connection 类的 sendCommand()内里,用一个 HashMap 举行 key 的计数。

但是这种方式有几个题目:
1、不知道要存多少个 key,大概会发生内存走漏的题目。
2、会对客户端的代码造成入侵。
3、只能统计当前客户端的热点 key。

3.1.2 署理层

第二种方式就是在署理端实现,比如 TwemProxy 或者 Codis,但是不是所有的项目都使用了署理的架构。

3.1.3 服务端

第三种就是在服务端统计,Redis 有一个 monitor 的下令,可以监控到所有 Redis实行的下令。

代码:

  1. <code>jedis.monitor(new JedisMonitor() {
  2. @Override
  3. public void onCommand(String command) {
  4. System.out.println("#monitor: " + command);
  5. }
  6. }); </code>
复制代码

095040ohzzzn5ijqqhztj6.png

Facebook 的 开 源 项 目 redis-faina(https://github.com/facebookarchive/redis-faina.git)就是基于这个原理实现的。它是一个 python 脚本,可以分析 monitor 的数据。

  1. <code>redis-cli -p 6379 monitor | head -n 100000 | ./redis-faina.py</code>
复制代码

这种方法也会有两个题目:1)monitor 下令在高并发的场景下,会影响性能,以是不得当长时间使用。
只能统计一个 Redis 节点的热点 key。

3.1.4 呆板层面

另有一种方法就是呆板层面的,通过对 TCP 协议举行抓包,也有一些开源的方案,
比如 ELK 的 packetbeat 插件。

当我们发现了热点 key 之后,我们来看下热点数据在高并发的场景下大概会出现的
题目,以及怎么去办理。

3.2 缓存 雪崩

3.2.1 什么 是缓存 雪崩

缓存雪崩就是 Redis 的大量热点数据同时逾期(失效),因为设置了相同的逾期时间,刚好这个时间 Redis 请求的并发量又很大,就会导致所有的请求落到数据库。

3.2.2 缓存雪崩 的办理方案

1)加互斥锁或者使用队列,针对同一个 key 只答应一个线程到数据库查询
2)缓存定时预先更新,避免同时失效
3)通过加随机数,使 key 在差别的时间逾期
4)缓存永不逾期

3.3 缓存穿透

3.3.1 缓存 穿透 何时 发生

我们已经知道了 Redis 使用的场景了。在缓存存在和缓存不存在的情况下的什么情况我们都相识了。

095041woosvobqsqbsmhvb.png

另有一种情况,数据在数据库和 Redis 内里都不存在,大概是一次条件错误的查询。在这种情况下,因为数据库值不存在,以是肯定不会写入 Redis,那么下一次查询相同的key 的时间,肯定还是会再到数据库查一次。那么这种循环查询数据库中不存在的值,而且每次使用的是相同的 key 的情况,我们有没有什么办法避免应用到数据库查询呢?

(1)缓存空数据 (2)缓存特殊字符串,比如&&
我们可以在数据库缓存一个空字符串,或者缓存一个特殊的字符串,那么在应用内里拿到这个特殊字符串的时间,就知道数据库没有值了,也没有必要再到数据库查询了。但是这里必要设置一个逾期时间,否则的话数据库已经新增了这一条记录,应用也还是拿不到值。

这个是应用重复查询同一个不存在的值的情况,假如应用每一次查询的不存在的值是不一样的呢?纵然你每次都缓存特殊字符串也没用,因为它的值不一样,比如我们的用户系统登录的场景,假如是恶意的请求,它每次都天生了一个符合 ID 规则的账号,但是这个账号在我们的数据库是不存在的,那 Redis 就完全失去了作用

这种因为每次查询的值都不存在导致的 Redis 失效的情况,我们就把它叫做缓存穿透。这个题目我们应该怎么去办理呢?

3.3.2 经典 口试题

其实它也是一个通用的题目,关键就在于我们怎么知道请求的 key 在我们的数据库内里是否存在,假如数据量特殊大的话,我们怎么去快速判定。

这也是一个非常经典的口试题:
如安在海量元素中(比方 10 亿无序、不定长、不重复)快速判定一个元素是否存在?

假如是缓存穿透的这个题目,我们要避免到数据库查询不存的数据,肯定要把这 10亿放在别的地方。这些数据在 Redis 内里也是没有的,为了加速检索速率,我们要把数据放到内存内里来判定,题目来了:
假如我们直接把这些元素的值放到基本的数据结构(List、Map、Tree)内里,比如一个元素 1 字节的字段,10 亿的数据大概必要 900G 的内存空间,这个对于平凡的服务器来说是蒙受不了的。
以是,我们存储这几十亿个元素,不能直接存值,我们应该找到一种最简单的最节省空间的数据结构,用来标志这个元素有没有出现。
这个东西我们就把它叫做位图,他是一个有序的数组,只有两个值,0 和 1。0 代表不存在,1 代表存在。

095041cqkjbq8uai87u8xu.png

那我们怎么用这个数组内里的有序的位置来标志这10亿个元素是否存在呢?我们是不是必须要有一个映射方法,把元素映射到一个下标位置上?
对于这个映射方法,我们有几个基本的要求:
1)因为我们的值长度是不固定的,我希望差别长度的输入,可以得到固定长度的输出。
2)转换成下标的时间,我希望他在我的这个有序数组内里是分布匀称的,否则的话全部挤到一对去了,我也没法判定到底哪个元素存了,哪个元素没存。
这个就是哈希函数,比如 MD5、SHA-1 等等这些都是常见的哈希算法。

095041e5cri45ais37d5ia.png

比如,这 6 个元素,我们经过哈希函数和位运算,得到了相应的下标。

3.3.3 哈希碰撞

这个时间,Tom 和 Mic 经过盘算得到的哈希值是一样的,那么再经过位运算得到的下标肯定是一样的,我们把这种情况叫做哈希辩说或者哈希碰撞。
假如发生了哈希碰撞,这个时间对于我们的容器存值肯定是有影响的,我们可以通过哪些方式去低沉哈希碰撞的概率呢?
第一种就是扩大维数组的长度或者说位图容量。因为我们的函数是分布匀称的,以是,位图容量越大,在同一个位置发生哈希碰撞的概率就越小。
是不是位图容量越大越好呢?不管存多少个元素,都创建一个几万亿巨细的位图,可以吗?当然不可,因为越大的位图容量,意味着越多的内存斲丧,以是我们要创建一个符合巨细的位图容量。
除了扩大位图容量,我们另有什么低沉哈希碰撞概率的方法呢?
假如两个元素经过一次哈希盘算,得到的相同下标的概率比较高,我可以不可以盘算多次呢? 原来我只用一个哈希函数,如今我对于每一个要存储的元素都用多个哈希函数盘算,如许每次盘算出来的下标都相同的概率就小得多了。
同样的,我们能不能引入很多个哈希函数呢?比如都盘算 100 次,都可以吗?当然也会有题目,第一个就是它会填满位图的更多空间,第二个是盘算是必要斲丧时间的。
以是总的来说,我们既要节省空间,又要很高的盘算服从,就必须在位图容量和函数个数之间找到一个最佳的平衡。
比如说:我们存放 100 万个元素,到底必要多大的位图容量,必要多少个哈希函数呢?

3.3.4 布隆过滤器原理

当然,这个事变早就有人研究过了,在 1970 年的时间,有一个叫做布隆的前辈对于判定海量元素中元素是否存在的题目举行了研究,也就是到底必要多大的位图容量和多少个哈希函数,它发表了一篇论文,提出的这个容器就叫做布隆过滤器。

我们来看一下布隆过滤器的工作原理。
首先,布隆过滤器的本质就是我们刚才分析的,一个位数组,和多少个哈希函数。

095042lex8klk4yeb7dibg.png

集合内里有 3 个元素,要把它存到布隆过滤器内里去,应该怎么做?首先是 a 元素,这里我们用 3 次盘算。b、c 元素也一样。
元素已经存进去之后,如今我要来判定一个元素在这个容器内里是否存在,就要使用同样的三个函数举行盘算。
比如 d 元素,我用第一个函数 f1 盘算,发现这个位置上是 1,没题目。第二个位置也是 1,第三个位置也是 1 。
假如经过三次盘算得到的下标位置值都是 1,这种情况下,能不能确定 d 元素肯定在这个容器内里呢? 现实上是不能的。比如这张图内里,这三个位置分别是把 a,b,c 存进去的时间置成 1 的,以是纵然 d 元素之前没有存进去,也会得到三个 1,判定返回 true。

以是,这个是布隆过滤器的一个很告急的特性,因为哈希碰撞不可避免,以是它会存在肯定的误判率。这种把原来不存在布隆过滤器中的元素误判为存在的情况,我们把它叫做假阳性(False Positive Probability,FPP)。
我们再来看另一个元素,e 元素。我们要判定它在容器内里是否存在,一样地要用这三个函数去盘算。第一个位置是 1,第二个位置是 1,第三个位置是 0。
e 元素是不是肯定不在这个容器内里呢? 可以确定肯定不存在。假如说当时已经把e 元素存到布隆过滤器内里去了,那么这三个位置肯定都是 1,不大概出现 0。
总结:布隆过滤器的特点:
从容器的角度来说:
1、假如布隆过滤器判定元素在集合中存在,不肯定存在
2、假如布隆过滤器判定不存在,肯定不存在从元素的角度来说:
3、假如元素现实存在,布隆过滤器肯定判定存在
4、假如元素现实不存在,布隆过滤器大概判定存在使用,第二个特性,我们是不是就能办理一连从数据库查询不存在的值的题目?

3.3.5 Guava 的 实现

谷歌的 Guava 内里就提供了一个现成的布隆过滤器。

  1. <code><dependency>
  2. <groupId>com.google.guava</groupId>
  3. <artifactId>guava</artifactId>
  4. <version>21.0</version>
  5. </dependency></code>
复制代码

创建布隆过滤器:

  1. <code>BloomFilter<String> bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), insertions);</code>
复制代码

布隆过滤器提供的存放元素的方法是 put()。
布隆过滤器提供的判定元素是否存在的方法是 mightContain()。

  1. <code>if (bf.mightContain(data)) {
  2. if (sets.contains(data)) {
  3. // 判定存在现实存在的时间,掷中
  4. right++;
  5. continue;
  6. }
  7. // 判定存在却不存在的时间,错误
  8. wrong++;
  9. }</code>
复制代码

布隆过滤器把误判率默认设置为 0.03,也可以在创建的时间指定。

  1. <code>public static <T> BloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions) {
  2. return create(funnel, expectedInsertions, 0.03D);
  3. }</code>
复制代码

位图的容量是基于元素个数和误判率盘算出来的。

  1. <code>long numBits = optimalNumOfBits(expectedInsertions, fpp);</code>
复制代码

根据位数组的巨细,我们进一步盘算出了哈希函数的个数。

  1. <code>int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);</code>
复制代码

存储 100 万个元素只占用了 0.87M 的内存,天生了 5 个哈希函数。

https://hur.st/bloomfilter/?n=1000000&p=0.03&m=&k=

3.3.6 布隆过滤器 在项目中的使用

布隆过滤器的工作位置:

095043lgf0z0p1grfgss4g.png

因为要判定数据库的值是否存在,以是第一步是加载数据库所有的数据。在去 Redis查询之前,先在布隆过滤器查询,假如 bf 说没有,那数据库肯定没有,也不消去查了。假如 bf 说有,才走之前的流程。

  1. <code>import com.google.common.base.Charsets;
  2. import com.google.common.hash.BloomFilter;
  3. import com.google.common.hash.Funnels;
  4. import com.gupaoedu.entity.User;
  5. import com.gupaoedu.service.UserService;
  6. import org.junit.Test;
  7. import org.junit.runner.RunWith;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
  10. import org.springframework.boot.test.context.SpringBootTest;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.data.redis.core.ValueOperations;
  13. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  14. import javax.annotation.PostConstruct;
  15. import javax.annotation.Resource;
  16. import java.text.SimpleDateFormat;
  17. import java.util.Date;
  18. import java.util.List;
  19. import java.util.UUID;
  20. import java.util.concurrent.BrokenBarrierException;
  21. import java.util.concurrent.CyclicBarrier;
  22. import java.util.concurrent.ExecutorService;
  23. import java.util.concurrent.Executors;
  24. @RunWith(SpringJUnit4ClassRunner.class)
  25. @SpringBootTest
  26. @EnableAutoConfiguration
  27. public class BloomTestsConcurrency {
  28. @Resource
  29. private RedisTemplate redisTemplate;
  30. @Autowired
  31. private UserService userService;
  32. private static final int THREAD_NUM = 1000; // 并发线程数量,Windows呆板不要设置过大
  33. static BloomFilter<String> bf;
  34. static List<User> allUsers;
  35. @PostConstruct
  36. public void init() {
  37. // 从数据库获取数据,加载到布隆过滤器
  38. long start = System.currentTimeMillis();
  39. allUsers = userService.getAllUser();
  40. if (allUsers == null || allUsers.size() == 0) {
  41. return;
  42. }
  43. // 创建布隆过滤器,默认误判率0.03,即3%
  44. bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), allUsers.size());
  45. // 误判率越低,数组长度越长,必要的哈希函数越多
  46. // bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), allUsers.size(), 0.0001);
  47. // 将数据存入布隆过滤器
  48. for (User user : allUsers) {
  49. bf.put(user.getAccount());
  50. }
  51. long end = System.currentTimeMillis();
  52. System.out.println("查询并加载"+allUsers.size()+"条数据到布隆过滤器完毕,总耗时:"+(end -start ) +"毫秒");
  53. }
  54. @Test
  55. public void cacheBreakDownTest() {
  56. long start = System.currentTimeMillis();
  57. allUsers = userService.getAllUser();
  58. CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM);
  59. ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUM);
  60. for (int i = 0; i < THREAD_NUM; i++){
  61. executorService.execute(new BloomTestsConcurrency().new MyThread(cyclicBarrier, redisTemplate, userService));
  62. }
  63. executorService.shutdown();
  64. //判定是否所有的线程已经运行完
  65. while (!executorService.isTerminated()) {
  66. }
  67. long end = System.currentTimeMillis();
  68. System.out.println("并发数:"+THREAD_NUM + ",新建线程以及过滤总耗时:"+(end -start ) +"毫秒,演示结束");
  69. }
  70. public class MyThread implements Runnable {
  71. private CyclicBarrier cyclicBarrier;
  72. private RedisTemplate redisTemplate;
  73. private UserService userService;
  74. public MyThread(CyclicBarrier cyclicBarrier, RedisTemplate redisTemplate, UserService userService) {
  75. this.cyclicBarrier = cyclicBarrier;
  76. this.redisTemplate = redisTemplate;
  77. this.userService = userService;
  78. }
  79. @Override
  80. public void run() {
  81. //所有子线程等待,当子线程全部创建完成再一起并发实行背面的代码
  82. try {
  83. cyclicBarrier.await();
  84. } catch (InterruptedException e) {
  85. e.printStackTrace();
  86. } catch (BrokenBarrierException e) {
  87. e.printStackTrace();
  88. }
  89. // 1.1 (测试:布隆过滤器判定不存在,拦截——假如没有布隆过滤器,将造成缓存穿透)
  90. // 随机产生一个字符串,在布隆过滤器中不存在
  91. String randomUser = UUID.randomUUID().toString();
  92. // 1.2 (测试:布隆过滤器判定存在,从Redis缓存取值,假如Redis为空则查询数据库并写入Redis)
  93. // 从List中获取一个存在的用户
  94. // String randomUser = allUsers.get(new Random().nextInt(allUsers.size())).getAccount();
  95. String key = "Key:" + randomUser;
  96. Date date1 = new Date();
  97. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  98. // 假如布隆过滤器中不存在这个用户直接返回,将流量挡掉
  99. /* if (!bf.mightContain(randomUser)) {
  100. System.out.println(sdf.format(date1)+" 布隆过滤器中不存在,非法请求");
  101. return;
  102. }*/
  103. // 查询缓存,假如缓存中存在直接返回缓存数据
  104. ValueOperations<String, String> operation =
  105. (ValueOperations<String, String>) redisTemplate.opsForValue();
  106. Object cacheUser = operation.get(key);
  107. if (cacheUser != null) {
  108. Date date2 = new Date();
  109. System.out.println(sdf.format(date2)+" 掷中redis缓存");
  110. return;
  111. }
  112. // TODO 防止并发重复写缓存,加锁
  113. synchronized (randomUser) {
  114. // 假如缓存不存在查询数据库
  115. List<User> user = userService.getUserByAccount(randomUser);
  116. if (user == null || user.size() == 0) {
  117. // 很轻易发生毗连池不够用的情况 HikariPool-1 - Connection is not available, request timed out after
  118. System.out.println(" Redis缓存不存在,查询数据库也不存在,发生缓存穿透!!!");
  119. return;
  120. }
  121. // 将mysql数据库查询到的数据写入到redis中
  122. Date date3 = new Date();
  123. System.out.println(sdf.format(date3)+" 从数据库查询并写入Reids");
  124. operation.set("Key:" + user.get(0).getAccount(), user.get(0).getAccount());
  125. }
  126. }
  127. }
  128. }</code>
复制代码

3.3.7 布隆过滤器 的其他应用场景

布隆过滤器办理的题目是什么?如安在海量元素中快速判定一个元素是否存在。以是除了办理缓存穿透的题目之外,我们另有很多其他的用途。
比如爬数据的爬虫,爬过的 url 我们不必要重复爬,那么在几十亿的 url 内里,怎么判定一个 url 是不是已经爬过了?
另有我们的邮箱服务器,发送垃圾邮件的账号我们把它们叫做 spamer,在这么多的邮箱账号内里,怎么判定一个账号是不是 spamer 等等一些场景,我们都可以用到布隆过滤器。

公众号

假如各人想要及时关注我更新的文章以及分享的干货的话,可以关注我的公众号。

095043gt9gt7otj9dzeeju.png







来源:https://www.cnblogs.com/sundaboke/p/11726624.html
C#论坛 www.ibcibc.com IBC编程社区
C#
C#论坛
IBC编程社区
*滑块验证:
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则